窗口(Window)
在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入"窗口"。
所谓的"窗口",一般就是划定的一段时间范围,也就是"时间窗";对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来我们就深入了解一下Flink中的时间语义和窗口的应用。
1. 窗口的概念
Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的"数据块"进行处理,这就是所谓的"窗口"(Window)。
在Flink中,窗口其实并不是一个"框",应该把窗口理解成一个"桶"。在Flink中,窗口可以把流切割成有限大小的多个"存储桶"(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。
😎注意
Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。比如数据流比作水流,每隔一个小时用桶去装水,flink不会提前每天准备24个桶,基于事件驱动而言,flink当发现有水来了才会创建水桶。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上"触发计算"和"窗口关闭"两个行为也可以分开,这部分内容我们会在后面详述。
2. 窗口的分类
在Flink中,窗口的应用非常灵活,我们可以使用各种不同类型的窗口来实现需求。接下来我们就从不同的角度,对Flink中内置的窗口做一个分类说明。
2.1 按照驱动类型分
窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是"怎样截取数据"。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的"驱动类型"。
- 时间窗口(Time Window)
时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。所以可以说基本思路就是"定点发车"。 - 计数窗口(Count Window)
计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。基本思路是"人齐发车"。
2.2 按照窗口分配数据的规则分类
根据分配数据的规则,窗口的具体实现可以分为4类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。
滚动窗口(Tumbling Windows)
滚动窗口有固定的大小,是一种对数据进行"均匀切片"的划分方式。窗口之间没有重叠,也不会有间隔,是"首尾相接"的状态。这是最简单的窗口形式,每个数据都会被分配到一个窗口,而且只会属于一个窗口。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。
比如我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。滚动窗口应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。
滑动窗口(Sliding Windows)
滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以"错开"一定的位置。
定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个"滑动步长"(window slide),它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。
当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会被同时分配到多个窗口中。而具体的个数,由窗口大小和滑动步长的比值(size/slide)来决定。滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于动步长(size = slide)。滑动窗口适合计算结果更新频率非常高的场景。
会话窗口(Session Windows)
会话窗口,是基于"会话"(session)来来对数据进行分组的。会话窗口只能基于时间来定义。
会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果gap大于size,那么新来的数据就应该属于新的会话窗口而前一个窗口就应该关闭了。会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。会话窗口之间一定是不会重叠的,而且会留有至少为size的间隔(session gap)。在一些类似保持会话的场景下,可以使用会话窗口来进行数据的处理统计。
全局窗口(Global Windows)
"全局窗口",这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义"触发器"(Trigger)。
全局窗口没有结束的时间点,所以一般在希望做更加灵活的窗口处理时自定义使用。Flink中的计数窗口(Count Window),底层就是用全局窗口实现的。
3. 窗口API概览
3.1 按键分区(Keyed)和非按键分区(Non-Keyed)
在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是说,在调用窗口算子之前,是否有keyBy操作。
- 按键分区窗口(Keyed Windows)
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。
在代码实现上,我们需要先对DataStream调用keyBy()
进行按键分区,然后再调用window()
定义窗口。
stream.keyBy(...)
.window(...)
- 非按键分区(Non-Keyed Windows)
如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。
在代码中,直接基于DataStream调用windowAll()
定义窗口。
stream.windowAll(...)
警告
对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。
3.2 代码中窗口API的调用
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
// 代码总体结构
stream.keyBy(<key selector>)
.window(<window assigner>) // 下面代码详细介绍窗口分配器API
.aggregate(<window function>)
//窗口分配器 基于时间的
// keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口,窗口10s
// keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)) //滑动窗口,窗口10s,滑动步长2s
// keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.minutes(20))) // 会话窗口,超时间隔为20s,20s期间没有数据启动新会话
//窗口分配器 基于计数的
// keyedStream.countWindow(5) // 滚动窗口,窗口长度=5个元素
// keyedStream.countWindow(5,2); // 滑动窗口,窗口长度为5个元素,滑动步长为2
// keyedStream.window(GlobalWindows.create()) // 全局窗口,计数窗口底层就是它,它用的时候需要自定义的时候用
}
其中window()
方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的aggregate()
方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只aggregate()
一种,我们接下来就详细展开讲解。
4. 窗口分配器
定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被"分配"到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。 窗口分配器最通用的定义方式,就是调用window()
方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用windowAll()
方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。
窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。
4.1 时间窗口
时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。
- 滚动处理时间窗口
窗口分配器由类TumblingProcessingTimeWindows提供,需要调用它的静态方法of()
。
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
这里of()
方法需要传入一个Time类型的参数size,表示滚动窗口的大小,我们这里创建了一个长度为5秒的滚动窗口。另外of()
还有一个重载方法,可以传入两个Time类型的参数:size和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(2);
SingleOutputStreamOperator<WaterSensor> streamOperator = env.socketTextStream("hadoop102", 7777).map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
});
KeyedStream<WaterSensor, String> keyedStream = streamOperator.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
});
// 基于时间的
WindowedStream<WaterSensor, String, TimeWindow> window =
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 滚动窗口,窗口10s
window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String startStr = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
String endStr = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
long size = elements.spliterator().estimateSize();
out.collect("key="+key+"窗口:("+startStr + ", " + endStr+")收集到了"+size+"条数据==>"+elements);
}
}).print();
env.execute();
}
运行结果:
- 滑动处理时间窗口
窗口分配器由类SlidingProcessingTimeWindows提供,同样需要调用它的静态方法of()
。
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.aggregate(...)
这里of()
方法需要传入两个Time类型的参数:size和slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为10秒、滑动步长为5秒的滑动窗口。滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(2);
SingleOutputStreamOperator<WaterSensor> streamOperator = env.socketTextStream("hadoop102", 7777).map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
});
KeyedStream<WaterSensor, String> keyedStream = streamOperator.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
});
// 基于时间的
WindowedStream<WaterSensor, String, TimeWindow> window =
keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));// 滑动窗口,窗口10s, 步长 5s
window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String startStr = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
String endStr = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
long size = elements.spliterator().estimateSize();
out.collect("key="+key+"窗口:("+startStr + ", " + endStr+")收集到了"+size+"条数据==>"+elements);
}
}).print();
env.execute();
}
运行结果:
- 处理时间会话窗口
窗口分配器由类ProcessingTimeSessionWindows提供,需要调用它的静态方法withGap()
或者withDynamicGap()
。
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
这里withGap()
方法需要传入一个Time类型的参数size,表示会话的超时时间,也就是最小间隔session gap。我们这里创建了静态会话超时时间为10秒的会话窗口。
另外,还可以调用withDynamicGap()
方法定义session gap的动态提取逻辑。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(2);
SingleOutputStreamOperator<WaterSensor> streamOperator = env.socketTextStream("hadoop102", 7777).map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
});
KeyedStream<WaterSensor, String> keyedStream = streamOperator.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
});
// 基于时间的
WindowedStream<WaterSensor, String, TimeWindow> window =
// keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));// 会话窗口,间隔5s
// 动态间隔时间,这里使用输入的vc值作为间隔时间,每次输入都会被重置间隔时间
keyedStream.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<WaterSensor>() {
@Override
public long extract(WaterSensor element) {
// 返回要求毫秒值
return element.getVc()*1000;
}
}));
window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String startStr = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
String endStr = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
long size = elements.spliterator().estimateSize();
out.collect("key="+key+"窗口:("+startStr + ", " + endStr+")收集到了"+size+"条数据==>"+elements);
}
}).print();
env.execute();
}
运行结果:
- 滚动事件时间窗口
窗口分配器由类TumblingEventTimeWindows提供,用法与滚动处理事件窗口完全一致。
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)
- 滑动事件时间窗口
窗口分配器由类SlidingEventTimeWindows提供,用法与滑动处理事件窗口完全一致。
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.aggregate(...)
- 事件时间会话窗口
窗口分配器由类EventTimeSessionWindows提供,用法与处理事件会话窗口完全一致。
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
4.2 计数窗口
计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用countWindow()
方法。根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类,下面我们就来看它们的具体实现。
- 滚动计数窗口
滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。
stream.keyBy(...)
.countWindow(10)
我们定义了一个长度为10的滚动计数窗口,当窗口中元素数量达到10的时候,就会触发计算执行并关闭窗口。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(2);
SingleOutputStreamOperator<WaterSensor> streamOperator = env.socketTextStream("hadoop102", 7777).map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
});
KeyedStream<WaterSensor, String> keyedStream = streamOperator.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
});
// 基于时间的
WindowedStream<WaterSensor, String, GlobalWindow> window = keyedStream.countWindow(5L); // 计数窗口, 长度为5
window.process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long size = elements.spliterator().estimateSize();
out.collect("key=" + key + "计数窗口,收到:" + size+"条数据==>"+elements);
}
}).print();
env.execute();
}
运行结果: 2. 滑动计数窗口
与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。
stream.keyBy(...)
.countWindow(10L, 3)
我们定义了一个长度为10、滑动步长为3的滑动计数窗口。每个窗口统计10个数据,每隔3个数据就统计输出一次结果。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(2);
SingleOutputStreamOperator<WaterSensor> streamOperator = env.socketTextStream("hadoop102", 7777).map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
});
KeyedStream<WaterSensor, String> keyedStream = streamOperator.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
});
//计数窗口,窗口长度为5,滑动步长为2
WindowedStream<WaterSensor, String, GlobalWindow> window =
keyedStream.countWindow(5L, 2);
window.process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long size = elements.spliterator().estimateSize();
out.collect("key=" + key + "计数窗口,收到:" + size+"条数据==>"+elements);
}
}).print();
env.execute();
}
运行结果:
- 全局窗口
全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用window()
,分配器由GlobalWindows类提供。
stream.keyBy(...)
.window(GlobalWindows.create());
需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。
5. 窗口处理函数
定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的"窗口函数"(window functions)。 窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。
5.1 增量聚合函数(ReduceFunction/AggregateFunction)
窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是"增量聚合"。典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。
- 归约函数ReduceFunction
public class FlinkWindowFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> streamOperator = env
.socketTextStream("hadoop102", 7777)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
});
KeyedStream<WaterSensor, String> keyedStream =
streamOperator.keyBy(sensor -> sensor.getId());
WindowedStream<WaterSensor, String, TimeWindow> windowDs =
// 设置滚动事件时间窗口
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
/**
* 窗口函数: 增量聚合reduce
* 1. 相同的key的第一条数据来的时候,不会调用reduce
* 2. 增量聚合: 来一条,就会计算一条, 但不会输出
* 3. 在窗口触发的时候,才会输出当前窗口的计算结果
*/
SingleOutputStreamOperator<WaterSensor> ds = windowDs.reduce(new ReduceFunction<WaterSensor>() {
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
System.out.println("调用reduce方法,之前的结果:"+value1 + ",现在来的数据:"+value2);
return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
}
});
ds.print();
env.execute();
}
}
运行结果: 2. 聚合函数AggregateFunction
ReduceFunction可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。Flink Window API中的aggregate就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类作为参数。
AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型IN就是输入流中元素的数据类型;累加器类型ACC则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。接口中有四个方法:
createAccumulator()
:创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。add()
:将输入的元素添加到累加器中。getResult()
:从累加器中提取聚合的输出结果。merge()
:合并两个累加器,并将合并后的状态作为一个累加器返回。
所以可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()
为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()
方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()
方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。
public class WindowAggregateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
DataStream<WaterSensor> lines = env.socketTextStream("hadoop102", 7777).map(
value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
);
lines.keyBy(value -> value.getId())
// 1. 窗口分配器
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
/**
* 第一个泛型: 输入数据的类型
* 第二个泛型: 累加器的类型,存储中间结果的类型
* 第三个泛型: 输出的类型
*/
.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {
// 初始化累加器
@Override
public Integer createAccumulator() {
System.out.println("创建累加器");
return 0;
}
// 计算逻辑
@Override
public Integer add(WaterSensor value, Integer accumulator) {
System.out.println("调用add方法,value="+value);
return accumulator + value.getVc();
}
// 获取窗口触发时的最终结果
@Override
public String getResult(Integer accumulator) {
System.out.println("调用getResult方法");
return accumulator.toString();
}
// 只有会话窗口才会用到,其他方式用不到
@Override
public Integer merge(Integer a, Integer b) {
System.out.println("调用merge方法");
return null;
}
})
.print();
env.execute();
}
}
程序运行结果:
总结-增量聚合Aggregate
- 属于本窗口的第一条数据来,创建窗口,创建累加器
- 增量聚合: 来一条计算一条, 调用一次add方法
- 窗口输出时调用一次getResult方法
- 输入,中间值累加器、输出的类型都可以不一样,非常灵活
- 全窗口函数full window functions
有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。所以,我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
在Flink中,全窗口函数也有两种:WindowFunction和ProcessWindowFunction。
窗口函数(WindowFunction): WindowFunction字面上就是"窗口函数",它其实是老版本的通用窗口函数接口。我们可以基于WindowedStream调用apply()
方法,传入一个WindowFunction的实现类。
stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。不过WindowFunction能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用。
处理窗口函数(ProcessWindowFunction):ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它"最底层",是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个"上下文对象"(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。事实上,ProcessWindowFunction是Flink底层API——处理函数(process function)中的一员,关于处理函数我们会在后续章节展开讲解。
public class WindowFullDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
DataStream<WaterSensor> lines = env.socketTextStream("hadoop102", 7777).map(
value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
);
lines.keyBy(value -> value.getId())
// 1. 窗口分配器
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
// 老的写法
// .apply(new WindowFunction<WaterSensor, String, String, TimeWindow>() {
// /**
// * @param s 分组的key
// * @param window 窗口对象
// * @param input 保存的数据
// * @param out 采集器
// * @throws Exception
// */
// @Override
// public void apply(String s, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {
//
// }
// })
// 全窗口函数计算逻辑: 窗口触发时,才会调用一次,统一计算窗口的所有资源
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
/**
*
* @param s 分组的kdy
* @param context 上下文
* @param elements 保存的数据
* @param out 收集器
* @throws Exception
*/
@Override
public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
// context很灵活,可以得到window,可以分流 等等
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "YYYY-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "YYYY-MM-dd HH:mm:ss.SSS");
long size = elements.spliterator().estimateSize();
out.collect("key="+s+"的窗口("+windowStart + "," + windowEnd + ")包含" + size + "条数据==>" + size);
}
})
.print();
env.execute();
}
}
运行结果: 4. 增量聚合和全窗口函数的结合使用
在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。我们之前在调用WindowedStream的reduce()
和aggregate()
方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。
// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function)
// ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function)
// AggregateFunction与WindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T,ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction)
// AggregateFunction与ProcessWindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T,ACC,V> aggFunction,
ProcessWindowFunction<V,R,K,W> windowFunction)
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。
具体实现代码如下:
public class WindowComplexDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
DataStream<WaterSensor> lines = env.socketTextStream("hadoop102", 7777).map(
value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
);
lines.keyBy(value -> value.getId())
// 1. 窗口分配器
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
// 2. 传入两个自定义窗口函数
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
.print();
env.execute();
}
}
public class MyAggregateFunction implements AggregateFunction<WaterSensor, Integer, String> {
// 初始化累加器
@Override
public Integer createAccumulator() {
System.out.println("创建累加器");
return 0;
}
// 计算逻辑
@Override
public Integer add(WaterSensor value, Integer accumulator) {
System.out.println("调用add方法,value="+value);
return value.getVc() + accumulator;
}
@Override
public String getResult(Integer accumulator) {
System.out.println("调用getResult方法");
return accumulator.toString();
}
@Override
public Integer merge(Integer a, Integer b) {
System.out.println("调用merge方法");
return 0;
}
}
// 需要注意的时候,MyProcessWindowFunction作为第二个参数,它的泛型IN并不是WaterSensor,而是第一个函数的结果String
public class MyProcessWindowFunction extends ProcessWindowFunction<String, String, String, TimeWindow>{
@Override
public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
// context很灵活,可以得到window,可以分流 等等
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "YYYY-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "YYYY-MM-dd HH:mm:ss.SSS");
long size = elements.spliterator().estimateSize();
out.collect("key="+s+"的窗口("+windowStart + "," + windowEnd + ")包含" + size + "条数据==>" + size);
}
}
增量聚合Aggregate+全窗口process总结
- 增量聚合函数处理数据: 来一条计算一条
- 窗口触发时, 增量聚合的结果(只有一条) 传递给 全窗口函数
- 经过全窗口函数的处理包装后,输出
结合两者的优点:
- 增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少
- 全窗口函数: 可以通过 上下文 实现灵活的功能
6. 其他API
对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink还提供了其他一些可选的API,让我们可以更加灵活地控制窗口行为。
6.1 触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算。所谓的"触发计算",本质上就是执行窗口处理函数,所以可以认为是计算得到结果并输出的过程。基于WindowedStream调用trigger()
方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())
6.2 移除器(Evictor)
移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用evictor()
方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...)
.window(...)
.evictor(new MyEvictor())
总结
- 窗口什么时候触发输出?
以时间滚动窗口为例,时间进展 >= 窗口的最大时间戳(end-1ms) - 窗口是怎么划分的?
其实就是:start = 向下取整,取窗口的整数倍; end = start + 窗口长度
比如窗口长度10,3s时候来了一条数据,按照代码逻辑公式:(3-0)%10=3,此时remainder=3;
timestamp为当前毫秒值,也就是3, 窗口的起始时间为3-3=0,也就是当前数据在[0, 10)范围内。
- 窗口的生命周期?
创建:属于本窗口的第一条数据来的时候,现New的,放入一个singleton单例的集合中。
销毁(关窗):时间进展 >= 窗口的最大时间戳(end-1ms) + 允许迟到的时间(默认为0)