集成Spark
1. 环境准备
Paimon目前支持Spark3.5、3.4、3.3、和3.2。本次使用的Spark版本是3.5.7。
1.1 上传并解压安装包
tar -zxvf spark-3.5.7-bin-hadoop3.tgz -C /opt/module/
mv /opt/module/spark-3.5.7-bin-hadoop3 /opt/module/spark-3.5.71.2 配置环境变量
sudo vim /etc/profile.d/my_env.sh
## 添加如下内容
export SPARK_HOME=/opt/module/spark-3.5.7
export PATH=$PATH:$SPARK_HOME/bin
source /etc/profile.d/my_env.sh1.3 拷贝paimon的jar包到Spark的jars目录
拷贝jar报到spark的jars目录(也可以运行时使用--jars命令,下载地址为https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.5/1.2.0/paimon-spark-3.5-1.2.0.jar
cp paimon-spark-3.5-1.2.0.jar /opt/module/spark/jars2. Catalog
启动spark-sql时,指定Catalog,切换到Catalog后,Spark现有的表将无法直接访问,可以使用spark_catalog.${database_name}.${table_name}来访问Spark表。Spark支持Catalog
注册Catalog可以启动时指定,也可以配置在spark-defaults.conf中。
2.1 文件系统
spark-sql \
--conf spark.sql.catalog.fs=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.fs.warehouse=hdfs://hadoop102:8020/spark/paimon/fs其中,参数前缀为:spark.sql.catalog.catalog名称
USE fs.default;2.2 Hive
#启动hive的metastore服务
nohup hive --service metastore &启动时注册Catalog:
spark-sql \
--conf spark.sql.catalog.hive=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.hive.warehouse=hdfs://hadoop102:8020/spark/paimon/hive \
--conf spark.sql.catalog.hive.metastore=hive \
--conf spark.sql.catalog.hive.uri=thrift://hadoop102:9083切换到该Catalog下的default数据库:
USE hive.default;禁用 Hive ACID(Hive3):
hive.strict.managed.tables=false
hive.create.as.insert.only=false
metastore.create.as.acid=false使用hive Catalog通过alter table更改不兼容的列类型时,参见 HIVE-17832。需要配置:
hive.metastore.disallow.inknown.col.type.changes=false3. 建表
Spark创建Paimon表默认是内部表,当表从Catalog中删除时,其表文件也将被删除。
3.1 创建普通表
CREATE TABLE tests (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);3.2 创建分区表
CREATE TABLE tests_p (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh)
TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);通过配置partition.expiration-time,可以自动删除过期的分区。如果定义了主键,则分区字段必须是主键的子集。可以定义以下三类字段为分区字段:
- 创建时间(推荐):创建时间通常是不可变的,因此您可以放心地将其视为分区字段并将其添加到主键中。
- 事件时间:事件时间是原表中的一个字段。对于CDC数据来说,比如从MySQL CDC同步的表或者Paimon生成的Changelogs,它们都是完整的CDC数据,包括UPDATE_BEFORE记录,即使你声明了包含分区字段的主键,也能达到独特的效果。
- CDC op_ts:不能定义为分区字段,无法知道之前的记录时间戳。
3.3 CTAS建表
CREATE TABLE tests1(
user_id BIGINT,
item_id BIGINT
);
CREATE TABLE tests2 AS SELECT * FROM tests1;
-- 指定分区
CREATE TABLE tests2_p PARTITIONED BY (dt) AS SELECT * FROM tests_p;
-- 指定配置
CREATE TABLE tests3(
user_id BIGINT,
item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE tests3_op TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM tests3;
-- 指定主键
CREATE TABLE tests_pk TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM tests;
-- 指定主键和分区
CREATE TABLE tests_all PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM tests_p;3.4 表属性
有关此类属性的完整列表,请参阅配置https://paimon.apache.org/docs/master/maintenance/configurations/
3.5 外部表
在Catalog为hive类型中创建表,如果额外指定location参数,Paimon会创建外部表,如果对应的location已经存有数据,可以简写为:
CREATE TABLE my_table LOCATION '/path/to/table';4. 修改表
4.1 更改/添加表属性
ALTER TABLE tests SET TBLPROPERTIES (
'write-buffer-size' = '256 MB'
);4.2 重命名表
ALTER TABLE tests1 RENAME TO tests_new;4.3 删除表属性
ALTER TABLE tests UNSET TBLPROPERTIES ('write-buffer-size');4.4 添加新列
-- flink中不需要使用COLUMNS,Spark中需要指定COLUMNS
ALTER TABLE tests ADD COLUMNS (c1 INT, c2 STRING);
-- 添加列并指定位置
ALTER TABLE tests ADD COLUMN a INT FIRST;
ALTER TABLE tests ADD COLUMN b INT AFTER a;需要在hive-site.xml中配置支持指定列位置:
<property>
<name>hive.metastore.disallow.incompatible.col.type.changes</name>
<value>false</value>
</property>4.5 重命名列名称
-- flink中不需要使用COLUMN,Spark中需要指定COLUMN
ALTER TABLE tests RENAME COLUMN c1 TO c0;4.6 删除列
-- flink中不需要使用COLUMNS,Spark中需要指定COLUMNS
ALTER TABLE my_table DROP COLUMNS(c0, c2);4.7 更改列的可为空
CREATE TABLE tests_null(
id INT,
coupon_info FLOAT NOT NULL
);
-- Spark只支持将not null改为 nullable
ALTER TABLE tests_null ALTER COLUMN coupon_info DROP NOT NULL;4.8 更改列注释
ALTER TABLE tests ALTER COLUMN user_id COMMENT 'user id'4.9 更改列类型
ALTER TABLE tests ALTER COLUMN a TYPE DOUBLE;5. 插入数据
INSERT语句向表中插入新行。插入的行可以由值表达式或查询结果指定,跟标准的sql语法一致:
INSERT INTO table_identifier [ part_spec ] [ column_list ] { value_expr | query }- part_spec:
可选,指定分区的键值对列表,多个用逗号分隔。可以使用类型文字(例如,date'2019-01-02')。
语法:PARTITION (分区列名称 = 分区列值 [ , … ] ) - column_list:
可选,指定以逗号分隔的字段列表。
语法:(col_name1 [,column_name2, …])
所有指定的列都应该存在于表中,并且不能相互重复。它包括除静态分区列之外的所有列。字段列表的大小应与VALUES子句或查询中的数据大小完全相同。 - value_expr:
指定要插入的值。可以插入显式指定的值或 NULL。必须使用逗号分隔子句中的每个值。可以指定多于一组的值来插入多行。
语法:VALUES ( { 值 | NULL } [ , … ] ) [ , ( … ) ]
将Null字段写入Not-null字段
不能将另一个表的可为空列插入到一个表的非空列中。Spark可以使用nvl函数来处理,比如A表的key1是not null,B表的key2是nullable:
INSERT INTO A key1 SELECT nvl(key2, <non-null expression>) FROM B6. 时间旅行
Paimon的批量读取返回表快照中的所有数据。默认情况下,批量读取返回最新快照。
6.1 读取指定id的快照
可以在查询中使用VERSION AS OF和TIMESTAMP AS OF来进行时间旅行。
SELECT * FROM tests VERSION AS OF 1;
SELECT * FROM tests VERSION AS OF 2;6.2 读取指定时间戳的快照
-- 查看快照信息
SELECT * FROM `tests&snapshots`;
SELECT * FROM tests TIMESTAMP AS OF '2023-07-03 15:34:20.123';
-- 时间戳指定到秒(向上取整)
SELECT * FROM tests TIMESTAMP AS OF 1688369660;6.3 读取指定标签
SELECT * FROM tests VERSION AS OF 'my-tag';6.4 指定范围快照
读取开始快照(不包括)和结束快照之间的增量更改。例如,“3,5”表示快照3和快照5之间的更改:
spark.read()
.format("paimon")
.option("incremental-between", "3,5")
.load("path/to/table")7. 元数据表
系统表包含有关每个表的元数据和信息,例如创建的快照和使用的选项。用户可以通过批量查询访问系统表。
7.1 快照表Snapshots
通过snapshots表可以查询表的快照历史信息,包括快照中发生的记录数。Spark中使用需要反引号表名$系统表名。
SELECT * FROM `tests$snapshots`;通过查询快照表,可以了解该表的提交和过期信息以及数据的时间旅行。
7.2 模式表Schemas
通过schemas表可以查询该表的历史schema:
-- 直接查询会报错,需要加上反引号
SELECT * FROM `tests$schemas`;可以连接快照表和模式表以获取给定快照的字段:
SELECT s.snapshot_id, t.schema_id, t.fields
FROM `tests$snapshots` s JOIN `tests$schemas` t
ON s.schema_id=t.schema_id where s.snapshot_id=3;7.3 选项表Options
可以通过选项表查询DDL中指定的表的选项信息。未显示的选项将是默认值:
SELECT * FROM `tests$options`;7.4 审计日志表Audit log
如果需要审计表的changelog,可以使用audit_log系统表。通过audit_log表,获取表增量数据时可以获取rowkind列。您可以利用该栏目进行过滤等操作来完成审核。rowkind 有四个值:
+I:插入操作。-U:使用更新行的先前内容进行更新操作。+U:使用更新行的新内容进行更新操作。-D:删除操作。
SELECT * FROM `tests$audit_log`;7.5 文件表Files
-- 查询最新快照的文件
SELECT * FROM `tests$files`;7.6 标签表Tags
通过tags表可以查询表的标签历史信息,包括基于哪些快照进行标签以及快照的一些历史信息。您还可以通过名称获取所有标签名称和时间旅行到特定标签的数据。
SELECT * FROM `tests$tags`;