Flink快速上手
1. 创建项目
1.1 添加项目依赖
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<target.java.version>1.8</target.java.version>
<flink.version>1.17.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
2. WordCount代码编写
需求:统计一段文字中,每个单词出现的频次。
环境准备:新建一个包,命名为com.rocket.flink
2.1 数据准备
- 在工程根目录下新建一个input文件夹,并在下面创建文本文件words.txt
- 在words.txt中输入一些文字,例如:
Hello Flink
Scala Spark Flink Hadoop
Hello World
BigData Scala Flink
3. 批处理方式
批处理基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。
3.1 代码编写
在com.rocket.flink包下新建类BatchWordCount1,在静态main方法中编写代码
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 从文件中读取数据
Path filePath =new Path( );
DataSource<String> fileSource = env.readTextFile("input/data.txt");
fileSource.flatMap((String value, Collector<Tuple2<String, Integer>> out)->{
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word,1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT))
.groupBy(0).sum(1).print();
}
运行结果:
需要注意的是,这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH
来进行批处理:
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
4. 流处理方式
对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更加强大,可以直接处理批处理和流处理的所有场景。DataStream API是Flink的核心层API。一个Flink程序,代码基本上都由以下几部分构成:
4.1 代码实现
在com.rocket.flink包下新建类StreamWordCount2,在静态main方法中编写代码。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从文件中读取数据
Path filePath =new Path( "input/data.txt");
TextLineInputFormat inputFormat = new TextLineInputFormat();
FileSource<String> fileSource = FileSource.forRecordStreamFormat(inputFormat, filePath).build();
DataStreamSource<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source");
fileDS.flatMap((String value, Collector<Tuple2<String, Integer>> out)->{
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word,1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(value->value.f0).sum(1).print();
env.execute("wordCount");
}
运行结果:
提示
主要观察与批处理程序BatchWordCount1的不同:
- 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。
- 转换处理之后,得到的数据对象类型不同。
- 分组操作改为调用
keyBy()
方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。 - 代码末尾额外需要调用env的
execute()
方法,开始执行任务。
5. 读取socket文本流
在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听socket端口,然后向该端口不断的发送数据。
5.1 代码实现
在com.rocket.flink包下新建类SocketStreamWordCount,在静态main方法中编写代码:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从文件中读取数据
env.socketTextStream("hadoop102",7777)
.flatMap((String value, Collector<Tuple2<String, Integer>> out)->{
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word,1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(value->value.f0)
.sum(1)
.print();
env.execute("streamWordCount");
}
5.2 模拟流式环境
在Linux环境的主机hadoop102上,执行下列命令,发送数据进行测试, 其中nc是netcat工具的简称,一个网络工具,可以用来端口扫描、文件传输等功能。一般nc只用来做TCP/UDP协议的端口测试,其它功能少用!
## 安装netcat包
[jack@hadoop102 ~]$ sudo yum install -y nc
[jack@hadoop102 ~]$ nc -lk 7777
提示
要先启动7777端口nc服务程序,后启动StreamWordCount程序,否则会报超时连接异常。
5.3 启动StreamWordCount程序
我们会发现程序启动之后没有任何输出、也不会退出。这是正常的,因为Flink的流处理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。
5.4 从hadoop102发送数据
- 在hadoop102主机中,输入"hello flink",程序控制台输出如下内容:
13> (flink,1)
5> (hello,1)
- 再输入"hello world",程序控制台输出如下内容:
2> (world,1)
5> (hello,2)