转换算子(Transformation)
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
1. 基本转换算子(map/filter/flatMap)
1.1 映射(map)
map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个"一一映射",消费一个元素就产出一个元素。 我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。
下面的代码用不同的方式,实现了提取WaterSensor中的id字段的功能。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1L, 1),
new WaterSensor("sensor_2", 2L, 2),
new WaterSensor("sensor_3", 3L, 3)
);
// 方式一:传入匿名类,实现MapFunction
stream.map(new MapFunction<WaterSensor, String>() {
@Override
public String map(WaterSensor e) throws Exception {
return e.id;
}
}).print();
// 方式二:传入MapFunction的实现类
stream.map(new UserMap()).print();
// 方式三: 使用lambda表达式
stream.map(e -> e.getId()).print();
env.execute();
}
public static class UserMap implements MapFunction<WaterSensor, String> {
@Override
public String map(WaterSensor e) throws Exception {
return e.id;
}
}
上面代码中,MapFunction实现类的泛型类型,与输入数据类型和输出数据的类型有关。在实现MapFunction接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还需要重写一个map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。
1.2 过滤(filter)
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。 进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。
下面的代码会将数据流中传感器id为sensor_1的数据过滤出来:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1L, 1),
new WaterSensor("sensor_1", 2L, 22),
new WaterSensor("sensor_2", 2L, 2),
new WaterSensor("sensor_3", 3L, 3)
);
// 方式一:传入匿名类实现FilterFunction
stream.filter(new FilterFunction<WaterSensor>() {
@Override
public boolean filter(WaterSensor e) throws Exception {
return e.id.equals("sensor_1");
}
}).print();
// 方式二:传入FilterFunction实现类
stream.filter(new WaterSensorFilter()).print();
// 方式三: 使用Lambda表达式
stream.filter(it -> it.getId().equals("sensor_1")).print();
env.execute();
}
public static class WaterSensorFilter implements FilterFunction<WaterSensor> {
@Override
public boolean filter(WaterSensor e) {
return e.getVc().equals("sensor_1");
}
}
1.3 扁平映射(flatMap)
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是"扁平化"(flatten)和"映射"(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。 同map一样,flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。
如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc。实现代码如下:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1L, 1),
new WaterSensor("sensor_1", 2L, 2),
new WaterSensor("sensor_2", 2L, 2),
new WaterSensor("sensor_3", 3L, 3)
);
// stream.flatMap(new MyFlatMap()).print();
// 推荐使用lambda表达式
stream.flatMap(((value, out) -> {
if (value.id.equals("sensor_1")) {
out.collect(String.valueOf(value.getVc()));
} else if (value.id.equals("sensor_2")) {
out.collect(String.valueOf(value.getTs()));
out.collect(String.valueOf(value.getVc()));
}
}));
env.execute();
}
public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> {
@Override
public void flatMap(WaterSensor value, Collector<String> out) {
if (value.id.equals("sensor_1")) {
out.collect(String.valueOf(value.getVc()));
} else if (value.id.equals("sensor_2")) {
out.collect(String.valueOf(value.getTs()));
out.collect(String.valueOf(value.getVc()));
}
}
}
2. 聚合算子(Aggregation)
计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的"聚合"(Aggregation),类似于MapReduce中的reduce操作。
2.1 按键分区(keyBy)
对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。
基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。 在内部,是通过计算key的哈希值(hash code),对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要重写hashCode()方法。
keyBy()方法需要传入一个参数,这个参数指定了一个或一组key。有很多不同的方法来指定key:比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取key的逻辑。KeySelector类的泛型有两个:除去当前流中的元素类型外,还需要指定key的类型。
我们可以以id作为key做一个分区操作,代码实现如下:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为2
env.setParallelism(2);
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1L, 1),
new WaterSensor("sensor_1", 2L, 2),
new WaterSensor("sensor_2", 2L, 2),
new WaterSensor("sensor_3", 3L, 3)
);
// 方式一:使用Lambda表达式
KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);
// 方式二:使用匿名类实现KeySelector
// KeyedStream<WaterSensor, String> keyedStream1 = stream.keyBy(new KeySelector<WaterSensor, String>() {
// @Override
// public String getKey(WaterSensor e) throws Exception {
// return e.id;
// }
// });
keyedStream.print();
env.execute();
}
需要注意的是,keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream。KeyedStream可以认为是"分区流"或者"键控流",内部使用它是对DataStream按照key的进行逻辑分区,KeyedStream也继承自DataStream,所以基于它的操作也都归属于DataStream API。但它跟之前的转换操作得到的SingleOutputStreamOperator不同,只是一个流的分区操作,并不是一个转换算子。比如KeyedStream没有setParallelism()方法, KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如sum,reduce)。
按照key分组要点
- 返回的是一个keyedStream, 键控流
- keyBy不是转换算子,只是对数据进行重分区,不能设置并行度
- 分组与分区的关系:
- keyBy是对数据分组,保证相同key的数据, 在同一个分区(子任务)
- 分区: 一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
2.1 简单聚合(sum/min/max/minBy/maxBy)
有了KeyedStream,我们就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:
sum()
:在输入流上,对指定的字段做叠加求和的操作。min()
:在输入流上,对指定的字段求最小值。max()
:在输入流上,对指定的字段求最大值。minBy()
:与min()
类似,在输入流上针对指定字段求最小值。不同的是,min()
只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。maxBy()
:与max()
类似,在输入流上针对指定字段求最大值。两者区别与min()
/minBy()
完全一致。
简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。
对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以f0、f1、f2、…来命名的。
如果数据流的类型是POJO类,那么就只能通过字段名称来指定,不能通过位置来指定了。
public class TransAggregation {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1L, 1),
new WaterSensor("sensor_1", 2L, 2),
new WaterSensor("sensor_2", 2L, 2),
new WaterSensor("sensor_3", 3L, 3)
);
// 简单聚合算子,需要在keyBy后调用, 分组内聚合
KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);
// 如果是pojo需要指定字段名称
// 如果是tuple则指定位置索引
keyedStream.max("vc");
// maxBy和max的区别:
// max只会取出指定字段的最大值,非比较字段仍然是第一次的值
// maxBy去比较字段的最大值,同时非比较字段取最大值这条数据的值
keyedStream.maxBy("vc");
env.execute();
}
}
简单聚合算子返回的,同样是一个SingleOutputStreamOperator,也就是从KeyedStream又转换成了常规的DataStream。所以可以这样理解:keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不变。
一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作"状态"(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个key的数据流上。
2.2 归约聚合(reduce)
reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。调用KeyedStream的reduce方法时,需要传入一个参数,实现ReduceFunction接口。接口在源码中的定义如下:
public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}
ReduceFunction接口里需要实现reduce()方法,这个方法接收两个相同类型的数据,经过转换处理之后输出一个相同类型的数据。在流处理的底层实现过程中,实际上是将中间"合并的结果"作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
我们可以单独定义一个函数类实现ReduceFunction接口,也可以直接传入一个匿名类。当然,同样也可以通过传入Lambda表达式实现类似的功能。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1L, 1),
new WaterSensor("sensor_1", 2L, 2),
new WaterSensor("sensor_2", 2L, 2),
new WaterSensor("sensor_3", 3L, 3)
);
// 简单聚合算子,需要在keyBy后调用
// key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出
KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);
keyedStream.reduce(((value1, value2) -> {
System.out.println("value1 = " + value1 + ", value2 = " + value2);
return new WaterSensor(value1.getId(), value1.getTs()+value2.getTs(), value2.getVc()+value1.getVc());
})).print();
env.execute();
}