水位线(Watermark)
1. 事件时间和窗口
在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个"逻辑时钟"。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。 这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的结果都是正确的。而一般实时流处理的场景中,事件时间可以基本与处理时间保持同步,只是略微有一点延迟,同时保证了窗口计算的正确性。
2. 什么是水位线
在Flink中,用来衡量事件时间进展的标记,就被称作"水位线"(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
2.1 有序流中的水位线
- 理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线;
- 实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线。
2.2 乱序流中的水位线
在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是所谓的"乱序数据"。
乱序 + 数据量小: 我们还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。不过现在的情况是数据乱序,所以插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。乱序 + 数据量大: 如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线。
乱序 + 迟到数据: 我们无法正确处理"迟到"的数据。为了让窗口能够正确收集到迟到的数据,我们也可以等上一段时间,比如2秒;也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。这样的话,9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,这时迟到数据也都已收集齐,0~9秒的窗口就可以正确计算结果了。
现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。
3. 水位线特性
- 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据。
- 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展。
- 水位线是基于数据的时间戳生成的。
- 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进。
- 水位线可以通过设置延迟,来保证正确处理乱序数据。
- 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t,这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ ≤ t的数据。
提示
水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。
4. 水位线和窗口的工作原理
误解:我们定义一个时间窗口,每10秒统计一次数据,那么就相当于把窗口放在那里,从0秒开始收集数据;到10秒时,处理当前窗口内所有数据,输出一个结果,然后清空窗口继续收集数据;到20秒时,再对窗口内所有数据进行计算处理,输出结果;依次类推。
问题描述:对于处理时间下的窗口而言,这样理解似乎没什么问题。然而如果我们采用事件时间语义,就会有些费解了。由于有乱序数据,我们需要设置一个延迟时间来等所有数据到齐。比如上面的例子中,我们可以设置延迟时间为2秒,这样0~10秒的窗口会在时间戳为12的数据到来之后,才真正关闭计算输出结果,这样就可以正常包含迟到的9秒数据了。
但是这样一来,0~10秒的窗口不光包含了迟到的9秒数据,连11秒和12秒的数据也包含进去了。我们为了正确处理迟到数据,结果把早到的数据划分到了错误的窗口——最终结果都是错误的。正确理解:在Flink中,窗口其实并不是一个"框",应该把窗口理解成一个"桶"。在Flink中,窗口可以把流切割成有限大小的多个"存储桶"(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。
总结
Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上"触发计算"和"窗口关闭"两个行为也可以分开,这部分内容我们会在后面详述。
5. 生成水位线
5.1 生成水位线的总体原则
完美的水位线是"绝对正确"的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
5.2 水位线生成策略
watermark的生成方式本质上是有两种:周期性生成和标记生成。在Flink的DataStream API中,有一个单独用于生成水位线的方法:assignTimestampsAndWatermarks()
,它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。
说明:WatermarkStrategy作为参数,这就是所谓的"水位线生成策略"。WatermarkStrategy是一个接口,该接口中包含了一个"时间戳分配器"TimestampAssigner和一个"水位线生成器"WatermarkGenerator。
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>,
WatermarkGeneratorSupplier<T>{
// 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
// 主要负责按照既定的方式,基于时间戳生成水位线
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
5.3 Flink内置水位线
- 有序流中内置水位线设置
对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()
方法就可以实现。
/**
* 时间有序数据流
*/
public class FlinkWaterMarkMonoDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
DataStream<WaterSensor> lines = env.socketTextStream("192.168.73.44", 7777)
.map(value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
});
// 1.定义Watermark策略
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
// 1.1 指定watermark生成:升序的watermark,没有等待时间
.<WaterSensor>forMonotonousTimestamps()
// 1.2 指定 时间戳分配器,从数据中提取得到时间戳
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
System.out.println("当前数据" + element + ", recordTimestamp: " + recordTimestamp);
// 简单起见,输入就直接是秒值,这里要求毫秒值,需要乘以1000
return element.getTs() * 1000;
}
});
SingleOutputStreamOperator<WaterSensor> watermarkStream =
// 2. 指定使用我们定义的 watermark策略
lines.assignTimestampsAndWatermarks(watermarkStrategy);
WindowedStream<WaterSensor, String, TimeWindow> window = watermarkStream.keyBy(value -> value.getId())
// 3.使用 事件时间语义 的窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)));
window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "mm:ss.SSS");
long size = elements.spliterator().estimateSize();
out.collect("key="+key+"窗口:("+windowStart + ", " + windowEnd+")收集到了"+size+"条数据==>"+elements);
}
}).print();
env.execute();
}
}
运行结果,可见如果输入数据ts字段的值大于等于10的倍数就会触发窗口计算:
- 乱序流中内置水位线设置
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()
方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示"最大乱序程度",它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
/**
* 时间乱序数据流
*/
public class FlinkWatermarkOutOfOrderDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
DataStream<WaterSensor> lines = env.socketTextStream("192.168.73.44", 7777)
.map(value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
});
// 1.定义Watermark策略
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
// 1.1 指定watermark生成: 乱序的watermark,等待时间 5s
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// 1.2 指定 时间戳分配器,从数据中提取得到时间戳
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
System.out.println("当前数据" + element + ", recordTimestamp: " + recordTimestamp);
// 简单起见,输入就直接是秒值,这里要求毫秒值,需要乘以1000
return element.getTs() * 1000;
}
});
SingleOutputStreamOperator<WaterSensor> watermarkStream =
// 2. 指定使用我们定义的 watermark策略
lines.assignTimestampsAndWatermarks(watermarkStrategy);
WindowedStream<WaterSensor, String, TimeWindow> window = watermarkStream.keyBy(value -> value.getId())
// 3.使用 事件时间语义 的窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)));
window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "mm:ss.SSS");
long size = elements.spliterator().estimateSize();
out.collect("key="+key+"窗口:("+windowStart + ", " + windowEnd+")收集到了"+size+"条数据==>"+elements);
}
}).print();
env.execute();
}
}
运行结果:
5.4 自定义水位线生成器
- 周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过onEvent()
观察判断输入的事件,而在onPeriodicEmit()
里发出水位线。下面是一段自定义周期性生成水位线的代码:
/**
* 可以基于事件或者周期性的生成 watermark
*/
public class MyWatermarkStrategyGenerator<T> implements WatermarkGenerator<T> {
private long delayTs = 0L; // 延迟时间
private long maxTs = 0;
public MyWatermarkStrategyGenerator(long delayTs){
this.delayTs = delayTs;
this.maxTs = Long.MIN_VALUE + this.delayTs + 1;
}
/**
* 每条数据来都会调用一次,用来获取最大的事件时间,可以用来生成新的 watermark
* @param event
* @param eventTimestamp 提取到的数据的中的 事件时间
* @param output
*/
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTs = Math.max(maxTs, eventTimestamp);
System.out.println("调用onEvent方法,获取最大的时间戳= "+maxTs);
}
/**
* 周期性的调用,可以用来生成新的 watermark
* <p>调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。
*/
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 这里使用基于周期生成水位线
output.emitWatermark(new Watermark(maxTs - delayTs - 1L));
System.out.println("调用onPeriodicEmit方法,获取最大的时间戳= "+maxTs);
}
}
public class CustomWatermarkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
// 设置周期2000ms, 默认为200ms
env.getConfig().setAutoWatermarkInterval(2000);
DataStream<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777)
.map(value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
});
// 1.使用自定义 Watermark策略, 设置延迟时间为 3s
WatermarkStrategy customWatermarkStrategy =
WatermarkStrategy
.forGenerator(ctx -> new MyWatermarkStrategyGenerator<WaterSensor>(3000L))
// 从数据中提取得到时间戳, 以便后续使用
.withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000L));
// 2. 指定使用我们定义的 watermark策略
SingleOutputStreamOperator<WaterSensor> sensorDSwithWaterMark = sensorDS
.assignTimestampsAndWatermarks(customWatermarkStrategy);
sensorDSwithWaterMark.keyBy(waterSensor -> waterSensor.getId())
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "mm:ss.SSS");
long size = elements.spliterator().estimateSize();
out.collect("key=" + key + "窗口:(" + windowStart + ", " + windowEnd + ")收集到了" + size + "条数据==>" + elements);
}
}).print();
env.execute();
}
}
运行结果: 我们在
onPeriodicEmit()
里调用output.emitWatermark()
,就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次。
如果想修改默认周期时间,可以通过下面方法修改。例如:修改为400ms-env.getConfig().setAutoWatermarkInterval(400L)
;
2. 断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测onEvent()
中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在MyWatermarkStrategyGenerator.java的onEvent
方法当中即可。
public class MyWatermarkStrategyGenerator2<T> implements WatermarkGenerator<T> {
private long delayTs = 0L; // 延迟时间
private long maxTs = 0;
public MyWatermarkStrategyGenerator2(long delayTs){
this.delayTs = delayTs;
this.maxTs = Long.MIN_VALUE + this.delayTs + 1;
}
/**
* 每条数据来都会调用一次,用来获取最大的事件时间,可以用来生成新的 watermark
* @param event
* @param eventTimestamp 提取到的数据的中的 事件时间
* @param output
*/
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTs = Math.max(maxTs, eventTimestamp);
output.emitWatermark(new Watermark(maxTs - delayTs - 1L));
System.out.println("调用onEvent方法,获取最大的时间戳= "+maxTs + ",水位线Watermark:"+(maxTs-delayTs-1L));
}
/**
* 周期性的调用,可以用来生成新的watermark
* <p>调用此方法生成watermark的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。
*/
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 这里使用基于周期生成水位线
}
}
public class CustomWatermarkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
// 设置周期2000ms, 默认为200ms
env.getConfig().setAutoWatermarkInterval(2000);
DataStream<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777)
.map(value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
});
// 1.使用自定义 Watermark策略, 设置延迟时间为 3s
WatermarkStrategy customWatermarkStrategy =
WatermarkStrategy
.forGenerator(ctx -> new MyWatermarkStrategyGenerator2<WaterSensor>(3000L))
// 从数据中提取得到时间戳, 以便后续使用
.withTimestampAssigner(((element, recordTimestamp) -> element.getTs() * 1000L));
// 2. 指定使用我们定义的 watermark策略
SingleOutputStreamOperator<WaterSensor> sensorDSwithWaterMark = sensorDS
.assignTimestampsAndWatermarks(customWatermarkStrategy);
sensorDSwithWaterMark.keyBy(waterSensor -> waterSensor.getId())
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "mm:ss.SSS");
long size = elements.spliterator().estimateSize();
out.collect("key=" + key + "窗口:(" + windowStart + ", " + windowEnd + ")收集到了" + size + "条数据==>" + elements);
}
}).print();
env.execute();
}
}
运行结果: 3. 在数据源中发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks()
方法来生成水位线了。也就是在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks()
方法生成水位线二者只能取其一。
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)
6. 水位线的传递
6.1 多并行度的水位线处理
之前水位线的在流处理中,并行度为1,而实际往往是数据处理是多线程并发处理,真实情况是上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟(和管理学中的短板效应类似,短板效应说的是一个木桶由许多块木板组成,如果这些木板的长度不一,那么木桶的最大容量不是由最长的木板决定的,而是由最短的木板决定的)。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以"处理完之前所有数据"为标准来确定自己的时钟。 图中以某一个数据处理节点Task进行数据处理过程, 该Task具有4个分区说明:
- Task里的分区接收到处理2、4、3、6的数据,最小的数据是2,数据4、6、7还没到该Task节点,该Task将2作为水位线广播出去。
- 2处理完成数据4来后,分区当前处理的数据有4、4、3、6, 最小的数据是3,数据6、7还没到该Task节点,该Task将3作为水位线广播出去。
- 4处理完成数据7来后,分区当前处理的数据有4、7、3、6, 最小的数据还是3,数据6还没到该Task节点,该Task将3作为水位线广播出去。
- 3处理完成数据6来后,分区当前处理的数据有4、7、3、6, 最小的数据还是4,该Task将4作为水位线广播出去。
使用之前的FlinkWatermarkOutOfOrderDemo.java代码为例, 设置并行度为2:
public class FlinkWatermarkOutOfOrderDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// 设置并行度为2
env.setParallelism(2);
DataStream<WaterSensor> lines = env.socketTextStream("hadoop102", 7777)
.map(value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
});
// 1.定义Watermark策略
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
// 1.1 指定watermark生成: 乱序的watermark,等待时间 5s
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// 1.2 指定 时间戳分配器,从数据中提取得到时间戳
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
System.out.println("当前数据" + element + ", recordTimestamp: " + recordTimestamp);
// 简单起见,输入就直接是秒值,这里要求毫秒值,需要乘以1000
return element.getTs() * 1000;
}
});
SingleOutputStreamOperator<WaterSensor> watermarkStream =
// 2. 指定使用我们定义的 watermark策略
lines.assignTimestampsAndWatermarks(watermarkStrategy);
WindowedStream<WaterSensor, String, TimeWindow> window = watermarkStream.keyBy(value -> value.getId())
// 3.使用 事件时间语义 的窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)));
window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "mm:ss.SSS");
long size = elements.spliterator().estimateSize();
out.collect("key="+key+"窗口:("+windowStart + ", " + windowEnd+")收集到了"+size+"条数据==>"+elements);
}
}).print();
env.execute();
}
}
运行结果:
6.2 水位线空闲等待
在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待。
public class FlinkWatermarkWithIdlenessOutOfOrderDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// 设置并行度为2
env.setParallelism(2);
DataStream<WaterSensor> lines = env.socketTextStream("hadoop102", 7777)
.map(value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}).partitionCustom(new Partitioner<WaterSensor>(){
@Override
public int partition(WaterSensor key, int numPartitions) {
return key.getVc() % numPartitions;
}
}, r->r);
// 1.定义Watermark策略
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
// 1.1 指定watermark生成: 乱序的watermark,等待时间 5s
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// 1.2 指定 时间戳分配器,从数据中提取得到时间戳
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
System.out.println("当前数据" + element + ", recordTimestamp: " + recordTimestamp);
// 简单起见,输入就直接是秒值,这里要求毫秒值,需要乘以1000
return element.getTs() * 1000;
}
})
// .withIdleness(Duration.ofSeconds(5))
;
SingleOutputStreamOperator<WaterSensor> watermarkStream =
// 2. 指定使用我们定义的 watermark策略
lines.assignTimestampsAndWatermarks(watermarkStrategy);
WindowedStream<WaterSensor, String, TimeWindow> window = watermarkStream.keyBy(value -> value.getId())
// 3.使用 事件时间语义 的窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)));
window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "mm:ss.SSS");
long size = elements.spliterator().estimateSize();
out.collect("key="+key+"窗口:("+windowStart + ", " + windowEnd+")收集到了"+size+"条数据==>"+elements);
}
}).print();
env.execute();
}
}
运行结果,会发现即使WaterSensor对象中Ts的值为99也没有触发窗口计算: 原因在于当前flink分区一共有两个,自定义分区器里面取模的时候,由于WaterSensor对象中Ts的值一直输入的是奇数,导致数据集中在map的某一个分区中,而水位线进行传递到第二个算子process的时候,process算子要接收所有分区的水位线,而map算子没有数据的分区里面水位线一直都为Long.minValue,因此process算子接收这两个分区的水位线,按照最小水位线作为自己的水位线原则,导致process算子的水位线就一直没有变化递增,就会一直等待中。
解决办法就是设置空闲等待时间:
public class FlinkWatermarkWithIdlenessOutOfOrderDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// 设置并行度为2
env.setParallelism(2);
DataStream<WaterSensor> lines = env.socketTextStream("hadoop102", 7777)
.map(value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}).partitionCustom(new Partitioner<WaterSensor>(){
@Override
public int partition(WaterSensor key, int numPartitions) {
return key.getVc() % numPartitions;
}
}, r->r);
// 1.定义Watermark策略
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
// 1.1 指定watermark生成: 乱序的watermark,等待时间 5s
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// 1.2 指定 时间戳分配器,从数据中提取得到时间戳
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
System.out.println("当前数据" + element + ", recordTimestamp: " + recordTimestamp);
// 简单起见,输入就直接是秒值,这里要求毫秒值,需要乘以1000
return element.getTs() * 1000;
}
})
// 设置等待时间
.withIdleness(Duration.ofSeconds(5));
SingleOutputStreamOperator<WaterSensor> watermarkStream =
// 2. 指定使用我们定义的 watermark策略
lines.assignTimestampsAndWatermarks(watermarkStrategy);
WindowedStream<WaterSensor, String, TimeWindow> window = watermarkStream.keyBy(value -> value.getId())
// 3.使用 事件时间语义 的窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)));
window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "mm:ss.SSS");
long size = elements.spliterator().estimateSize();
out.collect("key="+key+"窗口:("+windowStart + ", " + windowEnd+")收集到了"+size+"条数据==>"+elements);
}
}).print();
env.execute();
}
}
7. 迟到数据的处理
7.1 推迟水位线推进
在水位线产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
7.2 设置窗口延迟关闭
Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark超过了窗口结束时间+推迟时间,此时窗会真正关闭。设置窗口延迟关闭时间:allowedLateness(Time.seconds(3))
。
public class FlinkWatermarkAllowedLatenessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// 设置并行度为1
env.setParallelism(1);
DataStream<WaterSensor> lines = env.socketTextStream("hadoop102", 7777)
.map(value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
});
// 1.定义Watermark策略
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
// 1.1 指定watermark生成: 乱序的watermark,等待时间 5s
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// 1.2 指定 时间戳分配器,从数据中提取得到时间戳
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
System.out.println("当前数据" + element + ", recordTimestamp: " + recordTimestamp);
// 简单起见,输入就直接是秒值,这里要求毫秒值,需要乘以1000
return element.getTs() * 1000;
}
});
SingleOutputStreamOperator<WaterSensor> watermarkStream =
// 2. 指定使用我们定义的 watermark策略
lines.assignTimestampsAndWatermarks(watermarkStrategy);
WindowedStream<WaterSensor, String, TimeWindow> window = watermarkStream.keyBy(value -> value.getId())
// 3.使用 事件时间语义 的窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 设置允许迟到时间为 2s
.allowedLateness(Time.seconds(2));
window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "mm:ss.SSS");
long size = elements.spliterator().estimateSize();
out.collect("key="+key+"窗口:("+windowStart + ", " + windowEnd+")收集到了"+size+"条数据==>"+elements);
}
}).print();
env.execute();
}
}
运行结果:
7.3 侧流接收迟到的数据
使用侧输出流来进行兜底,如果窗口已经关闭,来了一条迟到的数据,已经关闭的窗口不会接收该数据,侧输出流接收进行接收数据,后续可以将侧输出流的数据落库,通过监控数据库来了解数据遗漏情况,或者使用代码处理合并到统计结果里面。
public class FlinkWatermarkSideOutputDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// 设置并行度为1
env.setParallelism(1);
DataStream<WaterSensor> lines = env.socketTextStream("hadoop102", 7777)
.map(value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
});
// 1.定义Watermark策略
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
// 1.1 指定watermark生成: 乱序的watermark,等待时间 5s
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// 1.2 指定 时间戳分配器,从数据中提取得到时间戳
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
System.out.println("当前数据" + element + ", recordTimestamp: " + recordTimestamp);
// 简单起见,输入就直接是秒值,这里要求毫秒值,需要乘以1000
return element.getTs() * 1000;
}
});
SingleOutputStreamOperator<WaterSensor> watermarkStream =
// 2. 指定使用我们定义的 watermark策略
lines.assignTimestampsAndWatermarks(watermarkStrategy);
OutputTag<WaterSensor> outputTag = new OutputTag<WaterSensor>("late_data", Types.POJO(WaterSensor.class));
WindowedStream<WaterSensor, String, TimeWindow> window = watermarkStream.keyBy(value -> value.getId())
// 3.使用 事件时间语义 的窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 设置允许迟到时间为 2s
.allowedLateness(Time.seconds(2))
.sideOutputLateData(outputTag);
SingleOutputStreamOperator<String> process = window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "mm:ss.SSS");
long size = elements.spliterator().estimateSize();
out.collect("key=" + key + "窗口:(" + windowStart + ", " + windowEnd + ")收集到了" + size + "条数据==>" + elements);
}
});
process.getSideOutput(outputTag).printToErr();
process.print();
env.execute();
}
}
运行结果: