
Integrating with Spark SQL

This section uses the spark-sql environment, with an external Hive metastore and data stored on Hadoop.

1. Environment Setup

1.1 Download Iceberg

Visit https://repo1.maven.org/maven2/org/apache/iceberg/ and download the Iceberg runtime jar matching your Spark version.

Tip

The latest version is currently 1.8.0, but Iceberg dropped JDK 1.8 support starting with 1.7.0; Iceberg 1.6.1 still supports JDK 1.8.

1.2 Copy the Iceberg jar into Spark's jars directory

sh
cp /opt/software/iceberg-spark-runtime-3.4_2.12-1.6.1.jar /opt/module/spark-3.4.2/jars

1.3 Start the environment dependencies

Start the Hadoop cluster and the Hive metastore service.

2. Configure Spark SQL

Spark supports two catalog types for Iceberg: hive and hadoop. With a Hive Catalog, Iceberg tables are stored under Hive's default warehouse path; with a Hadoop Catalog, the storage path for Iceberg tables must be specified explicitly. The Hive Catalog is used by default.
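
Once a catalog is defined this way, its tables can be referenced by their fully qualified name, or the catalog/namespace can be made current with USE. A minimal sketch (the table name is illustrative):

sql
-- make hive_prod.default the current namespace
USE hive_prod.default;
-- or qualify table names explicitly
SELECT * FROM hive_prod.default.ice_spark_sample1;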

2.1 Configure a Hive Catalog

Edit the spark-defaults.conf file and add a custom Hive Catalog.

sh
[jack@hadoop103 spark-3.4.2]$ cd conf/
[jack@hadoop103 conf]$ cp spark-defaults.conf.template spark-defaults.conf
## add the following configuration
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://hadoop103:9083

2.2 Configure a Hadoop Catalog

Edit the spark-defaults.conf file:

sh
[jack@hadoop103 conf]$ vim spark-defaults.conf
## add the following configuration
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://hadoop102:8020/warehouse/spark-iceberg

3. Start Spark SQL

  1. Run the Thrift server startup script:
sh
./sbin/start-thriftserver.sh
  2. Connect to Spark SQL with Beeline:
sh
./bin/beeline -u jdbc:hive2://hadoop103:10000 -n jack

4. Create Tables

Table creation supports the following clauses:

  • PARTITIONED BY (partition-expressions): define partitioning
  • LOCATION '(fully-qualified-uri)': specify the table path
  • COMMENT 'table documentation': set a table comment
  • TBLPROPERTIES ('key'='value', ...): set table properties
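
A minimal sketch combining these clauses (the table name, path, and property values are illustrative):

sql
CREATE TABLE ice_spark_demo (
    id bigint COMMENT 'unique id',
    data string,
    category string)
USING iceberg
PARTITIONED BY (category)
LOCATION '/spark/warehouse/ice_spark_demo'
COMMENT 'demo table combining the clauses above'
TBLPROPERTIES ('format-version'='2');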

Every change to an Iceberg table produces a new metadata file (a JSON file) to provide atomicity. By default, old metadata files are kept as history and are not deleted.
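
These metadata files can be inspected from Spark SQL through the metadata_log_entries metadata table; a minimal sketch, assuming the ice_spark_sample1 table created below and the hive_prod catalog:

sql
-- each row is one metadata json file, together with the snapshot it was written for
SELECT * FROM hive_prod.default.ice_spark_sample1.metadata_log_entries;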

4.1 Create an internal (managed) table

sql
CREATE TABLE ice_spark_sample1 (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg
location '/spark/warehouse/ice_spark_sample1';

Check the ice_spark_sample1 table definition:

sh
0: jdbc:hive2://hadoop103:10000> show create table ice_spark_sample1;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE hive_prod.default.ice_spark_sample1 (
  id BIGINT COMMENT 'unique id',
  data STRING)
USING iceberg
LOCATION 'hdfs://hadoop102:8020/spark/warehouse/ice_spark_sample1'
TBLPROPERTIES (
  'current-snapshot-id' = 'none',
  'format' = 'iceberg/parquet',
  'format-version' = '2',
  'write.parquet.compression-codec' = 'zstd')
 |
+----------------------------------------------------+
1 row selected (0.38 seconds)

The metadata JSON files recorded on HDFS can be seen under the table location.

4.2 Create an external table

sql
CREATE EXTERNAL TABLE ice_spark_sample2 (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg
location '/spark/warehouse/ice_spark_sample2';

Check the ice_spark_sample2 table details:

sh
0: jdbc:hive2://hadoop103:10000> desc formatted ice_spark_sample2;
+-------------------------------+----------------------------------------------------+------------+
|           col_name            |                     data_type                      |  comment   |
+-------------------------------+----------------------------------------------------+------------+
| id                            | bigint                                             | unique id  |
| data                          | string                                             | NULL       |
|                               |                                                    |            |
| # Metadata Columns            |                                                    |            |
| _spec_id                      | int                                                |            |
| _partition                    | struct<>                                           |            |
| _file                         | string                                             |            |
| _pos                          | bigint                                             |            |
| _deleted                      | boolean                                            |            |
|                               |                                                    |            |
| # Detailed Table Information  |                                                    |            |
| Name                          | hive_prod.default.ice_spark_sample2                 |            |
| Type                          | EXTERNAL                                           |            |
| Location                      | hdfs://hadoop102:8020/spark/warehouse/ice_spark_sample2 |            |
| Provider                      | iceberg                                            |            |
| Owner                         | jack                                               |            |
| Table Properties              | [current-snapshot-id=none,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd] |            |
+-------------------------------+----------------------------------------------------+------------+
17 rows selected (0.329 seconds)

4.3 Create a partitioned table

sql
CREATE external TABLE ice_spark_sample3 (
    id bigint,
    data string,
    category string)
USING iceberg
PARTITIONED BY (category)
location '/spark/warehouse/ice_spark_sample3';
insert into ice_spark_sample3 values (1, 'jack', '1'), (2, 'tuanzi', '2'), (3, 'yuanyuan', '1');

Check the table definition:

sh
0: jdbc:hive2://hadoop103:10000> show create table ice_spark_sample3;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE hive_prod.default.ice_spark_sample3 (
  id BIGINT,
  data STRING,
  category STRING)
USING iceberg
PARTITIONED BY (category)
LOCATION 'hdfs://hadoop102:8020/spark/warehouse/ice_spark_sample3'
TBLPROPERTIES (
  'current-snapshot-id' = '3117539524054604329',
  'format' = 'iceberg/parquet',
  'format-version' = '2',
  'write.parquet.compression-codec' = 'zstd')
 |
+----------------------------------------------------+

The partition directories can be seen on HDFS.

4.4 Create a table with hidden partitions

A hidden-partition table is one whose partitions are computed from column values by a transform, rather than partitioning directly on a column's raw value.

sql
CREATE external TABLE ice_spark_sample4 (
    id bigint,
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category)
location '/spark/warehouse/ice_spark_sample4';

The supported partition transforms are:

  • years(ts): partition by year
  • months(ts): partition by month
  • days(ts) or date(ts): equivalent to a dateint partition
  • hours(ts) or date_hour(ts): equivalent to a dateint-and-hour partition
  • bucket(N, col): partition by hashed value mod N buckets
  • truncate(L, col): partition by the value truncated to width L
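
With hidden partitioning, writers and readers work only with the original columns; Iceberg derives the partition values from the transforms. A minimal sketch (the inserted values are illustrative):

sql
-- rows are routed to partitions derived from bucket(16, id), days(ts) and category
INSERT INTO ice_spark_sample4 VALUES (1, 'demo', '2', timestamp'2025-02-19 00:00:00');
-- a plain filter on ts can prune the days(ts) partitions; no partition column appears in the query
SELECT * FROM ice_spark_sample4 WHERE ts >= timestamp'2025-02-19 00:00:00';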

Check the table columns:

sh
0: jdbc:hive2://hadoop103:10000> desc ice_spark_sample4;
+-----------------+-----------------+----------+
|    col_name     |    data_type    | comment  |
+-----------------+-----------------+----------+
| id              | bigint          | NULL     |
| data            | string          | NULL     |
| category        | string          | NULL     |
| ts              | timestamp       | NULL     |
|                 |                 |          |
| # Partitioning  |                 |          |
| Part 0          | bucket(16, id)  |          |
| Part 1          | days(ts)        |          |
| Part 2          | category        |          |
+-----------------+-----------------+----------+

4.5 Create a table with CTAS

sh
0: jdbc:hive2://hadoop103:10000> CREATE external TABLE ice_spark_sample5
. . . . . . . . . . . . . . . .> USING iceberg
. . . . . . . . . . . . . . . .> location '/spark/warehouse/ice_spark_sample5'
. . . . . . . . . . . . . . . .> AS SELECT * from ice_spark_sample3;
+---------+
| Result  |
+---------+
+---------+
No rows selected (2.313 seconds)
0: jdbc:hive2://hadoop103:10000> select * from ice_spark_sample5;
+-----+-----------+-----------+
| id  |   data    | category  |
+-----+-----------+-----------+
| 1   | jack      | 1         |
| 3   | yuanyuan  | 1         |
| 2   | tuanzi    | 2         |
+-----+-----------+-----------+
3 rows selected (0.366 seconds)

Checking the table structure shows that CTAS loses the partition spec:

sh
0: jdbc:hive2://hadoop103:10000> desc ice_spark_sample5;
+-----------+------------+----------+
| col_name  | data_type  | comment  |
+-----------+------------+----------+
| id        | bigint     | NULL     |
| data      | string     | NULL     |
| category  | string     | NULL     |
+-----------+------------+----------+
3 rows selected (1.578 seconds)

If no partitioning is specified the new table is unpartitioned, so the partition spec and table properties must be re-declared:

sql
CREATE external TABLE ice_spark_sample6
USING iceberg
PARTITIONED BY (bucket(8, id), hours(ts), category)
TBLPROPERTIES ('key'='value')
location '/spark/warehouse/ice_spark_sample6'
AS SELECT * from ice_spark_sample4;

4.6 Create a table with RTAS

sql
-- fails if the table does not exist; there is nothing to replace
REPLACE TABLE ice_spark_sample5
USING iceberg
PARTITIONED BY (category)
location '/spark/warehouse/ice_spark_sample5'
AS SELECT * from ice_spark_sample3;

-- creates the table if it does not exist, otherwise replaces it
create or REPLACE TABLE ice_spark_sample8
USING iceberg
PARTITIONED BY (category)
TBLPROPERTIES ('key'='value')
AS SELECT * from ice_spark_sample3;

Checking ice_spark_sample5 shows it has become a partitioned table:

sh
0: jdbc:hive2://hadoop103:10000> desc formatted ice_spark_sample5;
+-------------------------------+----------------------------------------------------+----------+
|           col_name            |                     data_type                      | comment  |
+-------------------------------+----------------------------------------------------+----------+
| id                            | bigint                                             | NULL     |
| data                          | string                                             | NULL     |
| category                      | string                                             | NULL     |
| # Partition Information       |                                                    |          |
| # col_name                    | data_type                                          | comment  |
| category                      | string                                             | NULL     |
|                               |                                                    |          |
| # Metadata Columns            |                                                    |          |
| _spec_id                      | int                                                |          |
| _partition                    | struct<category:string>                            |          |
| _file                         | string                                             |          |
| _pos                          | bigint                                             |          |
| _deleted                      | boolean                                            |          |
|                               |                                                    |          |
| # Detailed Table Information  |                                                    |          |
| Name                          | hive_prod.default.ice_spark_sample5                |          |
| Type                          | EXTERNAL                                           |          |
| Location                      | hdfs://hadoop102:8020/spark/warehouse/ice_spark_sample5 |          |
| Provider                      | iceberg                                            |          |
| Owner                         | jack                                               |          |
| Table Properties              | [current-snapshot-id=8445519115344500338,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd] |          |
+-------------------------------+----------------------------------------------------+----------+
21 rows selected (0.39 seconds)

5. Alter Tables

5.1 Rename a table

sql
ALTER TABLE ice_spark_sample1 RENAME TO ice_spark_sample11;

5.2 Set table properties

sql
ALTER TABLE ice_spark_sample11 SET TBLPROPERTIES (
    'read.split.target-size'='268435456'
);
-- remove a table property
ALTER TABLE ice_spark_sample11 UNSET TBLPROPERTIES ('read.split.target-size');

5.3 Add columns

sql
ALTER TABLE ice_spark_sample11
ADD COLUMNS (
    new_column int comment 'new_column docs'
);

Query the table data:

sh
0: jdbc:hive2://hadoop103:10000> select * from ice_spark_sample1;
+-----+-------+-------------+
| id  | data  | new_column  |
+-----+-------+-------------+
| 1   | jack  | NULL        |
+-----+-------+-------------+
1 row selected (0.3 seconds)

5.4 Alter a column

Iceberg allows changing a column's type, but only to a wider type (for example int to bigint, float to double, or increasing a decimal's precision).

sql
 alter table ice_spark_sample1 alter column new_column type bigint;

Check the table columns:

sh
0: jdbc:hive2://hadoop103:10000> desc ice_spark_sample1;
+-------------+------------+------------------+
|  col_name   | data_type  |     comment      |
+-------------+------------+------------------+
| id          | bigint     | unique id        |
| data        | string     | NULL             |
| new_column  | bigint     | new_column docs  |
+-------------+------------+------------------+
3 rows selected (0.326 seconds)

5.5 Drop a column

sql
ALTER TABLE ice_spark_sample1 DROP COLUMN new_column;

Check the table columns:

sh
0: jdbc:hive2://hadoop103:10000> desc ice_spark_sample1;
+-----------+------------+------------+
| col_name  | data_type  |  comment   |
+-----------+------------+------------+
| id        | bigint     | unique id  |
| data      | string     | NULL       |
+-----------+------------+------------+
2 rows selected (0.045 seconds)

5.6 Add a partition field

Spark can add new partition fields to an existing table, but this requires configuring the Iceberg SQL extensions first:

sh
vim spark-defaults.conf
## add the following configuration
spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Restart the Thrift server and reconnect with Beeline. Add a partition field to ice_spark_sample1, turning it from an unpartitioned table into a partitioned one:

sql
ALTER TABLE ice_spark_sample1 ADD PARTITION FIELD bucket(16, id);

Check the ice_spark_sample1 table:

sh
0: jdbc:hive2://hadoop103:10000> desc ice_spark_sample1;
+-----------------+-----------------+------------+
|    col_name     |    data_type    |  comment   |
+-----------------+-----------------+------------+
| id              | bigint          | unique id  |
| data            | string          | NULL       |
|                 |                 |            |
| # Partitioning  |                 |            |
| Part 0          | bucket(16, id)  |            |
+-----------------+-----------------+------------+

5.7 Drop a partition field

As above, this requires the spark.sql.extensions setting:

sql
ALTER TABLE ice_spark_sample1 DROP PARTITION FIELD bucket(16, id);

Dropping a partition field is a metadata operation and does not change any existing table data. New data is written with the new partition spec, while existing data remains in the old layout. Dynamic partition overwrite behavior also changes when the partitioning changes: for example, if you switch from daily to hourly partitioning, overwrites will replace hourly partitions rather than daily ones.
Be careful when dropping partition fields; it can cause metadata queries to fail or return different results.

5.8 Distributed writes by partition

WRITE DISTRIBUTED BY PARTITION clusters rows by their partition before writing, and LOCALLY ORDERED BY adds a local sort order within each write task:

sql
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION;

ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id;

6. Drop Tables

6.1 Drop a Hadoop Catalog table

For a Hadoop Catalog, running DROP TABLE removes the table from the catalog and deletes the table contents.

sql
CREATE EXTERNAL TABLE hadoop_prod.default.sample (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg
location '/warehouse/spark-iceberg/default/sample';

INSERT INTO hadoop_prod.default.sample values(1,'a');

Now drop the table:

sql
DROP TABLE hadoop_prod.default.sample;

After the drop, even though the table was created as an external table, all of its data has been deleted; keep this in mind when using a Hadoop Catalog.

6.2 Drop a Hive Catalog table

For a Hive Catalog:

  • Before 0.14, running DROP TABLE removed the table from the catalog and deleted the table contents.
  • Since 0.14, DROP TABLE only removes the table from the catalog and does not delete the data. To delete the table contents, use DROP TABLE PURGE.
    For example, drop ice_spark_sample5, which still contains data:
sh
0: jdbc:hive2://hadoop103:10000> drop table ice_spark_sample5;
+---------+
| Result  |
+---------+
+---------+
No rows selected (1.172 seconds)

After the drop, the data is still present on HDFS.
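
To remove the table contents as well, a minimal sketch using DROP TABLE PURGE (the table name here is illustrative):

sql
-- removes the table from the catalog and deletes its data and metadata files
DROP TABLE hive_prod.default.ice_spark_sample6 PURGE;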

7. Insert Data

7.1 Insert into a partitioned table

sh
0: jdbc:hive2://hadoop103:10000> insert into ice_spark_sample1 values(1, 'jack');
+---------+
| Result  |
+---------+
+---------+
No rows selected (5.012 seconds)

The new data files can be seen on HDFS.

7.2 Insert into a hidden-partition table

sh
0: jdbc:hive2://hadoop103:10000> insert into ice_spark_sample4 values(3, 'bobo','3',timestamp'2025-02-19 14:51:20'),(4, 'pipia','3', timestamp'2025-01-18 20:11:06');
+---------+
| Result  |
+---------+
+---------+
No rows selected (4.27 seconds)
0: jdbc:hive2://hadoop103:10000> select * from ice_spark_sample4;
+-----+--------+-----------+------------------------+
| id  |  data  | category  |           ts           |
+-----+--------+-----------+------------------------+
| 3   | bobo   | 3         | 2025-02-19 14:51:20.0  |
| 4   | pipia  | 3         | 2025-01-18 20:11:06.0  |
| 2   | test   | 3         | 2025-02-18 00:00:00.0  |
| 1   | demo   | 2         | 2025-02-19 00:00:00.0  |
+-----+--------+-----------+------------------------+
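
The derived partition values that each data file was written into can be inspected through the files metadata table; a minimal sketch:

sql
-- the partition column shows the derived values: the id bucket, the ts day and the category
SELECT * FROM hive_prod.default.ice_spark_sample4.files;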

7.3 Row-level updates with MERGE INTO

The matching rules are flexible and convenient to use:

sql
MERGE INTO hadoop_prod.default.a t 
USING (SELECT * FROM hadoop_prod.default.b) u ON t.id = u.id
WHEN MATCHED AND u.flag='b' THEN UPDATE SET t.count = t.count + u.count
WHEN MATCHED AND u.flag='a' THEN DELETE
WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count);

8. Update Data

With the Iceberg SQL extensions configured (see section 5.6), Iceberg tables also support row-level UPDATE statements:

sh
0: jdbc:hive2://hadoop103:10000> update ice_spark_sample1 set data='jack123' where id=1;
+---------+
| Result  |
+---------+
+---------+
No rows selected (4.112 seconds)
0: jdbc:hive2://hadoop103:10000> select * from ice_spark_sample1;
+-----+----------+
| id  |   data   |
+-----+----------+
| 1   | jack123  |
| 2   | tom      |
+-----+----------+
2 rows selected (0.303 seconds)

9. Query Tables

9.1 Query table data

sql
select * from ice_spark_sample1 where id=2;

9.2 Query metadata

Metadata tables must be queried with the fully qualified name in the form catalog.database.table:

sql
-- query table snapshots
SELECT * FROM hive_prod.default.ice_spark_sample1.snapshots;
-- query data file information
SELECT * FROM hive_prod.default.ice_spark_sample1.files;
-- query table history
SELECT * FROM hive_prod.default.ice_spark_sample1.history;
-- query manifests
SELECT * FROM hive_prod.default.ice_spark_sample1.manifests;

For example, query the snapshots of ice_spark_sample1:

sh
-- there are many columns; only a few are selected here
0: jdbc:hive2://hadoop103:10000> select committed_at, snapshot_id, operation from hive_prod.default.ice_spark_sample1.snapshots;
+--------------------------+----------------------+------------+
|       committed_at       |     snapshot_id      | operation  |
+--------------------------+----------------------+------------+
| 2025-02-19 10:30:08.555  | 5825989728064739405  | append     |
| 2025-02-19 12:46:48.009  | 8230451385389770413  | append     |
| 2025-02-19 15:08:29.046  | 7404723281141935099  | overwrite  |
| 2025-02-19 16:00:35.952  | 4012613232180724766  | append     |
+--------------------------+----------------------+------------+
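
The snapshot IDs above can also be used for time-travel reads; a minimal sketch, assuming Spark 3.3+ VERSION AS OF / TIMESTAMP AS OF syntax:

sql
-- read the table as of a specific snapshot
SELECT * FROM hive_prod.default.ice_spark_sample1 VERSION AS OF 7404723281141935099;
-- read the table as of a point in time
SELECT * FROM hive_prod.default.ice_spark_sample1 TIMESTAMP AS OF '2025-02-19 15:00:00';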

10. Stored Procedures

Iceberg's stored procedures are used to manage tables: snapshot management, metadata management, table migration, and so on. As above, they require the spark.sql.extensions setting.
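
Procedures are invoked with CALL against a catalog's system namespace, using either positional or named arguments; the general form (procedure and argument names are placeholders):

sql
CALL hive_prod.system.procedure_name(arg_1, arg_2);
CALL hive_prod.system.procedure_name(arg_name_1 => 'value_1', arg_name_2 => 'value_2');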

10.1 Snapshot management

  1. Roll back to a specific snapshot ID
sql
-- the latest snapshot of ice_spark_sample1 is 4012613232180724766, with 4 rows
CALL hive_prod.system.rollback_to_snapshot('default.ice_spark_sample1', 7404723281141935099);

Query the data at the rolled-back snapshot:

sh
0: jdbc:hive2://hadoop103:10000> select * from ice_spark_sample1;
+-----+----------+
| id  |   data   |
+-----+----------+
| 2   | tom      |
| 1   | jack123  |
+-----+----------+
  2. Roll back to the snapshot as of a specific time
sql
CALL hive_prod.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000');
  3. Set the table's current snapshot ID
sql
CALL hive_prod.system.set_current_snapshot('db.sample', 1);
  4. Cherry-pick a snapshot into the current table state
sql
CALL hive_prod.system.cherrypick_snapshot('default.a', 7629160535368763452);

10.2 Metadata management

  1. Expire snapshots older than a given timestamp, but keep the most recent 100 snapshots
sql
-- expired snapshots are deleted
CALL hive_prod.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100);
  2. Remove files that are not referenced by any metadata file of the Iceberg table
sql
-- list all candidate files that would be removed
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true);

-- remove any files in the given location that are unknown to the db.sample table
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data');
  3. Compact small data files
sql
CALL catalog_name.system.rewrite_data_files('db.sample');
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
  4. Migrate tables
sql
-- create a snapshot of an existing table as a new Iceberg table
CALL catalog_name.system.snapshot('db.sample', 'db.snap');
-- migrate a table to Iceberg in place
CALL catalog_name.system.migrate('db.sample');
-- add existing data files to an Iceberg table
CALL spark_catalog.system.add_files(
    table => 'db.tbl',
    source_table => 'db.src_tbl',
    partition_filter => map('part_col_1', 'A')
);
  5. Metadata information
sql
-- get the ancestor snapshots of the table's current snapshot
CALL spark_catalog.system.ancestors_of('db.tbl');
-- get the ancestor snapshots of a particular snapshot
CALL spark_catalog.system.ancestors_of('db.tbl', 1);