
Integration with Flink DataStream

1. Environment Setup

1.1 Add Dependencies

xml
<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.17.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.36</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>   <!-- provided scope: used at compile time only, not packaged into the job jar -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- enables the Flink web UI when running from the IDE -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.6</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <!-- the iceberg-flink-runtime artifact must match the Flink minor version used above (1.17) -->
        <artifactId>iceberg-flink-runtime-1.17</artifactId>
        <version>1.6.1</version>
    </dependency>
</dependencies>

1.2 Configure Logging

Create a log4j.properties file under the resources directory.

ini
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

2. Reading Data

2.1 Regular Source API

  1. Batch mode
java
public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "jack");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(new Configuration());
    env.setParallelism(1);
    // obtain the TableLoader
    String path = "hdfs://hadoop102:8020/warehouse/flink-iceberg/default_database/ice_sample3";
    // fromHadoopTable can only load Iceberg tables in a Hadoop catalog; Hive-catalog tables cannot be loaded this way
    TableLoader tableLoader = TableLoader
            .fromHadoopTable(path);
    // load the table through FlinkSource
    DataStream<RowData> rowDataDS = FlinkSource
            .forRowData()
            .env(env)
            .tableLoader(tableLoader)
            // streaming(false) = batch mode, streaming(true) = streaming mode
            .streaming(false)
            .build();
    rowDataDS.map(rowData-> Tuple2.of(rowData.getInt(0), rowData.getString(1).toString()))
                    .returns(Types.TUPLE(Types.INT, Types.STRING))
                    .print();
    env.execute();
}

Run result: the mapped (id, name) tuples are printed to the console.

Reading an Iceberg table in a Hive catalog is a bit more involved:

java
public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "jack");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
    env.setParallelism(1);
    // obtain the TableLoader
    String hiveMetastoreUri = "thrift://hadoop103:9083";
    HashMap<String, String> properties = new HashMap<>();
    properties.put("uri", hiveMetastoreUri);
    properties.put("warehouse", "hdfs://hadoop102:8020/flink/warehouse/");
    org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
    // obtain the Hive CatalogLoader
    CatalogLoader hiveCatalogLoader = CatalogLoader.hive("hive_catalog", hadoopConf, properties);
    // database name + table name
    TableIdentifier tableIdentifier = TableIdentifier.of("flink_db", "t_t1");
    TableLoader tableLoader = TableLoader.fromCatalog(hiveCatalogLoader, tableIdentifier);
    tableLoader.open();
    // load the table through FlinkSource
    DataStream<RowData> rowDataDS = FlinkSource
            .forRowData()
            .env(env)
            .tableLoader(tableLoader)
            // streaming(false) = batch mode, streaming(true) = streaming mode
            .streaming(false)
            .build();
    rowDataDS.print();
    env.execute();
}

Execution result: the table rows (as RowData) are printed to the console.

  2. Streaming mode

java
public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "jack");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(new Configuration());
    env.setParallelism(1);
    // obtain the TableLoader
    String path = "hdfs://hadoop102:8020/warehouse/flink-iceberg/default_database/ice_sample3";
    // fromHadoopTable can only load Iceberg tables in a Hadoop catalog; Hive-catalog tables cannot be loaded this way
    TableLoader tableLoader = TableLoader
            .fromHadoopTable(path);
    // load the table through FlinkSource
    DataStream<RowData> rowDataDS = FlinkSource
            .forRowData()
            .env(env)
            .tableLoader(tableLoader)
            // streaming(false) = batch mode, streaming(true) = streaming mode
            .streaming(true)
            .build();
    rowDataDS.map(rowData-> Tuple2.of(rowData.getInt(0), rowData.getString(1).toString()))
                    .returns(Types.TUPLE(Types.INT, Types.STRING))
                    .print();
    env.execute();
}
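
Note that the fromHadoopTable shortcut used above only works for path-based Hadoop tables; TableLoader.fromCatalog works for both Hive and Hadoop catalogs. A minimal sketch using CatalogLoader.hadoop, reusing the warehouse path, database, and table names from the examples above:

java
// Sketch: build a TableLoader through a Hadoop catalog instead of fromHadoopTable.
// Warehouse path, database, and table names are the ones used in the examples above.
HashMap<String, String> properties = new HashMap<>();
properties.put("warehouse", "hdfs://hadoop102:8020/warehouse/flink-iceberg");
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
CatalogLoader hadoopCatalogLoader = CatalogLoader.hadoop("hadoop_catalog", hadoopConf, properties);
TableLoader tableLoader = TableLoader.fromCatalog(hadoopCatalogLoader,
        TableIdentifier.of("default_database", "ice_sample3"));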

2.2 FLIP-27 Source API

  1. Batch mode
java
public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "jack");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(new Configuration());
    env.setParallelism(1);
    // obtain the TableLoader
    String path = "hdfs://hadoop102:8020/warehouse/flink-iceberg/default_database/ice_sample3";
    TableLoader tableLoader = TableLoader.fromHadoopTable(path);
    // the IcebergSource API is supported in Flink 1.12 and later
    IcebergSource<RowData> icebergSource = IcebergSource
            .forRowData()
            .tableLoader(tableLoader)
            .assignerFactory(new SimpleSplitAssignerFactory())
            .build();

    DataStreamSource<RowData> icebergDS = env.fromSource(icebergSource,
            WatermarkStrategy.noWatermarks(),
            "iceberg source",
            TypeInformation.of(RowData.class));
    icebergDS.map(rowData -> Tuple2.of(rowData.getInt(0), rowData.getString(1).toString()))
            .returns(Types.TUPLE(Types.INT, Types.STRING))
            .print();
    env.execute();
}

If the job fails with an error about restricted package access (the Java module system on JDK 17), add the following JVM options, for example in the IDE run configuration's VM options:

ini
--add-opens java.base/java.lang.invoke=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED

Execution result: the mapped (id, name) tuples are printed to the console.

  2. Streaming mode

java
public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "jack");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(new Configuration());
    env.setParallelism(1);
    // obtain the TableLoader
    String path = "hdfs://hadoop102:8020/warehouse/flink-iceberg/default_database/ice_sample3";
    TableLoader tableLoader = TableLoader.fromHadoopTable(path);
    // the IcebergSource API is supported in Flink 1.12 and later
    IcebergSource<RowData> icebergSource = IcebergSource
            .forRowData()
            .tableLoader(tableLoader)
            .assignerFactory(new SimpleSplitAssignerFactory())
            // streaming defaults to false
            .streaming(true)
            .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
            .monitorInterval(Duration.ofSeconds(60))
            .build();

    DataStreamSource<RowData> icebergDS = env.fromSource(icebergSource,
            WatermarkStrategy.noWatermarks(),
            "iceberg source",
            TypeInformation.of(RowData.class));
    icebergDS.map(rowData -> Tuple2.of(rowData.getInt(0), rowData.getString(1).toString()))
            .returns(Types.TUPLE(Types.INT, Types.STRING))
            .print();
    env.execute();
}
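
Besides INCREMENTAL_FROM_LATEST_SNAPSHOT, StreamingStartingStrategy also defines TABLE_SCAN_THEN_INCREMENTAL, INCREMENTAL_FROM_EARLIEST_SNAPSHOT, INCREMENTAL_FROM_SNAPSHOT_ID, and INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP. A minimal sketch of starting the incremental read from a specific snapshot; the snapshot id 123456789L is a placeholder that would come from the table's history:

java
// Sketch: start the incremental streaming read from a known snapshot id.
// 123456789L is a placeholder; use a real id from the table's snapshot history.
IcebergSource<RowData> fromSnapshotSource = IcebergSource
        .forRowData()
        .tableLoader(tableLoader)
        .assignerFactory(new SimpleSplitAssignerFactory())
        .streaming(true)
        .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
        .startSnapshotId(123456789L)
        .monitorInterval(Duration.ofSeconds(60))
        .build();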

3. Writing Data

Currently, data streams of type DataStream<RowData> and DataStream<Row> can be written to Iceberg tables.

3.1 Write modes: append, overwrite, and upsert

java
public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "jack");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(new Configuration());
    env.setParallelism(1);
    // obtain the TableLoader
    String path = "hdfs://hadoop102:8020/warehouse/flink-iceberg/default_database/ice_sample3";
    TableLoader tableLoader = TableLoader.fromHadoopTable(path);

    SingleOutputStreamOperator<RowData> input = env.fromElements("")
            .map((MapFunction<String, RowData>) s -> {
                GenericRowData genericRowData = new GenericRowData(2);
                genericRowData.setField(0, 2);
                genericRowData.setField(1, new BinaryStringData("Anna"));
                return genericRowData;
            });
    FlinkSink.forRowData(input)
            .tableLoader(tableLoader)
            .append();            // append mode
            //.overwrite(true);   // overwrite mode
            //.upsert(true);       // upsert mode (requires the table to define identifier/equality fields)
    env.execute("insert data into iceberg");
}
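
As noted above, a DataStream<Row> can also be written. A minimal sketch, assuming the same ice_sample3 table with columns (id INT, name STRING); FlinkSink.forRow additionally takes the table schema:

java
// Sketch: write a DataStream<Row> instead of DataStream<RowData>.
// Assumes the ice_sample3 table from above with columns (id INT, name STRING).
TableSchema schema = TableSchema.builder()
        .field("id", DataTypes.INT())
        .field("name", DataTypes.STRING())
        .build();
DataStream<Row> rows = env
        .fromElements(Row.of(3, "Anna"))
        .returns(Types.ROW(Types.INT, Types.STRING));
FlinkSink.forRow(rows, schema)
        .tableLoader(tableLoader)
        .append();
env.execute();

For unbounded streaming writes, also enable checkpointing (env.enableCheckpointing(...)), because the Iceberg sink commits data files when a checkpoint completes.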

Query the table to verify that the data was inserted.

4. Compacting Small Files

Iceberg does not currently support inspecting tables from Flink SQL, so you need to use Iceberg's Java API to read the metadata and obtain table information. To rewrite small files into larger ones:

java
public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "jack");
    String path = "hdfs://hadoop102:8020/warehouse/flink-iceberg";
    // 1. Obtain the Table object
    // 1.1 Create the catalog object
    org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
    HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, path);

    // 1.2 Load the Table object through the catalog
    Table table = hadoopCatalog.loadTable(TableIdentifier.of("default_database", "ice_sample3"));

    // with a Table object you can read metadata and perform table-maintenance operations
    System.out.println(table.history());
//        System.out.println(table.expireSnapshots().expireOlderThan());
    // 2. Compact data files through Actions
    Actions.forTable(table)
            .rewriteDataFiles()
            .targetSizeInBytes(1024L)    // target file size (1 KB here, just for demonstration)
            .execute();
}

Note that this code only calls Iceberg-related APIs and does not use the Flink API directly. Once you have the Table object, you can read metadata and perform table-maintenance operations.
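
Other maintenance operations follow the same pattern on the same Table object. A minimal sketch of expiring old snapshots (the 7-day retention below is just an example value):

java
// Sketch: expire snapshots older than 7 days on the Table obtained above.
// The retention period is an example value, not a recommendation.
long olderThanMillis = System.currentTimeMillis() - 7L * 24 * 60 * 60 * 1000;
table.expireSnapshots()
        .expireOlderThan(olderThanMillis)
        .commit();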