用户自定义函数(UDF)
用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。用户自定义函数分为:函数类、匿名函数、富函数类。
1. 函数类(Function Classes)
Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
比如需求:从用户的点击数据中筛选包含"sensor_1"的内容,之前也用过函数类:
java
public class TransFilterFunctionUDF implements FilterFunction<WaterSensor> {
private String id;
public TransFilterFunctionUDF(String id){
this.id = id;
}
@Override
public boolean filter(WaterSensor ws) throws Exception {
return this.id.equals(ws.getId());
}
}
java
public class TransFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<WaterSensor> streamSource = env.fromElements(
new WaterSensor("sensor_1", 1L, 1),
new WaterSensor("sensor_1", 2L, 2),
new WaterSensor("sensor_2", 2L, 2),
new WaterSensor("sensor_3", 3L, 3)
);
streamSource.filter(new TransFilterFunctionUDF("sensor_1")).print();
env.execute();
}
}
2. 富函数类(Rich Function Classes)
"富函数类"也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()
方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()
或者filter()
方法被调用之前,open()
会首先被调用。close()
方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。 需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map()
,在每条数据到来后都会触发一次调用。
java
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4);
SingleOutputStreamOperator<Integer> mapOperator = streamSource.map(new RichMapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return value + 1;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext context = getRuntimeContext();
System.out.println("子任务编号: " + context.getIndexOfThisSubtask()
+ ",子任务名称: " + context.getTaskNameWithSubtasks() + "调用open()");
}
@Override
public void close() throws Exception {
super.close();
RuntimeContext context = getRuntimeContext();
System.out.println("子任务编号: " + context.getIndexOfThisSubtask()
+ ",子任务名称: " + context.getTaskNameWithSubtasks() + "调用close()");
}
});
mapOperator.print();
env.execute();
}
执行结果: