Skip to content

集成Spark

1. 环境准备

Paimon目前支持Spark3.5、3.4、3.3、和3.2。本次使用的Spark版本是3.5.7。

1.1 上传并解压安装包

sh
tar -zxvf spark-3.5.7-bin-hadoop3.tgz -C /opt/module/
mv /opt/module/spark-3.5.7-bin-hadoop3 /opt/module/spark-3.5.7

1.2 配置环境变量

sh
sudo vim /etc/profile.d/my_env.sh
## 添加如下内容
export SPARK_HOME=/opt/module/spark-3.5.7
export PATH=$PATH:$SPARK_HOME/bin

source /etc/profile.d/my_env.sh

1.3 拷贝paimon的jar包到Spark的jars目录

拷贝jar报到spark的jars目录(也可以运行时使用--jars命令,下载地址为https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.5/1.2.0/paimon-spark-3.5-1.2.0.jar

sh
cp paimon-spark-3.5-1.2.0.jar /opt/module/spark/jars

2. Catalog

启动spark-sql时,指定Catalog,切换到Catalog后,Spark现有的表将无法直接访问,可以使用spark_catalog.${database_name}.${table_name}来访问Spark表。Spark支持Catalog
注册Catalog可以启动时指定,也可以配置在spark-defaults.conf中。

2.1 文件系统

sh
spark-sql \
    --conf spark.sql.catalog.fs=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.fs.warehouse=hdfs://hadoop102:8020/spark/paimon/fs

其中,参数前缀为:spark.sql.catalog.catalog名称

sh
USE fs.default;

2.2 Hive

sh
#启动hive的metastore服务
nohup hive --service metastore &

启动时注册Catalog:

sh
spark-sql \
    --conf spark.sql.catalog.hive=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.hive.warehouse=hdfs://hadoop102:8020/spark/paimon/hive \
    --conf spark.sql.catalog.hive.metastore=hive \
    --conf spark.sql.catalog.hive.uri=thrift://hadoop102:9083

切换到该Catalog下的default数据库:

sh
USE hive.default;

禁用 Hive ACID(Hive3):

sql
hive.strict.managed.tables=false
hive.create.as.insert.only=false
metastore.create.as.acid=false

使用hive Catalog通过alter table更改不兼容的列类型时,参见 HIVE-17832。需要配置:

sql
hive.metastore.disallow.inknown.col.type.changes=false

3. 建表

Spark创建Paimon表默认是内部表,当表从Catalog中删除时,其表文件也将被删除。

3.1 创建普通表

sql
CREATE TABLE tests (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);

3.2 创建分区表

sql
CREATE TABLE tests_p (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) 
TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);

通过配置partition.expiration-time,可以自动删除过期的分区。如果定义了主键,则分区字段必须是主键的子集。可以定义以下三类字段为分区字段:

  • 创建时间(推荐):创建时间通常是不可变的,因此您可以放心地将其视为分区字段并将其添加到主键中。
  • 事件时间:事件时间是原表中的一个字段。对于CDC数据来说,比如从MySQL CDC同步的表或者Paimon生成的Changelogs,它们都是完整的CDC数据,包括UPDATE_BEFORE记录,即使你声明了包含分区字段的主键,也能达到独特的效果。
  • CDC op_ts:不能定义为分区字段,无法知道之前的记录时间戳。

3.3 CTAS建表

sql
CREATE TABLE tests1(
    user_id BIGINT,
    item_id BIGINT
);
CREATE TABLE tests2 AS SELECT * FROM tests1;

-- 指定分区
CREATE TABLE tests2_p PARTITIONED BY (dt) AS SELECT * FROM tests_p;
    
-- 指定配置
CREATE TABLE tests3(
       user_id BIGINT,
       item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE tests3_op TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM tests3;

-- 指定主键
CREATE TABLE tests_pk TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM tests;

-- 指定主键和分区
CREATE TABLE tests_all PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM tests_p;

3.4 表属性

有关此类属性的完整列表,请参阅配置https://paimon.apache.org/docs/master/maintenance/configurations/

3.5 外部表

在Catalog为hive类型中创建表,如果额外指定location参数,Paimon会创建外部表,如果对应的location已经存有数据,可以简写为:

sql
CREATE TABLE my_table LOCATION '/path/to/table';

4. 修改表

4.1 更改/添加表属性

sql
ALTER TABLE tests SET TBLPROPERTIES (
    'write-buffer-size' = '256 MB'
);

4.2 重命名表

sql
ALTER TABLE tests1 RENAME TO tests_new;

4.3 删除表属性

sql
ALTER TABLE tests UNSET TBLPROPERTIES ('write-buffer-size');

4.4 添加新列

sql
-- flink中不需要使用COLUMNS,Spark中需要指定COLUMNS
ALTER TABLE tests ADD COLUMNS (c1 INT, c2 STRING);
-- 添加列并指定位置
ALTER TABLE tests ADD COLUMN a INT FIRST;

ALTER TABLE tests ADD COLUMN b INT AFTER a;

需要在hive-site.xml中配置支持指定列位置:

xml
<property>
    <name>hive.metastore.disallow.incompatible.col.type.changes</name>
    <value>false</value>
</property>

4.5 重命名列名称

sql
-- flink中不需要使用COLUMN,Spark中需要指定COLUMN
ALTER TABLE tests RENAME COLUMN c1 TO c0;

4.6 删除列

sql
-- flink中不需要使用COLUMNS,Spark中需要指定COLUMNS
ALTER TABLE my_table DROP COLUMNS(c0, c2);

4.7 更改列的可为空

sql
CREATE TABLE tests_null(
id INT, 
coupon_info FLOAT NOT NULL
);
-- Spark只支持将not null改为 nullable
ALTER TABLE tests_null ALTER COLUMN coupon_info DROP NOT NULL;

4.8 更改列注释

sql
ALTER TABLE tests ALTER COLUMN user_id COMMENT 'user id'

4.9 更改列类型

sql
ALTER TABLE tests ALTER COLUMN a TYPE DOUBLE;

5. 插入数据

INSERT语句向表中插入新行。插入的行可以由值表达式或查询结果指定,跟标准的sql语法一致:

sql
INSERT INTO table_identifier [ part_spec ] [ column_list ] { value_expr | query }
  • part_spec:
    可选,指定分区的键值对列表,多个用逗号分隔。可以使用类型文字(例如,date'2019-01-02')。
    语法: PARTITION (分区列名称 = 分区列值 [ , … ] )
  • column_list:
    可选,指定以逗号分隔的字段列表。
    语法:(col_name1 [,column_name2, …])
    所有指定的列都应该存在于表中,并且不能相互重复。它包括除静态分区列之外的所有列。字段列表的大小应与VALUES子句或查询中的数据大小完全相同。
  • value_expr:
    指定要插入的值。可以插入显式指定的值或 NULL。必须使用逗号分隔子句中的每个值。可以指定多于一组的值来插入多行。
    语法:VALUES ( { 值 | NULL } [ , … ] ) [ , ( … ) ]

将Null字段写入Not-null字段

不能将另一个表的可为空列插入到一个表的非空列中。Spark可以使用nvl函数来处理,比如A表的key1是not null,B表的key2是nullable:

sql
INSERT INTO A key1 SELECT nvl(key2, <non-null expression>) FROM B

6. 时间旅行

Paimon的批量读取返回表快照中的所有数据。默认情况下,批量读取返回最新快照。

6.1 读取指定id的快照

可以在查询中使用VERSION AS OF和TIMESTAMP AS OF来进行时间旅行。

sql
SELECT * FROM tests VERSION AS OF 1;
SELECT * FROM tests VERSION AS OF 2;

6.2 读取指定时间戳的快照

sql
-- 查看快照信息
SELECT * FROM `tests&snapshots`;

SELECT * FROM tests TIMESTAMP AS OF '2023-07-03 15:34:20.123';
-- 时间戳指定到秒(向上取整)
SELECT * FROM tests TIMESTAMP AS OF 1688369660;

6.3 读取指定标签

sql
SELECT * FROM tests VERSION AS OF 'my-tag';

6.4 指定范围快照

读取开始快照(不包括)和结束快照之间的增量更改。例如,“3,5”表示快照3和快照5之间的更改:

sql
spark.read()
  .format("paimon")
  .option("incremental-between", "3,5")
  .load("path/to/table")

7. 元数据表

系统表包含有关每个表的元数据和信息,例如创建的快照和使用的选项。用户可以通过批量查询访问系统表。

7.1 快照表Snapshots

通过snapshots表可以查询表的快照历史信息,包括快照中发生的记录数。Spark中使用需要反引号表名$系统表名

sql
SELECT * FROM `tests$snapshots`;

通过查询快照表,可以了解该表的提交和过期信息以及数据的时间旅行。

7.2 模式表Schemas

通过schemas表可以查询该表的历史schema:

sql
-- 直接查询会报错,需要加上反引号
SELECT * FROM `tests$schemas`;

可以连接快照表和模式表以获取给定快照的字段:

sql
SELECT s.snapshot_id, t.schema_id, t.fields 
    FROM `tests$snapshots` s JOIN `tests$schemas` t 
    ON s.schema_id=t.schema_id where s.snapshot_id=3;

7.3 选项表Options

可以通过选项表查询DDL中指定的表的选项信息。未显示的选项将是默认值:

sql
SELECT * FROM `tests$options`;

7.4 审计日志表Audit log

如果需要审计表的changelog,可以使用audit_log系统表。通过audit_log表,获取表增量数据时可以获取rowkind列。您可以利用该栏目进行过滤等操作来完成审核。rowkind 有四个值:

  • +I:插入操作。
  • -U:使用更新行的先前内容进行更新操作。
  • +U:使用更新行的新内容进行更新操作。
  • -D:删除操作。
sql
SELECT * FROM `tests$audit_log`;

7.5 文件表Files

sql
-- 查询最新快照的文件
SELECT * FROM `tests$files`;

7.6 标签表Tags

通过tags表可以查询表的标签历史信息,包括基于哪些快照进行标签以及快照的一些历史信息。您还可以通过名称获取所有标签名称和时间旅行到特定标签的数据。

sql
SELECT * FROM `tests$tags`;