深入理解Paimon
1. 插入数据场景
-- 创建表
CREATE TABLE t_demo (
id BIGINT,
a INT,
b STRING,
dt STRING COMMENT 'timestamp string in format yyyyMMdd',
PRIMARY KEY(id, dt) NOT ENFORCED
) PARTITIONED BY (dt);
-- 插入数据
INSERT INTO t_demo VALUES (1, 10001, 'varchar00001', '20230501');一旦插入SQL执行就会转换成Flink作业,作业完成后,记录就会通过成功提交写入Paimon表中。而数据提交的保存在HDFS上面的路径/paimon/fs/internal/warehouse/default.db/t_demo/snapshot/snapshot-1的快照中,snapshot-1 处生成的文件布局如下所述:
其中最开始只有base文件,base文件中是空的,delta相关文件记录实际的快照信息。
snapshot-1的内容包含快照的元数据,包括清单列表(manifest list)和schemaId:
{
"version" : 3,
"id" : 1,
"schemaId" : 0,
"baseManifestList" : "manifest-list-3ba61be6-af58-42a2-9c3c-f0bd9c036788-0",
"baseManifestListSize" : 884,
"deltaManifestList" : "manifest-list-3ba61be6-af58-42a2-9c3c-f0bd9c036788-1",
"deltaManifestListSize" : 1005,
"changelogManifestList" : null,
"indexManifest" : "index-manifest-ca5ecc78-0266-4767-8905-be0ed5633395-0",
"commitUser" : "fc568f6c-96b5-469d-8985-5dba001a1c9d",
"commitIdentifier" : 9223372036854775807,
"commitKind" : "APPEND",
"timeMillis" : 1759275029815,
"logOffsets" : { },
"totalRecordCount" : 1,
"deltaRecordCount" : 1,
"changelogRecordCount" : 0,
"watermark" : -9223372036854775808
}清单列表包含快照的所有更改,baseManifestList是应用deltaManifestList中的更改的基础文件。第一次提交将生成1个清单文件(manifest file),并创建2个清单列表(manifest list):
--deltaManifestList:包含对数据文件执行操作的清单条目列表(上图中的 manifest-list-1-delta)
manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1
--baseManifestList:空的(上图中的 manifest-list-1-base)
manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0
--清单文件:存储快照中数据文件的信息(上图中的manifest-1-0)
manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0跨不同分区插入一批记录:
INSERT INTO t_demo VALUES
(2, 10002, 'varchar00002', '20230502'),
(3, 10003, 'varchar00003', '20230503'),
(4, 10004, 'varchar00004', '20230504'),
(5, 10005, 'varchar00005', '20230505'),
(6, 10006, 'varchar00006', '20230506'),
(7, 10007, 'varchar00007', '20230507'),
(8, 10008, 'varchar00008', '20230508'),
(9, 10009, 'varchar00009', '20230509'),
(10, 10010, 'varchar00010', '20230510');第二次提交任务发生,创建一个新快照,即snapshot-2,如图所示:
在snapshot目录中:
此时的布局如下所示: 
2. 删除数据场景
执行如下删除:
SET 'execution.runtime-mode' = 'batch';
DELETE FROM t_demo WHERE dt >= '20230503';第三次任务提交,Paimon为我们提供了snapshot-3。会发现对应的分区没有被删除。相反,会为分区20230503~20230510分别创建一个新的数据文件:
截至snapshot-3的新文件布局如下所示:
manifest-3-0包含8个ADD操作类型的manifest条目,对应8个新写入的数据文件。
3. 合并文件场景
小文件的数量会随着连续快照的增加而增加,这可能会导致读取性能下降。因此,需要进行full compaction以减少小文件的数量。手动触发full-compaction:
bin/flink run lib/paimon-flink-action-1.2.0.jar compact \
--warehouse hdfs://node01:8020/paimon/fs/internal/warehouse \
--database default \
--table t_demo
-D execution.runtime-mode=batch或者使用flinksql:
CALL sys.compact(
`table` => 'default.t_demo'
);所有当前表文件将被压缩,并创建一个新快照,即snapshot-4,包含以下信息:
{
"version" : 3,
"id" : 4,
"schemaId" : 0,
"baseManifestList" : "manifest-list-808501ef-48f0-4771-80e5-3827b08f74c5-0",
"baseManifestListSize" : 1071,
"deltaManifestList" : "manifest-list-808501ef-48f0-4771-80e5-3827b08f74c5-1",
"deltaManifestListSize" : 1008,
"changelogManifestList" : null,
"indexManifest" : "index-manifest-05c7979f-b1e1-4f67-adf2-d5ec7260958f-0",
"commitUser" : "12a947fa-9191-4b91-b009-2b6c7f13dbdf",
"commitIdentifier" : 9223372036854775807,
"commitKind" : "COMPACT",
"timeMillis" : 1759306461592,
"logOffsets" : { },
"totalRecordCount" : 2,
"deltaRecordCount" : -16,
"changelogRecordCount" : 0,
"watermark" : -9223372036854775808
}截至snapshot-4的新文件布局如下所示:
可见依旧还是schema-0,base来到了版本4,base4包含了之前的mainfest1~3,而delta指向的是清单mainfest4, mainfest4干了的事情就是发出消息:删除之前标记为过期的数据。
manifest-4-0包含20个清单条目(18个DELETE操作和2个ADD操作):
- 对于分区20230503到20230510,对两个数据文件进行两次DELETE操作。
- 对于分区20230501到20230502,对同一个数据文件进行1次DELETE操作和1次ADD操作。
4. 修改表场景
执行以下语句来添加一个字段:
alter table t_demo add (desc string);它将为Paimon表创建一个新schema,即schema-1,但在下一次提交之前还没有快照实际使用该schema。 
5. 过期快照场景
快照过期的过程中,首先确定快照的范围,然后将这些快照内的数据文件标记为删除。此标记可确保该文件不会被后续快照使用并可以安全删除。假设上图中的所有4个快照都即将过期。过期流程如下:
- 它首先删除所有标记的数据文件,并记录任何更改的存储桶。
- 然后它会删除所有更改日志文件和关联的清单。
- 最后,它删除快照本身并写入最早的提示文件。
假设创建了另一个快照snapshot-5并触发了快照过期。snapshot-1到snapshot-4被删除。快照过期后的最终布局如下所示:
最后分区20230503至20230510被物理删除。为何已经修改了表,图中正在被使用的schema还是schema-0而不是schema-1,因为过期快照操作和schema无关,不会触发提交操作,而insert,update等操作会触发提交操作。
5. Flink流式写入
CDC摄取工作流程以及所涉及的每个组件所扮演的独特角色: 
- MySQL CDC Source统一读取快照和增量数据,分别由SnapshotReader读取快照数据和BinlogReader读取增量数据。
- Paimon Sink将数据写入桶级别的Paimon表中。其中的CompactManager将异步触发Compaction。
- Committer Operator 是一个单例,负责提交和过期快照。
5.1 MySQL Cdc Source
MySQL Cdc Source读取快照和增量数据,并在规范化后将它们发送到下游:
Paimon Sink首先将新记录缓冲在基于堆的LSM树中,并在内存缓冲区满时将它们刷新到磁盘。请注意,写入的每个数据文件都是Sorted Run。此时,还没有创建清单文件和快照。在Flink检查点发生之前,Paimon Sink将刷新所有缓冲记录并向下游发送可提交消息,该消息在检查点期间由Committer Operator 读取并提交:
在检查点期间,Committer Operator将创建一个新快照并将其与清单列表关联起来,以便该快照包含有关表中所有数据文件的信息:
稍后可能会发生异步Compaction,CompactManager 生成的提交表包含有关先前文件和合并文件的信息,以便Committer Operator可以构造相应的清单条目。在这种情况下,Committer Operator可能会在Flink检查点期间生成两个快照:
- 一个用于写入数据(Append类型的快照),
- 另一个用于compact(Compact 类型的快照)。
如果在检查点间隔期间没有写入数据文件,则只会创建Compact类型的快照。 Committer Operator将检查快照是否过期并执行标记数据文件的物理删除。
