当前位置:首页 > 编程笔记 > 正文
已解决

【API篇】八、Flink窗口函数

来自网友在路上 147847提问 提问时间:2023-10-26 00:36:07阅读次数: 47

最佳答案 问答题库478位专家为你答疑解惑

文章目录

  • 1、增量聚合之ReduceFunction
  • 2、增量聚合之AggregateFunction
  • 3、全窗口函数full window functions
  • 4、增量聚合函数搭配全窗口函数
  • 5、会话窗口动态获取间隔值
  • 6、触发器和移除器
  • 7、补充

//窗口操作
stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

上一节的窗口分配器,指明了窗口类型,知道了数据属于哪个窗口并收集。而窗口函数,则是定义如何对这些数据做计算操作。

在这里插入图片描述

  • 增量聚合来一条数据,计算一条数据,窗口触发的时候输出计算结果
  • 全窗口函数数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果

1、增量聚合之ReduceFunction

public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()).keyBy(r -> r.getId())// 设置滚动事件时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(30))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("调用reduce方法,value1=:"+value1 + ",value2=:"+value2);return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc()+value2.getVc());}}).print();env.execute();}
}

运行,输入数据,查看控制台:

在这里插入图片描述

2、增量聚合之AggregateFunction

上面使用ReduceFunction的限制是,输入数据的类型、聚合中间状态的类型、输出结果的类型必须一致,AggregateFunction则没有这个限制。AggregateFunction接口有四个方法:

  • createAccumulator:创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add:将输入的元素添加到累加器中。
  • getResult:从累加器中提取聚合的输出结果。
  • merge:合并两个累加器,并将合并后的状态作为一个累加器返回

AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果

public class WindowAggregateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());    //自定义的实现类,String转自定义对象WaterSensorKeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}//value即输入的数据,accumulator即之前的计算结果@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用add方法,value="+value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}});aggregate.print();env.execute();}
}

运行,输入数据,查看控制台:

在这里插入图片描述

3、全窗口函数full window functions

全窗口函数,即数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果Flink全窗口函数有两种,第一种为apply方法下的:

stream.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());

传入一个WindowFunction的实现类,该方法已被第二种ProcessWindowFunction全覆盖,因而逐渐弃用。ProcessWindowFunction除了可以拿到窗口中的所有数据之外,还可以获取到一个“上下文对象”(Context),通过这个上下文对象,可以获取窗口对象、窗口处理时间、事件时间水位线

public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {/*** 全窗口函数计算逻辑,窗口结束时触发才调用一次* s 分组的key* context 上下文对象* elements 窗口内存的所有数据* out 采集器对象*/@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long count = elements.spliterator().estimateSize();long windowStartTs = context.window().getStart();long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();env.execute();}
}

效果:

在这里插入图片描述

在这里插入图片描述

4、增量聚合函数搭配全窗口函数

可以看出,增量和全窗口各有好处:

  • 增量聚合下,来一条计算一条,只存储中间计算结果,占用空间少
  • 全窗口函数则是可以通过上下文对象来实现灵活的功能

像同时拥有两者的优点,可以调用aggregate方法的另一个重载方法:

在这里插入图片描述

// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<TRKW> function) // ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)// AggregateFunction与WindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)// AggregateFunction与ProcessWindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,ProcessWindowFunction<VRKW> windowFunction)

此时:

  • 基于第一个参数,即增量聚合函数,来处理数据,来一条聚合一条
  • 窗口触发后,调用第二个参数的处理逻辑,此时,把增量聚合的结果(只有一条数据)再传递给全窗口函数,也就是说全窗口的Iterable<> elements,长度为1,注意全窗口不再缓存所有数据
  • 经过全窗口,执行处理和包装,再输出
public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));//sensorWS.reduce()   //也可以传两个SingleOutputStreamOperator<String> result = sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}}
public  class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用add方法,value="+value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}}// 全窗口函数的输入类型 = 增量聚合函数的输出类型
public  class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{@Overridepublic void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}
}

注意,二者搭配时,根据前面分析,可以知道,必有:增量聚合函数的输出类型 = 全窗口函数的输入类型

5、会话窗口动态获取间隔值

到此,窗口API需要的窗口分配器(见上一篇)和窗口函数都已整理完。上面demo中用的窗口分配器都是滚动窗口,但应该有以下这些:

  • 时间滚动窗口
  • 时间滑动窗口
  • 时间会话窗口
  • 计数滚动窗口
  • 计数滑动窗口

这里再记录下时间会话窗口+动态获取会话间隔:

public class WindowSessionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(ProcessingTimeSessionWindows.withDynamicGap(t -> t.getTs() * 1000L));SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long count = elements.spliterator().estimateSize();long windowStartTs = context.window().getStart();long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();env.execute();}
}

来一条数据,根据这条数据获取一个值做为会话间隔,到达这个间隔前,下条数据到来了,则会话间隔又成了另一个值,动态的。运行:

在这里插入图片描述

可以看到,会话间隔动态获取,到达间隔时下条数据还没来,则结束本窗户,窗口口结束时触发才调用一次process,和分析的一致。最后补充一点,展开demo代码里的Lambda表达式,其实是一个抓取会话间隔的方法,定义了会话窗口间隔的获取逻辑。

在这里插入图片描述

再贴个计数滑动窗口:

在这里插入图片描述

6、触发器和移除器

触发器主要是用来控制窗口什么时候触发计算,即什么时候执行窗口函数

//基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)
stream.keyBy(...).window(...).trigger(new MyTrigger())

移除器主要用来定义移除某些数据的逻辑

基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy(...).window(...).evictor(new MyEvictor())

Flink提供的几个窗口,比如滑动、滚动等,都有对触发器和移除器的默认实现,不用自定义。

7、补充

窗口的划分:

  • 窗口开始时间start是窗口长度的整数倍,向下取整

在这里插入图片描述
在这里插入图片描述

  • 窗口结束时间是start+窗口长度

在这里插入图片描述
在这里插入图片描述

  • 窗口是左闭右开,因为属于本窗口的最大时间戳为end-1

在这里插入图片描述

  • 窗口的生命周期,创建是属于本窗口的第一条数据来的时候,现new的,放入一个singleton单例的集合中
  • 窗口的销毁是时间的进展 >= 窗口的最大时间戳(end-1ms) + 允许迟到的时间(默认0)
  • 窗口什么时候触发输出:当时间进展 >= 窗口的最大时间戳(end -1ms)
查看全文

99%的人还看了

猜你感兴趣

版权申明

本文"【API篇】八、Flink窗口函数":http://eshow365.cn/6-24608-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!