Using the DataStream API
1. Add the dependencies
The Hadoop cluster runs on JDK 8, so if Flink is deployed in YARN mode, the Flink project must also target JDK 1.8.
xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink-version>1.17.2</flink-version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink-version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink-version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink-version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink CDC dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.31</version>
    </dependency>
    <!-- Makes the web UI available when running inside IDEA as well -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink-version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.6</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.5.2</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>io.debezium:debezium-connector-mysql:1.9.8.Final</artifact>
                                <excludes>
                                    <!-- flink-connector-mysql-cdc-3.3.0.jar overrides MySqlConnection, so the copy in debezium-connector-mysql must not be used -->
                                    <exclude>io/debezium/connector/mysql/MySqlConnection.class</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <!-- The configuration below produces an executable flat JAR -->
                        <transformers>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Main class, if the project has an executable entry point -->
                                <mainClass>com.rocket.flink.FlinkCDCDataStreamTest</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2. Write the code
java
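package com.rocket.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCDataStreamTest {
    public static void main(String[] args) throws Exception {
        // 1. Set up the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Enable checkpointing. Flink CDC stores the binlog read position as state in the
        //    checkpoint, so resuming where the job left off requires starting it from a
        //    checkpoint or a savepoint.
        // 2.1 Take a checkpoint every 3 seconds with exactly-once semantics
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Set the checkpoint timeout to 1 minute
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // 2.3 Set the minimum pause between two consecutive checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // 2.4 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.5 Restart strategy: allow at most 3 failures per day, waiting 1 minute between restarts
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1L), Time.minutes(1L)
        ));
        // 2.6 Set the state backend and the checkpoint storage location
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/flinkCDC"
        );
        // 2.7 Set the user name used to access HDFS
        System.setProperty("HADOOP_USER_NAME", "jack");
        // 3. Build the MySQL source with Flink CDC
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hadoop104")
                .port(3307)
                .username("root")
                .password("root")
                .databaseList("test")
                .tableList("test.t_user")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        // 4. Read the change stream and print it
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source")
                .print();
        env.execute();
    }
}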
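
The checkpoint interval and the minimum pause in steps 2.1 and 2.3 interact, which is easy to miss. The sketch below is a simplified model, not Flink's actual scheduler: it assumes the next checkpoint starts no earlier than one interval after the previous start and no earlier than the minimum pause after the previous completion. The class and method names are hypothetical.

```java
public class CheckpointTimingSketch {

    /**
     * Earliest start time (ms) of the next checkpoint under the simplified model:
     * at least `interval` after the previous start and at least `minPause`
     * after the previous checkpoint completed.
     */
    static long nextTrigger(long prevStart, long prevEnd, long interval, long minPause) {
        return Math.max(prevStart + interval, prevEnd + minPause);
    }

    public static void main(String[] args) {
        long interval = 3000L; // value passed to enableCheckpointing
        long minPause = 3000L; // value passed to setMinPauseBetweenCheckpoints

        // Previous checkpoint started at 0 ms and finished after 200 ms.
        System.out.println(nextTrigger(0L, 200L, interval, minPause));
        // Previous checkpoint started at 0 ms and finished after 2500 ms.
        System.out.println(nextTrigger(0L, 2500L, interval, minPause));
    }
}
```

Under this model, with both values set to 3000 ms the minimum pause always decides the next start, so the effective spacing between checkpoints is the checkpoint duration plus 3 seconds.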
Run the Maven goals clean and package, upload the resulting JAR through the Flink web UI, then start the job and check the logs.
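
Each record printed by the job is a Debezium-style JSON change event whose `op` field tells you what kind of change it is. As a reading aid for the logs, here is a minimal sketch that maps the common `op` codes to plain descriptions. The class and method names are hypothetical, and only the four most common codes are covered.

```java
import java.util.Map;

public class DebeziumOpSketch {

    // Common Debezium operation codes found in the "op" field of a change event.
    private static final Map<String, String> OPS = Map.of(
            "r", "snapshot read",
            "c", "insert",
            "u", "update",
            "d", "delete");

    /** Returns a readable description for an op code, or "unknown" for anything else. */
    static String describe(String op) {
        return OPS.getOrDefault(op, "unknown");
    }

    public static void main(String[] args) {
        for (String op : new String[] {"r", "c", "u", "d"}) {
            System.out.println(op + " -> " + describe(op));
        }
    }
}
```

With `StartupOptions.initial()`, rows that already exist in `test.t_user` should show up first as snapshot reads (`r`), followed by `c`, `u`, and `d` events as the table changes.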