资源配置调优
Flink性能调优的第一步,就是为任务分配合适的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。
提交方式主要是yarn-per-job,资源的分配在使用脚本提交Flink任务时进行指定。 从1.11开始,增加了通用客户端模式,参数使用-D <property=value>
指定:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn队列
-Djobmanager.memory.process.size=1024mb \ 指定JM的总进程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每个TM的总进程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每个TM的slot数
-c com.jack.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
1. 内存设置
1.1 TaskManager内存模型
- JVM特定内存:JVM本身使用的内存,包含JVM的metaspace和over-head:
- JVM metaspace(JVM元空间):
taskmanager.memory.jvm-metaspace.size
,默认256mb - JVM over-head执行开销:JVM执行时自身所需要的内容,包括线程堆栈、IO、编译缓存等所使用的内存。
taskmanager.memory.jvm-overhead.fraction
,默认0.1taskmanager.memory.jvm-overhead.min
,默认192mbtaskmanager.memory.jvm-overhead.max
,默认1gb
总进程内存*fraction,如果小于配置的min(或大于配置的max)大小,则使用min/max大小
- JVM metaspace(JVM元空间):
- 框架内存: Flink框架,即TaskManager本身所占用的内存,不计入Slot的资源中。
- 堆内:
taskmanager.memory.framework.heap.size
,默认128MB - 堆外:
taskmanager.memory.framework.off-heap.size
,默认128MB
- 堆内:
- Task内存:Task执行用户代码时所使用的内存
- 堆内:
taskmanager.memory.task.heap.size
,默认none,由Flink内存扣除掉其他部分的内存得到。
堆外:taskmanager.memory.task.off-heap.size
,默认0,表示不使用堆外内存。
- 堆内:
- 网络内存:网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区
- 堆外:
taskmanager.memory.network.fraction
,默认0.1taskmanager.memory.network.min
,默认64mbtaskmanager.memory.network.max
,默认1gb
Flink内存*fraction,如果小于配置的min(或大于配置的max)大小,则使用min/max大小
- 堆外:
- 托管内存:用于RocksDB State Backend的本地内存和批的排序、哈希表、缓存中间结果。
- 堆外:
taskmanager.memory.managed.fraction
,默认0.4taskmanager.memory.managed.size
,默认none
如果size没指定,则等于Flink内存*fraction
- 堆外:
1.2 案例分析
基于Yarn模式,一般参数指定的是总进程内存,比如指定为4G,每一块内存得到大小如下:
(1)计算JVM开销
JVM元空间256m
JVM执行开销:4g0.1=409.6m,在[192m,1g]之间,最终结果409.6m
Flink内存=4g-256m-409.6m=3430.4m
(2)网络内存=3430.4m0.1=343.04m,在[64m,1g]之间,最终结果 343.04m
(3)托管内存=3430.4m*0.4=1372.16m
(4)框架内存,堆内和堆外都是128m
(5)Task堆内内存=3430.4m-128m-128m-343.04m-1372.16m=1459.2m 所以进程内存给多大,每一部分内存需不需要调整,可以看内存的使用率来调整。
1.3 生产资源配置示例
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn队列
-Djobmanager.memory.process.size=2048mb \ JM2~4G足够
-Dtaskmanager.memory.process.size=4096mb \ 单个TM2~8G足够
-Dtaskmanager.numberOfTaskSlots=2 \ 与容器核数1core:1slot或2core:1slot
-c com.jack.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
Flink是实时流处理,关键在于资源情况能不能抗住高峰时期每秒的数据量,通常用QPS/TPS来描述数据情况。
2. 合理利用cpu资源
Yarn的容量调度器默认情况下是使用“DefaultResourceCalculator”分配策略,只根据内存调度资源,所以在Yarn的资源管理页面上看到每个容器的vcore个数还是1。
可以修改策略为DominantResourceCalculator,该资源计算器在计算资源的时候会综合考虑cpu和内存的情况。在capacity-scheduler.xml中修改属性:
<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
2.1 使用DefaultResourceCalculator策略
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.jack.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
可以看到一个容器只有一个vcore:
2.2 使用DominantResourceCalculator策略
修改后yarn配置后,分发配置并重启yarn,再次提交flink作业:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.jack.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
看到容器的vcore数变了: JobManager1个,占用1个容器,vcore=1
TaskManager3个,占用3个容器,每个容器vcore=2,总vcore=2*3=6,因为默认单个容器的vcore数=单TM的slot数
2.3 使用DominantResourceCalculator策略并指定容器vcore数
指定yarn容器的vcore数,提交:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Dyarn.containers.vcores=3 \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.jack.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
JobManager1个,占用1个容器,vcore=1
TaskManager3个,占用3个容器,每个容器vcore=3,总vcore=3*3=9
3. 并行度设置
3.1 全局并行度计算
开发完成后,先进行压测。任务并行度给10以下,测试单个并行度的处理上限。然后总QPS/单并行度的处理能力=并行度 开发完Flink作业,压测的方式很简单,先在kafka中积压数据,之后开启Flink任务,出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。 不能只从QPS去得出并行度,因为有些字段少、逻辑简单的任务,单并行度一秒处理几万条数据。而有些数据字段多,处理逻辑复杂,单并行度一秒只能处理1000条数据。最好根据高峰期的QPS压测,并行度*1.2倍,富余一些资源。