Integration with Spark SQL
This section works in the spark-sql environment, using an external Hive metastore, with data stored on Hadoop.
1. Environment Preparation
1.1 Download Iceberg
Visit https://repo1.maven.org/maven2/org/apache/iceberg/ and download the Iceberg runtime jar matching your Spark version:
Tip
The latest release is currently 1.8.0, but Iceberg dropped JDK 8 support starting with 1.7.0; Iceberg 1.6.1 still supports JDK 8.
1.2 Copy the Iceberg jar into Spark's jars directory
cp /opt/software/iceberg-spark-runtime-3.4_2.12-1.6.1.jar /opt/module/spark-3.4.2/jars
1.3 Start the required services
Start the Hadoop cluster and the Hive metastore service.
2. Configure Spark SQL
Spark supports two catalog types for Iceberg: hive and hadoop. With a Hive catalog, Iceberg tables are stored under Hive's default warehouse path; with a Hadoop catalog, the warehouse path for Iceberg tables must be specified explicitly. The Hive catalog is used by default.
2.1 Configure a Hive Catalog
Edit the spark-defaults.conf file and add a custom Hive catalog.
[jack@hadoop103 spark-3.4.2]$ cd conf/
[jack@hadoop103 conf]$ cp spark-defaults.conf.template spark-defaults.conf
## Add the following settings
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://hadoop103:9083
2.2 Configure a Hadoop Catalog
Edit the spark-defaults.conf file:
[jack@hadoop103 conf]$ vim spark-defaults.conf
## Add the following settings
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://hadoop102:8020/warehouse/spark-iceberg
3. Start Spark SQL
- Run the Thrift server startup script:
./sbin/start-thriftserver.sh
- Connect to Spark SQL with Beeline:
./bin/beeline -u jdbc:hive2://hadoop103:10000 -n jack
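The unqualified table names used in the rest of this section resolve to the hive_prod catalog configured above. A minimal sketch of one way to get there (assuming the multi-part USE syntax of Spark 3.x; the catalog name hive_prod comes from spark-defaults.conf):
-- Switch the session to the default database of the hive_prod catalog
USE hive_prod.default;
Alternatively, spark.sql.defaultCatalog = hive_prod can be added to spark-defaults.conf so that new sessions start in that catalog.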
4. Create Tables
Table creation supports the following clauses:
- PARTITIONED BY (partition-expressions): configure partitioning
- LOCATION '(fully-qualified-uri)': specify the table path
- COMMENT 'table documentation': set a table comment
- TBLPROPERTIES ('key'='value', ...): set table properties
Every change to an Iceberg table produces a new metadata file (a JSON file) to provide atomicity. By default, old metadata files are kept as history rather than deleted.
4.1 Create a managed (internal) table
CREATE TABLE ice_spark_sample1 (
id bigint COMMENT 'unique id',
data string)
USING iceberg
location '/spark/warehouse/ice_spark_sample1';
View the ice_spark_sample1 table definition:
0: jdbc:hive2://hadoop103:10000> show create table ice_spark_sample1;
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE TABLE hive_prod.default.ice_spark_sample1 (
id BIGINT COMMENT 'unique id',
data STRING)
USING iceberg
LOCATION 'hdfs://hadoop102:8020/spark/warehouse/ice_spark_sample1'
TBLPROPERTIES (
'current-snapshot-id' = 'none',
'format' = 'iceberg/parquet',
'format-version' = '2',
'write.parquet.compression-codec' = 'zstd')
|
+----------------------------------------------------+
1 row selected (0.38 seconds)
Check the metadata JSON files recorded on HDFS (written under the table's metadata/ directory).
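Besides browsing the metadata/ directory on HDFS directly, the same history can be read through a metadata table. A hedged sketch, assuming the metadata_log_entries metadata table is available in this Iceberg version:
-- One row per metadata JSON file written for the table
SELECT file, latest_snapshot_id
FROM hive_prod.default.ice_spark_sample1.metadata_log_entries;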
4.2 Create an external table
CREATE EXTERNAL TABLE ice_spark_sample2 (
id bigint COMMENT 'unique id',
data string)
USING iceberg
location '/spark/warehouse/ice_spark_sample2';
View the ice_spark_sample2 table details:
0: jdbc:hive2://hadoop103:10000> desc formatted ice_spark_sample2;
+-------------------------------+----------------------------------------------------+------------+
| col_name | data_type | comment |
+-------------------------------+----------------------------------------------------+------------+
| id | bigint | unique id |
| data | string | NULL |
| | | |
| # Metadata Columns | | |
| _spec_id | int | |
| _partition | struct<> | |
| _file | string | |
| _pos | bigint | |
| _deleted | boolean | |
| | | |
| # Detailed Table Information | | |
| Name | hive_prod.default.ice_spark_sample2 | |
| Type | EXTERNAL | |
| Location | hdfs://hadoop102:8020/spark/warehouse/ice_spark_sample2 | |
| Provider | iceberg | |
| Owner | jack | |
| Table Properties | [current-snapshot-id=none,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd] | |
+-------------------------------+----------------------------------------------------+------------+
17 rows selected (0.329 seconds)
4.3 Create a partitioned table
CREATE external TABLE ice_spark_sample3 (
id bigint,
data string,
category string)
USING iceberg
PARTITIONED BY (category)
location '/spark/warehouse/ice_spark_sample3';
insert into ice_spark_sample3 values (1, 'jack', '1'), (2, 'tuanzi', '2'), (3, 'yuanyuan', '1');
View the table definition:
0: jdbc:hive2://hadoop103:10000> show create table ice_spark_sample3;
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE TABLE hive_prod.default.ice_spark_sample3 (
id BIGINT,
data STRING,
category STRING)
USING iceberg
PARTITIONED BY (category)
LOCATION 'hdfs://hadoop102:8020/spark/warehouse/ice_spark_sample3'
TBLPROPERTIES (
'current-snapshot-id' = '3117539524054604329',
'format' = 'iceberg/parquet',
'format-version' = '2',
'write.parquet.compression-codec' = 'zstd')
|
+----------------------------------------------------+
Check the partition directories on HDFS.
4.4 Create a table with hidden partitions
A hidden-partition table derives its partitions from transforms applied to column values, rather than partitioning directly on a column's raw value.
CREATE external TABLE ice_spark_sample4 (
id bigint,
data string,
category string,
ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category)
location '/spark/warehouse/ice_spark_sample4';
The supported transform functions are listed below (a sketch using some of the remaining transforms follows the desc output at the end of this subsection):
- years(ts): partition by year
- months(ts): partition by month
- days(ts) or date(ts): equivalent to dateint partitioning
- hours(ts) or date_hour(ts): equivalent to dateint-and-hour partitioning
- bucket(N, col): partition by hashed value mod N buckets
- truncate(L, col): partition by value truncated to length L
View the table schema:
0: jdbc:hive2://hadoop103:10000> desc ice_spark_sample4;
+-----------------+-----------------+----------+
| col_name | data_type | comment |
+-----------------+-----------------+----------+
| id | bigint | NULL |
| data | string | NULL |
| category | string | NULL |
| ts | timestamp | NULL |
| | | |
| # Partitioning | | |
| Part 0 | bucket(16, id) | |
| Part 1 | days(ts) | |
| Part 2 | category | |
+-----------------+-----------------+----------+
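For completeness, a hedged sketch that exercises some of the transforms not used above (years and truncate), on a hypothetical table ice_spark_sample4b:
-- Hypothetical table, illustrating years() and truncate() hidden partitions
CREATE EXTERNAL TABLE ice_spark_sample4b (
 id bigint,
 data string,
 ts timestamp)
USING iceberg
PARTITIONED BY (years(ts), truncate(4, data))
location '/spark/warehouse/ice_spark_sample4b';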
4.5 Create a table with CTAS
0: jdbc:hive2://hadoop103:10000> CREATE external TABLE ice_spark_sample5
. . . . . . . . . . . . . . . .> USING iceberg
. . . . . . . . . . . . . . . .> location '/spark/warehouse/ice_spark_sample5'
. . . . . . . . . . . . . . . .> AS SELECT * from ice_spark_sample3;
+---------+
| Result |
+---------+
+---------+
No rows selected (2.313 seconds)
0: jdbc:hive2://hadoop103:10000> select * from ice_spark_sample5;
+-----+-----------+-----------+
| id | data | category |
+-----+-----------+-----------+
| 1 | jack | 1 |
| 3 | yuanyuan | 1 |
| 2 | tuanzi | 2 |
+-----+-----------+-----------+
3 rows selected (0.366 seconds)
Inspecting the table structure shows that CTAS does not carry over partition information:
0: jdbc:hive2://hadoop103:10000> desc ice_spark_sample5;
+-----------+------------+----------+
| col_name | data_type | comment |
+-----------+------------+----------+
| id | bigint | NULL |
| data | string | NULL |
| category | string | NULL |
+-----------+------------+----------+
3 rows selected (1.578 seconds)
If no partitioning is specified, the new table is unpartitioned; partitioning and table properties must be specified again:
CREATE external TABLE ice_spark_sample6
USING iceberg
PARTITIONED BY (bucket(8, id), hours(ts), category)
TBLPROPERTIES ('key'='value')
location '/spark/warehouse/ice_spark_sample6'
AS SELECT * from ice_spark_sample4;
4.6 Create a table with RTAS
-- Fails if the table does not exist; a non-existent table cannot be replaced
REPLACE TABLE ice_spark_sample5
USING iceberg
PARTITIONED BY (category)
location '/spark/warehouse/ice_spark_sample5'
AS SELECT * from ice_spark_sample3;
-- Creates the table if it does not exist, otherwise replaces it
create or REPLACE TABLE ice_spark_sample8
USING iceberg
PARTITIONED BY (category)
TBLPROPERTIES ('key'='value')
AS SELECT * from ice_spark_sample3;
Looking at ice_spark_sample5 again, it is now a partitioned table:
0: jdbc:hive2://hadoop103:10000> desc formatted ice_spark_sample5;
+-------------------------------+----------------------------------------------------+----------+
| col_name | data_type | comment |
+-------------------------------+----------------------------------------------------+----------+
| id | bigint | NULL |
| data | string | NULL |
| category | string | NULL |
| # Partition Information | | |
| # col_name | data_type | comment |
| category | string | NULL |
| | | |
| # Metadata Columns | | |
| _spec_id | int | |
| _partition | struct<category:string> | |
| _file | string | |
| _pos | bigint | |
| _deleted | boolean | |
| | | |
| # Detailed Table Information | | |
| Name | hive_prod.default.ice_spark_sample5 | |
| Type | EXTERNAL | |
| Location | hdfs://hadoop102:8020/spark/warehouse/ice_spark_sample5 | |
| Provider | iceberg | |
| Owner | jack | |
| Table Properties | [current-snapshot-id=8445519115344500338,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd] | |
+-------------------------------+----------------------------------------------------+----------+
21 rows selected (0.39 seconds)
5. Alter Tables
5.1 Rename a table
ALTER TABLE ice_spark_sample1 RENAME TO ice_spark_sample11;
5.2 Modify table properties
ALTER TABLE ice_spark_sample11 SET TBLPROPERTIES (
'read.split.target-size'='268435456'
);
-- Remove a table property
ALTER TABLE ice_spark_sample11 UNSET TBLPROPERTIES ('read.split.target-size');
5.3 Add columns
ALTER TABLE ice_spark_sample11
ADD COLUMNS (
new_column int comment 'new_column docs'
);
Query the table data:
0: jdbc:hive2://hadoop103:10000> select * from ice_spark_sample1;
+-----+-------+-------------+
| id | data | new_column |
+-----+-------+-------------+
| 1 | jack | NULL |
+-----+-------+-------------+
1 row selected (0.3 seconds)
5.4 Alter a column
Iceberg allows changing a column's type, but only to a wider type:
alter table ice_spark_sample1 alter column new_column type bigint;
View the table schema:
0: jdbc:hive2://hadoop103:10000> desc ice_spark_sample1;
+-------------+------------+------------------+
| col_name | data_type | comment |
+-------------+------------+------------------+
| id | bigint | unique id |
| data | string | NULL |
| new_column | bigint | new_column docs |
+-------------+------------+------------------+
3 rows selected (0.326 seconds)
5.5 Drop a column
ALTER TABLE ice_spark_sample1 DROP COLUMN new_column;
View the table schema:
0: jdbc:hive2://hadoop103:10000> desc ice_spark_sample1;
+-----------+------------+------------+
| col_name | data_type | comment |
+-----------+------------+------------+
| id | bigint | unique id |
| data | string | NULL |
+-----------+------------+------------+
2 rows selected (0.045 seconds)
5.6 Add a partition field
Spark can add new partition fields to an existing table, but the Iceberg SQL extensions must be configured first:
vim spark-defaults.conf
## Add the following setting
spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Restart the Thrift server and reconnect with Beeline. Add a partition field to ice_spark_sample1, turning a plain table into a partitioned one:
ALTER TABLE ice_spark_sample1 ADD PARTITION FIELD bucket(16, id);
View the ice_spark_sample1 table schema:
0: jdbc:hive2://hadoop103:10000> desc ice_spark_sample1;
+-----------------+-----------------+------------+
| col_name | data_type | comment |
+-----------------+-----------------+------------+
| id | bigint | unique id |
| data | string | NULL |
| | | |
| # Partitioning | | |
| Part 0 | bucket(16, id) | |
+-----------------+-----------------+------------+
5.7 Drop a partition field
As above, this requires the spark.sql.extensions setting:
ALTER TABLE ice_spark_sample1 DROP PARTITION FIELD bucket(16, id);
Dropping a partition field is a metadata operation and does not change any existing table data. New data is written under the new partition spec, while existing data remains in the old layout. Dynamic partition overwrite behavior also changes when the spec changes: for example, if you switch from daily to hourly partitioning, overwrites will replace hourly partitions rather than daily ones.
Be careful when dropping partition fields; it can cause metadata queries to fail or return different results.
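A minimal sketch of the day-to-hour change described above, using a hypothetical table t currently partitioned by days(ts):
-- Stop partitioning new data by day ...
ALTER TABLE hive_prod.default.t DROP PARTITION FIELD days(ts);
-- ... and partition it by hour instead; existing files keep the old daily layout
ALTER TABLE hive_prod.default.t ADD PARTITION FIELD hours(ts);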
5.8 Distribute writes by partition
Write distribution and local ordering can be configured per table:
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION;
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id;
6. Drop Tables
6.1 Drop a Hadoop catalog table
For a Hadoop catalog, DROP TABLE removes the table from the catalog and deletes the table contents.
CREATE EXTERNAL TABLE hadoop_prod.default.sample (
id bigint COMMENT 'unique id',
data string)
USING iceberg
location '/warehouse/spark-iceberg/default/sample';
INSERT INTO hadoop_prod.default.sample values(1,'a');
Drop the table:
DROP TABLE hadoop_prod.default.sample;
After the drop, all data is deleted even though the table was declared external, so use this with care.
6.2 Drop a Hive catalog table
For a Hive catalog:
- Before 0.14, DROP TABLE removed the table from the catalog and deleted the table contents.
- Since 0.14, DROP TABLE only removes the table from the catalog and does not delete the data. To delete the table contents as well, use DROP TABLE ... PURGE (a sketch follows at the end of this subsection).
For example, drop ice_spark_sample5, which contains data before the drop:
0: jdbc:hive2://hadoop103:10000> drop table ice_spark_sample5;
+---------+
| Result |
+---------+
+---------+
No rows selected (1.172 seconds)
After the drop completes, the data is still present on HDFS.
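To delete the data as well, PURGE has to be added to the DROP statement. A sketch, assuming the table still exists in the catalog:
-- Removes the table from the Hive catalog and deletes its data and metadata files
DROP TABLE ice_spark_sample5 PURGE;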
7. Insert Data
7.1 Insert into a partitioned table
0: jdbc:hive2://hadoop103:10000> insert into ice_spark_sample1 values(1, 'jack');
+---------+
| Result |
+---------+
+---------+
No rows selected (5.012 seconds)
Check the resulting files on HDFS.
7.2 Insert into a hidden-partition table
0: jdbc:hive2://hadoop103:10000> insert into ice_spark_sample4 values(3, 'bobo','3',timestamp'2025-02-19 14:51:20'),(4, 'pipia','3', timestamp'2025-01-18 20:11:06');
+---------+
| Result |
+---------+
+---------+
No rows selected (4.27 seconds)
0: jdbc:hive2://hadoop103:10000> select * from ice_spark_sample4;
+-----+--------+-----------+------------------------+
| id | data | category | ts |
+-----+--------+-----------+------------------------+
| 3 | bobo | 3 | 2025-02-19 14:51:20.0 |
| 4 | pipia | 3 | 2025-01-18 20:11:06.0 |
| 2 | test | 3 | 2025-02-18 00:00:00.0 |
| 1 | demo | 2 | 2025-02-19 00:00:00.0 |
+-----+--------+-----------+------------------------+
7.3 Row-level updates with MERGE INTO
The match clauses are flexible and convenient to use:
MERGE INTO hadoop_prod.default.a t
USING (SELECT * FROM hadoop_prod.default.b) u ON t.id = u.id
WHEN MATCHED AND u.flag='b' THEN UPDATE SET t.count = t.count + u.count
WHEN MATCHED AND u.flag='a' THEN DELETE
WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count)
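The statement above assumes a target table a and a source table b in the hadoop_prod catalog. A minimal sketch of such tables (names and columns are illustrative only, chosen to match the columns referenced in the MERGE):
CREATE TABLE hadoop_prod.default.a (id bigint, count bigint)
USING iceberg;
CREATE TABLE hadoop_prod.default.b (id bigint, count bigint, flag string)
USING iceberg;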
8. Update Data
Iceberg tables also support row-level UPDATE (this likewise requires the spark.sql.extensions setting from section 5.6):
0: jdbc:hive2://hadoop103:10000> update ice_spark_sample1 set data='jack123' where id=1;
+---------+
| Result |
+---------+
+---------+
No rows selected (4.112 seconds)
0: jdbc:hive2://hadoop103:10000> select * from ice_spark_sample1;
+-----+----------+
| id | data |
+-----+----------+
| 1 | jack123 |
| 2 | tom |
+-----+----------+
2 rows selected (0.303 seconds)
9. Query Tables
9.1 Query table data
select * from ice_spark_sample1 where id=2;
9.2 Query metadata tables
Metadata tables must be queried with the fully qualified name catalog.database.table:
-- Query table snapshots
SELECT * FROM hive_prod.default.ice_spark_sample1.snapshots;
-- Query data file information
SELECT * FROM hive_prod.default.ice_spark_sample1.files;
-- Query table history
SELECT * FROM hive_prod.default.ice_spark_sample1.history;
-- Query manifests
SELECT * FROM hive_prod.default.ice_spark_sample1.manifests;
For example, query the current snapshots of ice_spark_sample1:
-- There are many columns; only a few are selected here
0: jdbc:hive2://hadoop103:10000> select committed_at, snapshot_id, operation from hive_prod.default.ice_spark_sample1.snapshots;
+--------------------------+----------------------+------------+
| committed_at | snapshot_id | operation |
+--------------------------+----------------------+------------+
| 2025-02-19 10:30:08.555 | 5825989728064739405 | append |
| 2025-02-19 12:46:48.009 | 8230451385389770413 | append |
| 2025-02-19 15:08:29.046 | 7404723281141935099 | overwrite |
| 2025-02-19 16:00:35.952 | 4012613232180724766 | append |
+--------------------------+----------------------+------------+
10. Stored Procedures
Iceberg stored procedures are used for table maintenance, such as snapshot management, metadata management, and table migration. As above, they require the spark.sql.extensions setting.
10.1 Snapshot management
- Roll back to a specific snapshot ID
-- The latest snapshot of ice_spark_sample1 is 4012613232180724766, which has 4 rows.
CALL hive_prod.system.rollback_to_snapshot('default.ice_spark_sample1', 7404723281141935099);
Query the data as of the current (rolled-back) snapshot:
0: jdbc:hive2://hadoop103:10000> select * from ice_spark_sample1;
+-----+----------+
| id | data |
+-----+----------+
| 2 | tom |
| 1 | jack123 |
+-----+----------+
- Roll back to the snapshot that was current at a given time
CALL hive_prod.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000');
- Set the table's current snapshot ID
CALL hive_prod.system.set_current_snapshot('db.sample', 1);
- Cherry-pick a snapshot into the current table state
CALL hive_prod.system.cherrypick_snapshot('default.a', 7629160535368763452);
10.2 Metadata management
- Expire snapshots older than a given timestamp, keeping the most recent 100 snapshots
-- Expired snapshots are removed
CALL hive_prod.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100)
- Remove files that are not referenced by any metadata file of the Iceberg table
-- List all candidate files for removal (dry run)
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true);
-- Remove any files in the given location that are not known to the table db.sample
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data');
- Compact small data files
CALL catalog_name.system.rewrite_data_files('db.sample');
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"')
- Migrate tables
-- Snapshot an existing table as a new Iceberg table
CALL catalog_name.system.snapshot('db.sample', 'db.snap');
-- Migrate an existing table to Iceberg in place
CALL catalog_name.system.migrate('db.sample');
-- Add data files from a source table
CALL spark_catalog.system.add_files(
table => 'db.tbl',
source_table => 'db.src_tbl',
partition_filter => map('part_col_1', 'A')
);
- Metadata information
-- List the ancestor snapshots of the table's current snapshot
CALL spark_catalog.system.ancestors_of('db.tbl');
-- List the ancestor snapshots of a specific snapshot
CALL spark_catalog.system.ancestors_of('db.tbl', 1);