创建表时,默认的表类型是 Changelog 表。用户可以在该表中插入、更新或删除记录。
主键由一组包含每个记录唯一值的列组成。Paimon 通过在每个 Bucket 内对主键进行排序来强制执行数据排序,使用户能够通过在主键上应用过滤条件来实现高性能。
通过在 Changelog 表上定义主键,用户可以获得以下功能。
Bucket
Bucket 是读写的最小存储单元,每个 Bucket 目录包含一个 LSM 树。
Fixed Bucket
配置一个大于 0 的 Bucket 数量,对 Bucket 进行重新划分只能通过离线方式处理,参见Rescale Bucket,过多的 Bucket 会导致过多的小文件,而过少的 Bucket 会导致写入性能下降。
Dynamic Bucket
配置 'bucket' = '-1'
的话,Paimon 会动态维护索引,自动扩展 Bucket 的数量。
dynamic-bucket.target-row-num
:控制一个 Bucket 的目标行数。dynamic-bucket.assigner-parallelism
:分配器操作符的并行度,控制初始化 Bucket 的数量。
注:动态 Bucket 只支持单个写入作业。请不要启动多个作业同时写入同一个分区。
普通的动态 Bucket 模式
当你的更新操作不跨越分区(没有分区,或者主键包含所有分区字段)时,动态 Bucket 模式使用哈希索引来维护从键到 Bucket 的映射关系,它需要比固定 Bucket 模式更多的内存。
性能:
- 一般而言,性能不会有损失,但会有一些额外的内存消耗。每个分区中的 1 亿条记录会增加约 1 GB 的内存消耗,不再活跃的分区不会占用内存。
- 对于更新频率较低的表,建议使用此模式来显著提高性能。
普通的动态 Bucket 模式的排序压缩
普通动态 Bucket 模式支持排序压缩以加速查询。
跨分区 Upsert 动态 Bucket 模式
这是一个实验性的功能。
当需要进行跨分区的 Upsert 操作(主键不包含所有分区字段)时,动态 Bucket 模式直接维护键到分区和 Bucket 的映射关系,使用本地磁盘,并在启动流式写入作业时通过读取表中所有现有键来初始化索引。不同的合并引擎具有不同的行为:
- Deduplicate:从旧的分区删除数据,并将新数据插入到新的分区中。
- PartialUpdate & Aggregation:将新数据插入到旧的分区中。
- FirstRow:如果存在旧值,则忽略新数据。
性能:对于数据量较大的表,性能会有显著损失。此外,初始化过程需要很长时间。
如果 Upsert 操作不依赖于过旧的数据,可以考虑配置索引的 TTL 以减少索引和初始化时间:cross-partition-upsert.index-ttl
:RocksDB 索引和初始化的 TTL,这可以避免维护过多的索引并导致性能逐渐变差。但请注意,这也可能会导致数据重复。
Merge Engines
当 Paimon Sink 接收到两条或者两条以上相同主键的数据,它会将它们合并成一条保证主键唯一,通过指定表的 merge-engine 属性,用户可以选择如何将数据合并在一起。
注:在 Flink SQL TableConfig 中始终将 table.exec.sink.upsert-materialize
设置为 NONE,因为启用 sink upsert-materialize 可能会导致奇怪的行为。当输入数据无序时,建议使用序列字段来纠正无序性。
Deduplicate
Deduplicate 是默认的 merge engine。Paimon 只会保留最新的记录,并丢弃具有相同主键的其他记录。
具体而言,如果最新的记录是一个 DELETE 记录,那么所有具有相同主键的记录都将被删除。
Partial Update
通过指定 ‘merge-engine’ = ‘partial-update’,用户可以通过多次更新来更新记录的列,直到记录完整为止。这是通过逐个更新值字段,使用相同主键下的最新数据来实现的。但在这个过程中,空值不会被覆盖。
例如,假设 Paimon 收到三条记录:
<1, 23.0, 10, NULL>
<1, NULL, NULL, 'This is a book'>
<1, 25.2, NULL, NULL>假设第一列是主键,最终的结果将是 <1, 25.2, 10, 'This is a book'>。
对于流式查询,partial-update 合并引擎必须与查找(lookup)或完全合并(full-compaction)的 changelog producer 一起使用。(’input’ changelog producer 也支持,但只返回输入记录。)
默认情况下,部分更新不接受删除记录,可以选择以下解决方案之一:
- 配置 ‘partial-update.ignore-delete’ 以忽略删除记录。
- 配置 ‘sequence-group’ 来撤销部分列。
Sequence Group
一个序列字段可能无法解决具有多个流式更新的 partial-update 表的乱序问题,因为在多流更新期间,序列字段可能会被另一个流的最新数据覆盖。
因此,我们为 partial-update 表引入了序列组机制。它可以解决以下问题:
- 多流更新期间的乱序问题。每个流定义自己的序列组。
- 真正的 partial-update,而不仅仅是非空更新。
请参考以下示例:
1 | CREATE TABLE T ( |
对于字段 sequence-group,有效的比较数据类型包括:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP 和 TIMESTAMP_LTZ。
Default Value
如果无法保证数据的顺序,并且字段仅通过覆盖 null 进行写入,那么在读取表时,未被覆盖的字段将显示为 null。
1 | CREATE TABLE T ( |
如果期望在读取表时,未被覆盖的字段具有默认值而不是 null,则需要使用 ‘fields.name.default-value’。
1 | CREATE TABLE T ( |
Aggregation
注意:在 Flink SQL TableConfig 中,始终将 table.exec.sink.upsert-materialize 设置为 NONE。
有时用户只关心聚合结果。aggregation 合并引擎根据聚合函数,逐个将每个值字段与相同主键下的最新数据进行聚合。
非主键字段可以指定聚合函数,通过 fields.