代码中使用FlinkSQL
1. 需要引入的依赖
在代码中使用Table API,必须引入相关的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
flink-table-api-java-bridge主要就是负责Table API和下层DataStream API的连接支持,按照不同的语言分为Java版和Scala版。如果我们希望在本地的集成开发环境(IDE)里运行Table API和SQL,还需要引入以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
2. 创建表环境
使用Table API和SQL需要一个特别的运行时环境,这就是所谓的"表环境"(TableEnvironment)。它主要负责:
(1)注册Catalog和表;
(2)执行 SQL 查询;
(3)注册用户自定义函数(UDF);
(4)DataStream 和表之间的转换。
每个表和SQL的执行,都必须绑定在一个表环境(TableEnvironment)中。
// 方式一
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode() // 使用流处理模式
.build();
TableEnvironment tableEnv = TableEnvironment.create(setting);
// 方式二
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
3. 创建表
创建表的方式,有通过连接器(connector)和虚拟表(virtual tables)两种。
3.1 连接器表(Connector Tables)
通过连接器(connector)连接到一个外部系统,然后定义出对应的表结构。
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");
3.2 虚拟表(Virtual Tables)
在环境中注册之后,我们就可以在SQL中直接使用这张表进行查询转换了。
tableEnv.createTemporaryView("MyTable", MyTable);
Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
我们发现,这里的注册其实是创建了一个"虚拟表"(Virtual Table)。这个概念与SQL语法中的视图(View)非常类似,所以调用的方法也叫作创建"虚拟视图"(createTemporaryView)。
4. 表的查询
对一个表的查询(Query)操作,就对应着流数据的转换(Transform)处理。Flink为我们提供了两种查询方式:SQL,和Table API。
4.1 执行SQL进行查询
调用表环境的sqlQuery()方法,传入一个字符串形式的SQL查询语句就可以了。执行得到的结果,是一个Table对象。
// 创建表环境
TableEnvironment tableEnv = ...;
// 创建表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
// 查询用户Alice的点击事件,并提取表中前两个字段
Table aliceVisitTable = tableEnv.sqlQuery(
"SELECT user, url " +
"FROM EventTable " +
"WHERE user = 'Alice' "
);
4.2 调用Table API进行查询
Table API是嵌入编程语言中的DSL,SQL中的很多特性和功能必须要有对应的实现才可以使用,因此跟直接写SQL比起来肯定就要麻烦一些。"$"符号用来指定表中的一个字段
Table maryClickTable = eventTable
.where($("user").isEqual("Alice"))
.select($("url"), $("user"));
5. 输出表
表的创建和查询,就对应着流处理中的读取数据源(Source)和转换(Transform);而最后一个步骤Sink,也就是将结果数据输出到外部系统,就对应着表的输出操作。
// 注册表,用于输出数据到外部系统
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
// 经过查询转换,得到结果表
Table result = ...
// 将结果表写入已注册的输出表中
result.executeInsert("OutputTable");
在底层,表的输出是通过将数据写入到TableSink来实现的。TableSink是Table API中提供的一个向外部系统写入数据的通用接口,可以支持不同的文件格式(比如CSV、Parquet)、存储数据库(比如JDBC、Elasticsearch)和消息队列(比如Kafka)。
6. 表和流的转换
6.1 将流(DataStream)转换成表(Table)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 读取数据源
SingleOutputStreamOperator<WaterSensor> sensorDS = env.fromSource(...)
// 将数据流转换成表
Table sensorTable = tableEnv.fromDataStream(sensorDS);
// 希望直接在SQL中引用这张表
tableEnv.createTemporaryView("sensorTable",sensorDS, $("id"),$("ts"),$("vc"));
另外,我们还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:
// 提取Event中的timestamp和url作为表中的列
Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id"), $("vc"));
6.2 将表(Table)转换成流(DataStream)
将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。
tableEnv.toDataStream(table).print();
对于分组聚合统计的表,所以表中的每一行是会"更新"的。对于这样有更新操作的表,需要记录一下它的"更新日志"(change log)。
Table table = tableEnv.sqlQuery(
"SELECT id, sum(vc) " +
"FROM source " +
"GROUP BY id "
);
// 将表转换成更新日志流
tableEnv.toChangelogStream(table).print();