合流
在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。
1. 联合(Union)
最简单的合流操作,就是直接将多条流合在一起,叫作流的"联合"(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。 在代码中,我们只要基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合了;得到的依然是一个DataStream:
stream1.union(stream2, stream3, ...)
注意:union()
的参数可以是多个DataStream,所以联合操作可以实现多条流的合并。
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> s1 = env.fromElements(1, 2, 3, 4);
DataStreamSource<Integer> s2 = env.fromElements(11, 22, 33, 44);
DataStreamSource<String> s3 = env.fromElements("111", "222", "333", "444");
// 1. 流的数据类型必须一致 2. 一次可以合并多条流
// 第一种写法
// DataStream<Integer> unionS1 = s1.union(s2).union(s3);
// 第二种写法
DataStream<Integer> unionS2 = s1.union(s2, s3.map(it->Integer.parseInt(it)));
unionS2.print();
env.execute();
}
2. 连接(Connect)
流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect)。
2.1 连接流(ConnectedStreams)
为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的并不是DataStream,而是一个"连接流"。连接流可以看成是两条流形式上的"统一",被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个"同处理"(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。
所以整体上来,两条流的连接就像是"一国两制",两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中。 需要分为两步:首先基于一条DataStream调用.connect()方法,传入另外一条DataStream作为参数,将两条流连接起来,得到一个ConnectedStreams;然后再调用同处理方法得到DataStream。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法。
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> s1 = env.fromElements(1, 2, 3, 4);
DataStreamSource<String> s2 = env.fromElements("a", "b", "c", "d");
// 1. 一次只能连接一个流
// 2. 流的数据类型可以不一致
// 3. 连接后可以调用map/flatMap/process来处理, 但是各自处理各自的
ConnectedStreams<Integer, String> connectedStream = s1.connect(s2);
SingleOutputStreamOperator<String> mapS = connectedStream.map(new CoMapFunction<Integer, String, String>() {
@Override
public String map1(Integer value) throws Exception {
return value.toString();
}
@Override
public String map2(String value) throws Exception {
return value;
}
});
mapS.print();
env.execute();
}
上面的代码中,ConnectedStreams有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要"一国两制",因此调用.map()方法时传入的不再是一个简单的MapFunction,而是一个CoMapFunction,表示分别对两条流中的数据执行map操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1()就是对第一条流中数据的map操作,.map2()则是针对第二条流。
2.2 CoProcessFunction
与CoMapFunction类似,如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction。它也是"处理函数"家族中的一员,用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。
值得一提的是,ConnectedStreams也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:
connectedStreams.keyBy(keySelector1, keySelector2);
这里传入两个参数keySelector1和keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的keyBy用法完全一致。ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。
案例需求:连接两条流,输出能根据id匹配上的数据(类似inner join效果)
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// 设置并行度为 1
env.setParallelism(1);
DataStreamSource<Tuple3<String, Integer, Integer>> s1 = env.fromElements(
new Tuple3<String, Integer, Integer>("a",1, 1),
new Tuple3<String, Integer, Integer>("a",1, 2),
new Tuple3<String, Integer, Integer>("b",2, 2),
new Tuple3<String, Integer, Integer>("c",3, 3)
);
DataStreamSource<Tuple2<String, Integer>> s2 = env.fromElements(
new Tuple2<String, Integer>("a",11),
new Tuple2<String, Integer>("b",22),
new Tuple2<String, Integer>("c",33)
);
ConnectedStreams<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> connect = s1.connect(s2);
/**
* 实现互相匹配的效果: 两条流, 不一定谁的先来
* 1. 每条流, 有数据来, 存到一个变量中
* hashmap
* => key = id ,第一个字段值
* => value=List<数据>
* 2. 每条流有数据来的时候,除了存变量中,不知道对方是否有匹配的数据,要去另一条流存的变量中, 查找是否有匹配上的
*/
SingleOutputStreamOperator<String> process = connect.process(new CoProcessFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>, String>() {
HashMap<String, List<Tuple3<String, Integer, Integer>>> s1Cache = new HashMap<>();
HashMap<String, List<Tuple2<String, Integer>>> s2Cache = new HashMap<>();
/**
*
* @param value 第一条流数据
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement1(Tuple3<String, Integer, Integer> value, CoProcessFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
// s1数据来了,就存到变量中
if (s1Cache.get(value.f0) == null) {
// 如果key不存在,说明该key的第一条数据,初始化,put进map中
ArrayList<Tuple3<String, Integer, Integer>> list = new ArrayList<>();
list.add(value);
s1Cache.put(value.f0, list);
} else {
// key存在,不是该key的第一条数据, 直接添加到value的list中
s1Cache.get(value.f0).add(value);
}
// 去s2Cache中查找是否有id能匹配上的, 匹配上就输出, 没有就不输出
if (s2Cache.get(value.f0) != null) {
for (Tuple2<String, Integer> tuple2 : s2Cache.get(value.f0)) {
out.collect("s1: " + value + "<======>" + "s2: " + tuple2);
}
}
}
/**
*
* @param value 第二条流数据
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement2(Tuple2<String, Integer> value, CoProcessFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
// s2数据来了,就存到变量中
if (s2Cache.get(value.f0) == null) {
// 如果key不存在,说明该key的第一条数据,初始化,put进map中
ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
list.add(value);
s2Cache.put(value.f0, list);
} else {
// key存在,不是该key的第一条数据, 直接添加到value的list中
s2Cache.get(value.f0).add(value);
}
// 去s1Cache中查找是否有id能匹配上的, 匹配上就输出, 没有就不输出
if (s1Cache.get(value.f0) != null) {
for (Tuple3<String, Integer, Integer> tuple3 : s1Cache.get(value.f0)) {
out.collect("s1: " + tuple3 + "<======>" + "s2: " + value);
}
}
}
});
process.print();
env.execute();
}
运行结果: 如果修改并行度为2:
这是因为在执行process算子之前没有keyBy分区,容易发生相关的key默认随机进入不同的分区,使得CoProcessFunction里面的cache判断不再准确。解决办法是指定按照keyBy分区:
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// 设置并行度为 1
env.setParallelism(2);
DataStreamSource<Tuple3<String, Integer, Integer>> s1 = env.fromElements(
new Tuple3<String, Integer, Integer>("a",1, 1),
new Tuple3<String, Integer, Integer>("a",1, 2),
new Tuple3<String, Integer, Integer>("b",2, 2),
new Tuple3<String, Integer, Integer>("c",3, 3)
);
DataStreamSource<Tuple2<String, Integer>> s2 = env.fromElements(
new Tuple2<String, Integer>("a",11),
new Tuple2<String, Integer>("b",22),
new Tuple2<String, Integer>("c",33)
);
ConnectedStreams<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> connect = s1.connect(s2);
// 多并行度情况下,需要根据关联条件进行keyBy, 才能保证key相同的数据进入同一个分区
ConnectedStreams<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> keyedBy = connect.keyBy(s1Element -> s1Element.f0, s2Element -> s2Element.f0);
/**
* 实现互相匹配的效果: 两条流, 不一定谁的先来
* 1. 每条流, 有数据来, 存到一个变量中
* hashmap
* => key = id ,第一个字段值
* => value=List<数据>
* 2. 每条流有数据来的时候,除了存变量中,不知道对方是否有匹配的数据,要去另一条流存的变量中, 查找是否有匹配上的
*/
SingleOutputStreamOperator<String> process = keyedBy.process(new CoProcessFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>, String>() {
HashMap<String, List<Tuple3<String, Integer, Integer>>> s1Cache = new HashMap<>();
HashMap<String, List<Tuple2<String, Integer>>> s2Cache = new HashMap<>();
/**
*
* @param value 第一条流数据
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement1(Tuple3<String, Integer, Integer> value, CoProcessFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
// s1数据来了,就存到变量中
if (s1Cache.get(value.f0) == null) {
// 如果key不存在,说明该key的第一条数据,初始化,put进map中
ArrayList<Tuple3<String, Integer, Integer>> list = new ArrayList<>();
list.add(value);
s1Cache.put(value.f0, list);
} else {
// key存在,不是该key的第一条数据, 直接添加到value的list中
s1Cache.get(value.f0).add(value);
}
// 去s2Cache中查找是否有id能匹配上的, 匹配上就输出, 没有就不输出
if (s2Cache.get(value.f0) != null) {
for (Tuple2<String, Integer> tuple2 : s2Cache.get(value.f0)) {
out.collect("s1: " + value + "<======>" + "s2: " + tuple2);
}
}
}
/**
*
* @param value 第二条流数据
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement2(Tuple2<String, Integer> value, CoProcessFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
// s2数据来了,就存到变量中
if (s2Cache.get(value.f0) == null) {
// 如果key不存在,说明该key的第一条数据,初始化,put进map中
ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
list.add(value);
s2Cache.put(value.f0, list);
} else {
// key存在,不是该key的第一条数据, 直接添加到value的list中
s2Cache.get(value.f0).add(value);
}
// 去s1Cache中查找是否有id能匹配上的, 匹配上就输出, 没有就不输出
if (s1Cache.get(value.f0) != null) {
for (Tuple3<String, Integer, Integer> tuple3 : s1Cache.get(value.f0)) {
out.collect("s1: " + tuple3 + "<======>" + "s2: " + value);
}
}
}
});
process.print();
env.execute();
}
执行结果就正常了,不再每次结果都变化且不对: