DDL语句
1. 数据库
1.1 创建数据库
- 语法
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
[COMMENT database_comment]
WITH (key1=val1, key2=val2, ...)
1.2 查询数据库
-- 查询所有数据库
SHOW DATABASES;
-- 查询当前数据库
SHOW CURRENT DATABASE;
1.3 修改数据库
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)
1.4 删除数据库
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
- RESTRICT:删除非空数据库会触发异常。默认启用。
- CASCADE:删除非空数据库也会删除所有相关的表和函数。
1.5 切换当前数据库
USE database_name;
2. 创建表
2.1 语法
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] | AS select_query ]
2.2 physical_column_definition
物理列是数据库中所说的常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。其他类型的列可以在物理列之间声明,但不会影响最终的物理列的读取。
2.3 metadata_column_definition
元数据列是SQL标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由 METADATA关键字标识。例如我们可以使用元数据列从Kafka记录中读取和写入时间戳,用于基于时间的操作(这个时间戳不是数据中的某个时间戳字段,而是数据写入Kafka时,Kafka引擎给这条数据打上的时间戳标记)。connector和format文档列出了每个组件可用的元数据字段。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka'
...
);
如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样, FROM xxx子句可省略:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
'connector' = 'kafka'
...
);
如果自定义列的数据类型和Connector中定义的metadata字段的数据类型不一致,程序运行时会自动 cast强转,但是这要求两种数据类型是可以强转的。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 将时间戳强转为 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);
默认情况下,Flink SQL planner认为metadata列可以读取和写入。在许多情况下,外部系统提供的字段都是只读元数据,使用VIRTUAL关键字表示该字段只读:
CREATE TABLE MyTable (
`timestamp` BIGINT METADATA,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);
2.4 computed_column_definition
计算列就是拿已有的一些列经过一些自定义的运算生成的新列,在物理上并不存储在表中,只能读不能写。列的数据类型从给定的表达式自动派生,无需手动声明字段类型。
CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
`cost` AS price * quanitity
) WITH (
'connector' = 'kafka'
...
);
2.5 定义Watermark
Flink SQL提供了几种WATERMARK生成策略:
- 严格升序:
WATERMARK FOR rowtime_column AS rowtime_column
Flink任务认为时间戳只会越来越大,也不存在相等的情况,只要相等或者小于之前的,就认为是迟到的数据。 - 递增:
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
一般基本不用这种方式。如果设置此类,则允许有相同的时间戳出现。 - 有界无序:
WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL 'string' timeUnit
此类策略就可以用于设置最大乱序时间,比如设置为WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
,则生成的是运行5s延迟的Watermark。一般都用这种Watermark生成策略,此类Watermark生成策略通常用于有数据乱序的场景中。
2.6 PRIMARY KEY
主键约束表明表中的一列或一组列是唯一的,并且它们不包含NULL值。主键唯一地标识表中的一行,只支持 not enforced。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
PARYMARY KEY(user_id) not enforced
) WITH (
'connector' = 'kafka'
...
);
2.7 PARTITIONED BY
创建分区表
2.8 with语句
用于指定外部存储系统的元数据信息。配置属性时,表达式key=val的键和值都应该是字符串字面值。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`name` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
一般with中的配置项由Flink SQL的Connector(链接外部存储的连接器)来定义,每种Connector提供的with配置项都是不同的。
2.9 LIKE
用于基于现有表的定义创建表。此外,用户可以扩展原始表或排除表的某些部分。可以使用该子句重用(可能还会覆盖)某些连接器属性,或者向外部定义的表添加水印。
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TABLE Orders_with_watermark (
-- Add watermark definition
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- Overwrite the startup-mode
'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;
2.10 AS select_statement(CTAS)
在一个create-table-as-select (CTAS)语句中,还可以通过查询的结果创建和填充表。CTAS是使用单个命令创建数据并向表中插入数据的最简单、最快速的方法。
CREATE TABLE my_ctas_table
WITH (
'connector' = 'kafka',
...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;
注意:CTAS有以下限制:
- 暂不支持创建临时表。
- 目前还不支持指定显式列。
- 还不支持指定显式水印。
- 目前还不支持创建分区表。
- 目前还不支持指定主键约束。
3. 查看表
3.1 查看所有表
SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]
如果没有指定数据库,则从当前数据库返回表。LIKE子句中sql pattern的语法与MySQL方言的语法相同:
%
匹配任意数量的字符,甚至零字符,\%
匹配一个'%'
字符。_
只匹配一个字符,\_
只匹配一个'_'
字符。
3.2 查看表信息
{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name
4. 修改表
4.1 修改表名
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
4.2 修改表属性
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
5. 删除表
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name