已解决
flink的AggregateFunction,merge方法作用范围
来自网友在路上 161861提问 提问时间:2023-11-09 01:14:48阅读次数: 61
最佳答案 问答题库618位专家为你答疑解惑
背景
AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现
AggregateFunction.merge方法调用时机
AggregateFunction.merge方法其实只有在使用会话窗口进行窗口合并的时候才会用到,如下所示
对应的源码首先查看WindowOperator.processElement方法对要合并的窗口的状态进行合并
public void processElement(StreamRecord<IN> element) throws Exception {final Collection<W> elementWindows =windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);// if element is handled by none of assigned elementWindowsboolean isSkippedElement = true;final K key = this.<K>getKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {MergingWindowSet<W> mergingWindows = getMergingWindowSet();for (W window : elementWindows) {// adding the new window might result in a merge, in that case the actualWindow// is the merged window and we work with that. If we don't merge then// actualWindow == windowW actualWindow =mergingWindows.addWindow(window,new MergingWindowSet.MergeFunction<W>() {@Overridepublic void merge(W mergeResult,Collection<W> mergedWindows,W stateWindowResult,Collection<W> mergedStateWindows)throws Exception {triggerContext.key = key;triggerContext.window = mergeResult;triggerContext.onMerge(mergedWindows);for (W m : mergedWindows) {triggerContext.window = m;triggerContext.clear();deleteCleanupTimer(m);}// 合并窗口的状态windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);}});
继续查看AbstractHeapMergingState.mergeNamespaces方法,
public void mergeNamespaces(N target, Collection<N> sources) throws Exception {if (sources == null || sources.isEmpty()) {return; // nothing to do}final StateTable<K, N, SV> map = stateTable;SV merged = null;// merge the sourcesfor (N source : sources) {// get and remove the next source per namespace/keySV sourceState = map.removeAndGetOld(source);if (merged != null && sourceState != null) {//此处合并状态并调用AggregateFunction.merge方法merged = mergeState(merged, sourceState);} else if (merged == null) {merged = sourceState;}}// merge into the target, if neededif (merged != null) {map.transform(target, merged, mergeTransformation);}
}//真正调用AggregateFunction.merge方法合并自定义的状态
@Override
protected ACC mergeState(ACC a, ACC b) {return aggregateTransformation.aggFunction.merge(a, b);
}
这样AggregateFunction.merge的调用过程就清楚了,实际应用中,我们只需要在使用会话窗口时才需要实现这个方法,其他的基于时间窗口的方式不需要实现这个方法,当然实现了也不会有错
查看全文
99%的人还看了
相似问题
- Kotlin学习——kt里的集合,Map的各种方法之String篇
- Office文件在线预览大全-Word文档在线预览的实现方法-OFD文档在线预览-WPS文件在线预览
- composer切换全局镜像源的方法
- Python通过selenium调用IE11浏览器报错解决方法
- 测试用例的设计方法(全):正交实验设计方法|功能图分析方法|场景设计方发
- Java8新特性 ----- Lambda表达式和方法引用/构造器引用详解
- C#中抽象类、抽象方法和接口暨内联临时变量的精彩表达
- ChatGLM2 大模型微调过程中遇到的一些坑及解决方法(更新中)
- 类方法,静态方法和实例方法的区别及应用场景
- 【链表的说明、方法---顺序表与链表的区别】
猜你感兴趣
版权申明
本文"flink的AggregateFunction,merge方法作用范围":http://eshow365.cn/6-35734-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!
- 上一篇: SQL进阶教程学习笔记
- 下一篇: 查看apk签名