当前位置:首页 > 编程笔记 > 正文
已解决

flink的AggregateFunction,merge方法作用范围

来自网友在路上 161861提问 提问时间:2023-11-09 01:14:48阅读次数: 61

最佳答案 问答题库618位专家为你答疑解惑

背景

AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现

AggregateFunction.merge方法调用时机

AggregateFunction.merge方法其实只有在使用会话窗口进行窗口合并的时候才会用到,如下所示
在这里插入图片描述

对应的源码首先查看WindowOperator.processElement方法对要合并的窗口的状态进行合并

public void processElement(StreamRecord<IN> element) throws Exception {final Collection<W> elementWindows =windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);// if element is handled by none of assigned elementWindowsboolean isSkippedElement = true;final K key = this.<K>getKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {MergingWindowSet<W> mergingWindows = getMergingWindowSet();for (W window : elementWindows) {// adding the new window might result in a merge, in that case the actualWindow// is the merged window and we work with that. If we don't merge then// actualWindow == windowW actualWindow =mergingWindows.addWindow(window,new MergingWindowSet.MergeFunction<W>() {@Overridepublic void merge(W mergeResult,Collection<W> mergedWindows,W stateWindowResult,Collection<W> mergedStateWindows)throws Exception {triggerContext.key = key;triggerContext.window = mergeResult;triggerContext.onMerge(mergedWindows);for (W m : mergedWindows) {triggerContext.window = m;triggerContext.clear();deleteCleanupTimer(m);}// 合并窗口的状态windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);}});

继续查看AbstractHeapMergingState.mergeNamespaces方法,

public void mergeNamespaces(N target, Collection<N> sources) throws Exception {if (sources == null || sources.isEmpty()) {return; // nothing to do}final StateTable<K, N, SV> map = stateTable;SV merged = null;// merge the sourcesfor (N source : sources) {// get and remove the next source per namespace/keySV sourceState = map.removeAndGetOld(source);if (merged != null && sourceState != null) {//此处合并状态并调用AggregateFunction.merge方法merged = mergeState(merged, sourceState);} else if (merged == null) {merged = sourceState;}}// merge into the target, if neededif (merged != null) {map.transform(target, merged, mergeTransformation);}
}//真正调用AggregateFunction.merge方法合并自定义的状态
@Override
protected ACC mergeState(ACC a, ACC b) {return aggregateTransformation.aggFunction.merge(a, b);
}

这样AggregateFunction.merge的调用过程就清楚了,实际应用中,我们只需要在使用会话窗口时才需要实现这个方法,其他的基于时间窗口的方式不需要实现这个方法,当然实现了也不会有错

查看全文

99%的人还看了

猜你感兴趣

版权申明

本文"flink的AggregateFunction,merge方法作用范围":http://eshow365.cn/6-35734-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!