Flink集群部署
Flink是一个非常灵活的处理框架,它支持多种不同的部署场景,还可以和不同的资源管理平台方便地集成。
1. 集群角色
Flink提交作业和执行任务,需要几个关键组件:
1.1 客户端(Client)
代码由客户端获取并做转换,之后提交给JobManger
1.2 JobManager
Flink集群里的"管事人",对作业进行中央调度管理;而它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的TaskManager。
1.3 TaskManager
真正"干活的人",数据的处理操作都是它们来做的。
2. Flink集群搭建
Flink是一个非常灵活的处理框架,它支持多种不同的部署场景,还可以和不同的资源管理平台方便地集成。接下来讲述不同情形下的Flink部署。
2.1 集群规划
节点服务器 | hadoop102 | hadoop103 | hadoop04 |
---|---|---|---|
角色 | JobManager、TaskManager | TaskManager | TaskManager |
2.2 下载并解压安装包
- 访问flink官网,下载安装包flink-1.17.2-bin-scala_2.12.tgz,将该压缩包上传到hadoop102节点服务器的/opt/software路径上。
[jack@hadoop102 software]$ ll
总用量 490400
-rw-rw-r--. 1 jack jack 360277461 8月 3 22:22 flink-1.17.2-bin-scala_2.12.tgz
-rw-rw-r--. 1 jack jack 141887242 8月 3 21:04 jdk-8u391-linux-x64.tar.gz
- 在/opt/software路径上解压flink-1.17.2-bin-scala_2.12.tgz到/opt/module路径上。
[jack@hadoop102 software]$ tar -xvf flink-1.17.2-bin-scala_2.12.tgz -C ../module/
2.3 修改集群配置
- 进入conf路径,修改flink-conf.yaml文件,指定hadoop102节点服务器为JobManager。
[jack@hadoop102 software]$ cd /opt/module/flink-1.17.2/conf/
[jack@hadoop102 conf]$ vim flink-conf.yaml
## 修改如下内容:
# JobManager节点地址,bind-host是给集群中其他机器识别用的,0.0.0.0表示允许任何机器连接.
jobmanager.rpc.address: hadoop102
rest.address: hadoop102
rest.bind-address: 0.0.0.0
- 修改workers文件,指定hadoop102、hadoop103和hadoop104为TaskManager。
[jack@hadoop102 conf]$ vim workers
## 修改如下内容:
hadoop102
hadoop103
hadoop104
- 修改masters文件
[jack@hadoop102 conf]$ vim masters
修改如下内容:
hadoop102:8081
- 在flink-conf.yaml文件中可以对集群中的JobManager和TaskManager组件进行优化配置,主要配置项如下:
jobmanager.memory.process.size
:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。taskmanager.memory.process.size
:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。taskmanager.numberOfTaskSlots
:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。parallelism.default
:Flink任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
提示
除了taskmanager.memory.process.size
之外,还可以配置taskmanager.memory.flink.size
参数进行设置TaskManager的使用内存大小,区别是taskmanager.memory.flink.size
只包含flink任务处理的内存大小不包含JVM的开销,只需要配置其一即可,在flink部署为standlone模式下推荐使用taskmanager.memory.flink.size
,在部署为docker或者整合hadoop集群时推荐使用taskmanager.memory.process.size
。taskmanager.numberOfTaskSlots
和parallelism.default
区别在于:taskmanager.numberOfTaskSlots
设置taskmanager同时能跑多少任务数目,它是一个静态参数,设置为taskmanager任务总共资源有多少,parallelism.default
表示集群运行动态决定用多少任务、用不用任务资源的限制配置。
2.4 分发安装目录
- 配置修改完毕后,将Flink安装目录发给另外两个节点服务器。
[jack@hadoop102 module]$ xsync flink-1.17.2/
2.5 启动集群
- 在hadoop102节点服务器上执行start-cluster.sh启动Flink集群:
[jack@hadoop102 flink-1.17.2]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop103.
Starting taskexecutor daemon on host hadoop104.
- 查看进程情况:
[jack@hadoop102 bin]$ jpsall
============= hadoop102 ==========
114625 Jps
104212 TaskManagerRunner
103801 StandaloneSessionClusterEntrypoint
============= hadoop103 ==========
91425 TaskManagerRunner
101558 Jps
============= hadoop104 ==========
50896 TaskManagerRunner
60333 Jps
3. 访问Web UI
启动成功后可以访问http://hadoop102:8081, 进行flink集群和任务进行监控管理。 可以明显看到,当前集群的TaskManager数量为3;由于默认每个TaskManager的Slot数量为1,所以总Slot数和可用Slot数都为3。
4. 向集群提交作业
可以使用已经编写好的读取socket发送的单词并统计单词的个数程序,以该程序SocketStreamWordCount为例,演示如何将任务提交到集群中进行执行。
4.1 环境准备
在hadoop102中执行以下命令启动netcat
[jack@hadoop102 flink-1.17.2]$ nc -lk 7777
4.2 程序打包
- 在我们编写的Flink入门程序的pom.xml文件中添加打包插件的配置,具体如下:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
- 插件配置完毕后,可以配置flink依赖包生命周期改为provide,也就是打包的时候不加入最终的jar包里面,毕竟flink集群已经有了相关的flink的jar,可以避免jar过大:
如果本地运行会报错类找不到,可以修改IDEA运行参数即可:
- 使用IDEA的Maven工具执行package命令:
- 出现如下提示即表示打包成功。
打包完成后,在target目录下即可找到所需JAR包,JAR包会有两个,flink-demo-1.0-SNAPSHOT.jar和original-flink-demo-1.0-SNAPSHOT.jar,因为集群中已经具备任务运行所需的所有依赖,所以建议使用flink-demo-1.0-SNAPSHOT.jar。
4.3 在Web UI上提交作业
- 任务打包完成后,我们打开Flink的WEB UI页面,在右侧导航栏点击"Submit New Job",然后点击按钮"+ Add New",选择要上传运行的JAR包,如下图所示。
JAR包上传完成,如下图所示:
- 点击该JAR包,出现任务配置页面,进行相应配置。 主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,如下图所示,配置完成后,即可点击按钮"Submit",将任务提交到集群运行。
- 任务提交成功之后,可点击左侧导航栏的"Running Jobs"查看程序运行列表情况。
- 点击该程序,进入程序实时执行情况详情页面:
4.4 测试集群运行count程序
- 在socket端口中输入hello world
[jack@hadoop102 ~]$ nc -lk 7777
hello world
- 快速进入flink执行输出结果页面的方法:点击程序执行流程图的Std.Out:
- 点击TaskManagers菜单,查看具体干活的机器是分配的那一台:
- 可以看到是hadoop103机器在执行,点击查看日志,页面发生跳转后,点击Stdout:
- 可以看到实时计算结果已经出来了,右边刷新按钮可以查看最新单词的统计结果:
5. 命令行提交作业
除了通过WEB UI界面提交任务之外,也可以直接通过命令行来提交任务。
- 先把jar包直接上传到目录flink-1.17.2/job_jars下,没有该文件夹可以手动创建。
[jack@hadoop102 job_jars]$ ll
总用量 24
-rw-rw-r--. 1 jack jack 23108 8月 5 02:37 flink-demo-1.0-SNAPSHOT.jar
- 在命令行使用flink run命令提交作业,参数
-m
指定了提交到的JobManager,-c
指定了入口类。
[jack@hadoop102 flink-1.17.2]$ bin/flink run -m hadoop102:8081 -c com.rocket.flink.SocketStreamWordCount ./job_jars/flink-demo-1.0-SNAPSHOT.jar
Job has been submitted with JobID 0cf9a06026114a52077fa1b8f6d8b102
- 在浏览器中打开Web UI,http://hadoop102:8081 查看应用执行情况。用netcat输入数据,可以在TaskManager的标准输出(Stdout)看到对应的统计结果。