当前位置: 首页> 房产> 建筑 > 免费网站建站 知乎_河北邯郸大风_潜江seo_做外贸用什么软件找客户

免费网站建站 知乎_河北邯郸大风_潜江seo_做外贸用什么软件找客户

时间:2025/7/29 20:58:02来源:https://blog.csdn.net/gegeyanxin/article/details/146984422 浏览次数:2次
免费网站建站 知乎_河北邯郸大风_潜江seo_做外贸用什么软件找客户

异步需要考虑的问题其实很多。所以一般情况下能同步解决的就用同步。

这次是有个数据源QPS太高了。20万以上,同步吞吐跟不上。或者同样的处理速率,同步比异步消耗资源多太多。所以就想到了异步方法。需要外部数据源也支持异步访问。刚好是支持的。


异步IO官方文档:异步 I/O | Apache Flink



 核心操作:

SingleOutputStreamOperator<ArrayList<UsageToPegasus>> appIdStreamCompute = AsyncDataStream.orderedWait(appIdStream,new MapAsynProcessState(clusterName, StatePegasusName, stateTtl),asynTimeout,TimeUnit.MILLISECONDS,asynStateCapacity).uid("uid1").name("name1").returns(arrayListTypeInfo).disableChaining();

异步方法内部:

public class MapAsynProcessState extends RichAsyncFunction<AppIdState, ArrayList<UsageToPegasus>> {public void open(Configuration parameters) throws Exception {super.open(parameters);client = PegasusClientFactory.createClient(properties);table = client.openTable(StatePegasusName);RuntimeContext runtimeContext = getRuntimeContext();OperatorMetricGroup metricGroup = runtimeContext.getMetricGroup();// 注册监控指标successCounter = metricGroup.counter("xxx1"); //falcon监控用。xxx1为指标名称failureCounter = metricGroup.counter("xxx");timeoutCounter = metricGroup.counter("xxx");}//核心方法
public void asyncInvoke(AppIdState appState, ResultFuture<ArrayList<UsageToPegasus>> resultFuture) throws Exception {table.asyncMultiGet(appState.getHashKey().getBytes(), null, 30000).addListener((PegasusTableInterface.MultiGetListener) future -> {String gaid = appState.getHashKey();if (future.isSuccess()) {PegasusTableInterface.MultiGetResult res = future.get();ConcurrentHashMap<Long, long[]> one_hour_map = new ConcurrentHashMap<>();ConcurrentHashMap<Long, long[]> six_hour_map = new ConcurrentHashMap<>();处理逻辑……successCounter.inc();
//                            successRateMeter.markEvent();
//结果回收 (非常重要)                            resultFuture.complete(Collections.singleton(longMapToResult(id, one_hour_map, six_hour_map, useLatest, current15minTimestamp)));} //asyncMultiGet监听到结果处理完成else { //asyncMultiGet监听成功,但没结果
//每条路都要有结果回收 (非常重要)    
//有些异常可以返回空,但是也需要返回resultFuture.complete(Collections.emptyList());resultFuture.complete(Collections.singleton(longMapToResult(id, one_hour_map, six_hour_map, useLatest, current15minTimestamp)));System.out.println("本次查询成功,但是结果为空,这个用户当前尚没有记录");}}}

注意结果回收:resultFuture.complete(Collections.singleton(结果对象))

//有些异常可以返回空,但是返回这个动作不能省。 resultFuture.complete(Collections.emptyList());

之前有出现过一个bug。异常情况没有回收,flink程序卡住运行不了。加上回收以后就好了。

写部分:

写部分:{Future<Void> setFuture = table.asyncSet(appState.getHashKey().getBytes(),appState.getSortKey().getBytes(),MapSerializeAndDeserialize.serializeHashMap(appState.getAppState()),stateTtl,0);setFuture.addListener((PegasusTableInterface.SetListener) future ->{if (future.isSuccess()) {stateSetSuccessCounter.inc();} else {stateSetFailureCounter.inc();System.err.println("写状态失败的原因是"+future.cause());}});

另外的示例:

inal Future<Void> pegasusResultFuture = this.table.asyncSet(pegasusKey.getBytes(), realtimeSampleSortKey, streamSample.getFeatureRowBytes(), ttl, 10000);CompletableFuture.supplyAsync(  // 写Pegasus() -> {try {markEventSync(metricGroup, counterMap, "pegasus_write", ENV);return pegasusResultFuture.get();} catch (Exception e) {markEventSync(metricGroup, counterMap, "pegasus_write_err_1", ENV);LOGGER.error("Pegasus write error: ", e);return null;}}).handle((Void dbResult, Throwable throwable) -> {// 异步查询异常,可进行重试if (throwable != null) {markEventSync(metricGroup, counterMap, "pegasus_write_err_2", ENV);LOGGER.warn("Future complete error: ", throwable);resultFuture.completeExceptionally(throwable);return "fail";}markEventSync(metricGroup, counterMap, "pegasus_write_success", ENV);resultFuture.complete(Collections.singleton(streamSample));return "success";});}

关键字:免费网站建站 知乎_河北邯郸大风_潜江seo_做外贸用什么软件找客户

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: