状态管理之托管状态
1. 按键分区状态(Keyed State)
按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以key为作用范围进行隔离。需要注意,使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。
按键分区状态主要包括:值状态(ValueState)、列表状态(ListState)、Map状态(MapState)、归约状态(ReducingState)、聚合状态(AggregatingState)
2. 值状态(ValueState)
顾名思义,状态中只保存一个"值"(value)。ValueState<T>
本身是一个接口,源码中定义如下:
public interface ValueState<T> extends State {
// 获取当前状态的值
T value() throws IOException;
//对状态进行更新,传入的参数value就是要覆写的状态值。
void update(T value) throws IOException;
}
在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个"状态描述器"(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState的状态描述器构造方法如下:
public ValueStateDescriptor(String name, Class<T> typeClass) {
super(name, typeClass, null);
}
案例需求:检测每种传感器的水位值,如果连续的两个水位值超过10,就输出报警。
public class KeyedValueStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777)
.map((MapFunction<String, WaterSensor>) value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
})
// 处理最多延迟3秒的乱序事件
.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((value, ts) -> value.getTs() * 1000L));
sensorDS.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
// 1.定义状态
ValueState<Integer> lastVcState; // int lastVcState=0; 这样定义普通变量写会有问题
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 2.在open方法中,通过运行时上下文来初始化状态
lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));
}
@Override
public void processElement(WaterSensor current, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
if (current.getVc() >= 10) {
int lastVc = lastVcState.value(); // 取出 本组 值状态 的数据
if (lastVc >= 10) {
out.collect("连续的两个水位值超过10:" + current.getVc() + " 上一次值:" + lastVc);
}
}
// 3. 更新状态里的水位值
lastVcState.update(current.getVc());
}
}).print();
env.execute();
}
}
运行效果: 如果直接定义普通变量来保存上一条vc的值,在key不同的时候会出现问题,出现即使不同的key也会进行比较,所以之前我们使用hashmap按照key来存数据。
3. 列表状态(ListState)
将需要保存的数据,以列表(List)的形式组织起来。在ListState<T>
接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操作状态:
Iterable<T> get()
:获取当前的列表状态,返回的是一个可迭代类型Iterable<T>
;update(List<T> values)
:传入一个列表values,直接对状态进行覆盖;add(T value)
:在状态列表中添加一个元素value;addAll(List<T> values)
:向列表中添加多个元素,以列表values形式传入。
类似地,ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。
案例需求:针对每种传感器输出最高的3个水位值
public class FlinkDemo {
public static void applyHandleData(Consumer<SingleOutputStreamOperator<WaterSensor>> consumer) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777)
.map((MapFunction<String, WaterSensor>) value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
})
// 处理最多延迟3秒的乱序事件
.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((value, ts) -> value.getTs() * 1000L));
consumer.accept(sensorDS);
env.execute();
}
}
public class KeyedListStateDemo {
public static void main(String[] args) throws Exception {
FlinkDemo.applyHandleData(sensorDS -> {
sensorDS.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
// 1.定义状态
ListState<Integer> last10VcState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
last10VcState = getRuntimeContext().getListState(
new ListStateDescriptor<Integer>("last10Vc", Types.INT));
}
@Override
public void processElement(WaterSensor current, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
last10VcState.add(current.getVc());
// 2.从list状态拿出来(Iterable), 拷贝到一个List中,排序, 只留3个最大的
Iterable<Integer> vcListIt = last10VcState.get();
ArrayList<Integer> list = new ArrayList<>();
// 使用for-each循环来遍历Iterable对象
for (Integer vc : vcListIt) {
list.add(vc);
}
list.sort((o1, o2) -> o2 - o1);
String collected = list.subList(0, Math.min(3, list.size())).stream().map(Object::toString).collect(Collectors.joining(","));
out.collect(collected);
out.collect("key="+ctx.getCurrentKey()+", 最大的三个vc值为:"+ collected);
// list只保留3个
if(list.size()>3){
list.remove(3);
}
// 3. 更新状态
last10VcState.update(list);
}
}).print();
});
}
}
这里将重复性的代码抽取出来,具体数据处理Consumer函数接口里面实现。运行效果:
4. Map状态(MapState)
把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。对应的MapState<UK, UV>
接口中,就会有UK、UV两个泛型,分别表示保存的key和value的类型。同样,MapState提供了操作映射状态的方法,与Map的使用非常类似:
UV get(UK key)
:传入一个key作为参数,查询对应的value值;put(UK key, UV value)
:传入一个键值对,更新key对应的value值;putAll(Map<UK, UV> map)
:将传入的映射map中所有的键值对,全部添加到映射状态中;remove(UK key)
:将指定key对应的键值对删除;boolean contains(UK key)
:判断是否存在指定的key,返回一个boolean值。Iterable<Map.Entry<UK, UV>> entries()
:获取映射状态中所有的键值对;Iterable<UK> keys()
:获取映射状态中所有的键(key),返回一个可迭代Iterable类型;Iterable<UV> values()
:获取映射状态中所有的值(value),返回一个可迭代Iterable类型;boolean isEmpty()
:判断映射是否为空,返回一个boolean值。
案例需求:统计每种传感器每种水位值出现的次数。
public class KeyedMapStateDemo {
public static void main(String[] args) throws Exception {
FlinkDemo.applyHandleData(sensorDS -> {
sensorDS.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
// 1.定义状态
MapState<Integer, Integer> vcCountState;
// 2.在open方法中,初始化状态
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
vcCountState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountState", Types.INT, Types.INT));
}
@Override
public void processElement(WaterSensor current, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
// 3. 更新状态里的水位值
Integer currentVc = current.getVc();
Integer count = vcCountState.get(currentVc);
if (count == null) {
vcCountState.put(currentVc, 1);
} else {
vcCountState.put(currentVc, count + 1);
}
// 4. 取出状态里的数据
StringBuffer sb = new StringBuffer();
sb.append("=====================================\n");
for (Map.Entry<Integer, Integer> entry : vcCountState.entries()) {
sb.append("key="+ctx.getCurrentKey()+",vc=" + entry.getKey() + ",出现的次数=" + entry.getValue() + "\n");
}
out.collect(sb.toString());
}
}).print();
});
}
}
运行效果:
5. 归约状态(ReducingState)
对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。ReducingState<T>
这个接口调用的方法类似于ListState,只不过它保存的只是一个聚合值,所以调用add()
方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。
public ReducingStateDescriptor(
String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}
这里的描述器有三个参数,其中第二个参数就是定义了归约聚合逻辑的ReduceFunction,另外两个参数则是状态的名称和类型。
案例:计算每种传感器的水位和。
public class KeyedReducingStateDemo {
public static void main(String[] args) throws Exception {
FlinkDemo.applyHandleData(sensorDS -> {
sensorDS.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
private ReducingState<Integer> sumState;
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
sumState = getRuntimeContext().getReducingState(
new ReducingStateDescriptor<Integer>(
"sumState",
// 聚合逻辑就是累加
(o1, o2)->o1+o2,
Types.INT));
}
@Override
public void processElement(WaterSensor current, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
sumState.add(current.getVc());
out.collect("key="+ctx.getCurrentKey()+",sumVc="+sumState.get());
}
}).print();
});
}
}
运行效果:
6. 聚合状态(AggregatingState)
与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是,AggregatingState的输入输出不要求类型是完全一致的, 使用更加灵活。
案例需求:计算每种传感器的平均水位
public class KeyedAggregatingStateDemo {
public static void main(String[] args) throws Exception {
FlinkDemo.applyHandleData(sensorDS -> {
sensorDS.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
AggregatingState<WaterSensor, Double> aggregatingState;
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
// 构造AggregatingStateDescriptor对象
// 第一个参数:状态的名称
// 第二个参数:AggregateFunction
// 第三个参数:累加器的类型
aggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor(
"aggregatingStateAvg",
new AggregateFunction<WaterSensor, Tuple2<Integer, Integer>, Double>() {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return new Tuple2<>(0, 0);
}
@Override
public Tuple2<Integer, Integer> add(WaterSensor value, Tuple2<Integer, Integer> accumulator) {
return Tuple2.of(value.getVc()+accumulator.f0, accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {
// 计算平均值
if(accumulator.f1 != 0){
return accumulator.f0 * 1D / accumulator.f1;
}else{
return 0D;
}
}
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
// 合并, 如果没有用到会话窗口,可以不写,直接返回null即可
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
},
Types.TUPLE(Types.INT, Types.INT)));
}
@Override
public void processElement(WaterSensor current, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
// aggregatingState数据清空api
// aggregatingState.clear();
aggregatingState.add(current);
out.collect("key=" + ctx.getCurrentKey() + ",平均水位值=" + aggregatingState.get());
}
}).print();
});
}
}
运行结果:
7. 状态生存时间(TTL)
在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用clear()
方法去清除状态值,另外一种办法是交给Flink去管理,配置一个状态的"生存时间"(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它自动清除。
Flink底层处理流程是:默认会标记一个失效时间(失效时间 = 当前时间 + TTL),之后如果有对状态的访问和修改,再对失效时间进行更新;当设置的清除条件被触发时(比如被动访问或者周期性的扫描时候)就可以判断状态是否失效、从而进行清除了。
配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的enableTimeToLive()
方法启动TTL功能。
StateTtlConfig ttlConfig = StateTtlConfig
// 状态TTL配置的构造器方法
.newBuilder(Time.seconds(10))
// 设置更新类型。更新类型指定了什么时候更新状态失效时间。OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。
// OnReadAndWrite则表示无论读写操作都会更新失效时间,
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// 设置过期数据状态的可见性。因为清除数据并不是实时的,状态过期后可能数据还在,这时可以配置是否允许访问
// NeverReturnExpired是默认行为,表示从不返回过期值。
// ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
除此之外,TTL配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对RocksDB状态后端使用压缩过滤器(compaction filter)进行后台清理。这里需要注意,目前的TTL设置只支持处理时间。
public class KeyedValueTTLStateDemo {
public static void main(String[] args) throws Exception {
FlinkDemo.applyHandleData(sensorDS -> {
sensorDS.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
ValueState<Integer> valueState;
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
// 设置创建的5s后过期
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(5))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<Integer> valueStateTTL = new ValueStateDescriptor<Integer>("valueStateTTL", Types.INT);
valueStateTTL.enableTimeToLive(ttlConfig);
valueState = getRuntimeContext().getState(valueStateTTL);
}
@Override
public void processElement(WaterSensor current, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
out.collect("key=" + ctx.getCurrentKey() + ",vc值=" + valueState.value());
valueState.update(current.getVc());
}
}).print();
});
}
}
运行结果:
8. 算子状态(Operator State)
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State。
算子状态的实际应用场景不如Keyed State多,一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。比如Flink的Kafka连接器中,就用到了算子状态。当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。
算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。
9. 列表状态(ListState)
与Keyed State中的ListState一样,将状态表示为一组数据的列表,与Keyed State中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个"列表"(list)。算子状态中不会存在"键组"(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了"列表"(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。
案例实操:在map算子中计算数据的个数。
public class OperatorListStateDemo {
public static void main(String[] args) throws Exception {
// 在map算子中计算数据的个数
FlinkDemo.applyHandleData(sensorDS -> {
sensorDS.map(new MyMapFunction())
.print();
});
}
// 1.实现 CheckpointedFunction 接口
static class MyMapFunction implements MapFunction<WaterSensor, Integer>, CheckpointedFunction {
int count = 0;
private ListState<Integer> state;
@Override
public Integer map(WaterSensor value) throws Exception {
return count++;
}
// 2.本地变量持久化:将 本地变量 拷贝到 算子状态中,开启checkpoint时会被调用
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
System.out.println("snapshotState...");
// 2.1 清空算子状态
state.clear();
// 2.2 将 本地变量 添加到 算子状态 中
state.add(count);
}
// 3.初始化本地变量:程序启动和恢复时, 从状态中 把数据添加到 本地变量,每个子任务调用一次
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
System.out.println("initializeState...");
// 3.1 从 上下文 初始化 算子状态
state = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<Integer>("state", Types.INT));
// 3.2 从 算子状态中 把数据 拷贝到 本地变量
if (context.isRestored()) {
for (Integer c : state.get()) {
count += c;
}
}
}
}
}
运行结果:
10. 联合列表状态(UnionListState)
与ListState类似,联合列表状态也会将状态表示为一个列表。UnionListState的重点就在于"联合"(union)。在并行度发生变化时(比如分区从2个变成3个),ListState状态将所有状态汇聚到一起,然后轮询重新分配状态项到每个分区((比如状态值在2个分区[(1,2,3),(4,5,6)]->[(1,2,3,4,5,6)]->3个分区轮询[(1,4),(2,5),(3,6)])),而联合列表状态的算子则会直接广播一个完整列表状态([(1,2,3,4,5,6),(1,2,3,4,5,6),(1,2,3,4,5,6)])。这样的话,并行度改变之后的每个并行子任务就获取到了联合后完整的"大列表",可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作"联合重组"(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。使用如下:
state = context
.getOperatorStateStore()
.getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));
11. 广播状态(BroadcastState)
有时我们希望算子并行子任务都保持同一份"全局"状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被"广播"到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
案例实操:水位超过指定的阈值发送告警,阈值可以动态修改(也就是通过流直接修改而不是读取MySQL数据库)。
public class OperatorBroadcastStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777)
.map((MapFunction<String, WaterSensor>) value -> {
String[] split = value.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
})
// 处理最多延迟3秒的乱序事件
.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((value, ts) -> value.getTs() * 1000L));
SingleOutputStreamOperator<String> configBS = env.socketTextStream("hadoop102", 8888);
// 1. 定义一个广播状态,用来保存配置信息
MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcastState", String.class, Integer.class);
BroadcastStream<String> broadcastStream = configBS.broadcast(broadcastMapState);
// 2.把 数据流 和 广播后的配置流 connect 起来
BroadcastConnectedStream<WaterSensor, String> connectedCS = sensorDS.connect(broadcastStream);
// 3.调用 process
connectedCS.process(new BroadcastProcessFunction<WaterSensor, String, String>() {
@Override
public void processElement(WaterSensor value, BroadcastProcessFunction<WaterSensor, String, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
// 5.通过上下文获取广播状态,取出里面的值(只读,不能修改)
ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
Integer threshold = broadcastState.get("threshold");
// 判断广播状态里是否有数据,因为刚启动时,可能是数据流的第一条数据先来
threshold = (threshold == null ? 0 : threshold);
if (value.getVc() > threshold) {
out.collect(value + ",水位超过指定的阈值:" + threshold + "!!!");
}
}
@Override
public void processBroadcastElement(String value, BroadcastProcessFunction<WaterSensor, String, String>.Context ctx, Collector<String> out) throws Exception {
// 4. 通过上下文获取广播状态,往里面写数据
BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
broadcastState.put("threshold", Integer.valueOf(value));
}
}).print();
env.execute();
}
}
运行结果: