Flink Iceberg Source Parallelism Inference: A Source Code Walkthrough

Batch reads from Iceberg

Iceberg provides two configuration options:

public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
    ConfigOptions.key("table.exec.iceberg.infer-source-parallelism")
        .booleanType()
        .defaultValue(true)
        .withDescription("If is false, parallelism of source are set by config.\n" +
            "If is true, source parallelism is inferred according to splits number.\n");

public static final ConfigOption<Integer> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX =
    ConfigOptions.key("table.exec.iceberg.infer-source-parallelism.max")
        .intType()
        .defaultValue(100)
        .withDescription("Sets max infer parallelism for source operator.");
  • table.exec.iceberg.infer-source-parallelism: defaults to true, meaning the source parallelism is inferred from the number of splits; if set to false, the explicitly configured parallelism is used instead.
  • table.exec.iceberg.infer-source-parallelism.max: defaults to 100, the upper bound for the inferred source parallelism (see the sketch after this list for how to override both options).
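As a reference, here is a minimal sketch of overriding both options on a batch TableEnvironment. Only the two option keys come from the source above; the class name and the choice of setting them through the TableConfig are my own illustration, and EnvironmentSettings.inBatchMode() assumes a reasonably recent Flink version.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class InferParallelismConfigSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // Cap the inferred source parallelism at 20 instead of the default 100.
    tEnv.getConfig().getConfiguration()
        .setInteger("table.exec.iceberg.infer-source-parallelism.max", 20);
    // Or disable inference entirely; the source then falls back to
    // table.exec.resource.default-parallelism.
    tEnv.getConfig().getConfiguration()
        .setBoolean("table.exec.iceberg.infer-source-parallelism", false);
  }
}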

These two options are only read in the inferParallelism method of FlinkSource's Builder:

int inferParallelism(FlinkInputFormat format, ScanContext context) {
  // Read table.exec.resource.default-parallelism, default value -1
  int parallelism = readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
  // Read table.exec.iceberg.infer-source-parallelism, default true
  if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
    // Read table.exec.iceberg.infer-source-parallelism.max, default 100
    int maxInferParallelism = readableConfig.get(FlinkConfigOptions
        .TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
    Preconditions.checkState(
        maxInferParallelism >= 1,
        FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
    // Plan the table's input splits and count them
    int splitNum;
    try {
      FlinkInputSplit[] splits = format.createInputSplits(0);
      splitNum = splits.length;
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create iceberg input splits for table: " + table, e);
    }

    parallelism = Math.min(splitNum, maxInferParallelism);
  }

  if (context.limit() > 0) {
    int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
    parallelism = Math.min(parallelism, limit);
  }

  // parallelism must be positive.
  parallelism = Math.max(1, parallelism);
  return parallelism;
}

splitNum, the number of splits for the table, is obtained by the code below:

FlinkInputSplit[] splits = format.createInputSplits(0);
splitNum = splits.length;

public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
  // Called in Job manager, so it is OK to load table from catalog.
  // Open the table loader (catalog)
  tableLoader.open();
  final ExecutorService workerPool = ThreadPools.newWorkerPool("iceberg-plan-worker-pool", context.planParallelism());
  try (TableLoader loader = tableLoader) {
    // Load the table
    Table table = loader.loadTable();
    // Delegate to the split planner
    return FlinkSplitPlanner.planInputSplits(table, context, workerPool);
  } finally {
    workerPool.shutdown();
  }
}

static FlinkInputSplit[] planInputSplits(Table table, ScanContext context, ExecutorService workerPool) {
  // The heavy lifting is done by planTasks
  try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context, workerPool)) {
    List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
    boolean exposeLocality = context.exposeLocality();

    Tasks.range(tasks.size())
        .stopOnFailure()
        .executeWith(exposeLocality ? workerPool : null)
        .run(index -> {
          CombinedScanTask task = tasks.get(index);
          String[] hostnames = null;
          if (exposeLocality) {
            hostnames = Util.blockLocations(table.io(), task);
          }
          splits[index] = new FlinkInputSplit(index, task, hostnames);
        });
    return splits;
  } catch (IOException e) {
    throw new UncheckedIOException("Failed to process tasks iterable", e);
  }
}

The planTasks helper mainly delegates to scan.planTasks(); the relevant code is:

public CloseableIterable<CombinedScanTask> planTasks() {
  CloseableIterable<FileScanTask> fileScanTasks = planFiles();
  CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(fileScanTasks, targetSplitSize());
  return TableScanUtil.planTasks(splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost());
}

// Plan the data files to scan
public CloseableIterable<FileScanTask> planFiles() {
  Snapshot snapshot = snapshot();
  if (snapshot != null) {
    LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
        snapshot.snapshotId(), DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
        context.rowFilter());

    Listeners.notifyAll(
        new ScanEvent(table.name(), snapshot.snapshotId(), context.rowFilter(), schema()));

    return planFiles(ops, snapshot,
        context.rowFilter(), context.ignoreResiduals(), context.caseSensitive(), context.returnColumnStats());

  } else {
    LOG.info("Scanning empty table {}", table);
    return CloseableIterable.empty();
  }
}

// Split each file scan task by the target split size
public static CloseableIterable<FileScanTask> splitFiles(CloseableIterable<FileScanTask> tasks, long splitSize) {
  Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);

  Iterable<FileScanTask> splitTasks = FluentIterable
      .from(tasks)
      .transformAndConcat(input -> input.split(splitSize));
  // Capture manifests which can be closed after scan planning
  return CloseableIterable.combine(splitTasks, tasks);
}

public static CloseableIterable<CombinedScanTask> planTasks(CloseableIterable<FileScanTask> splitFiles,
                                                            long splitSize, int lookback, long openFileCost) {
  Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
  Preconditions.checkArgument(lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
  Preconditions.checkArgument(openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);

  // Check the size of delete file as well to avoid unbalanced bin-packing
  Function<FileScanTask, Long> weightFunc = file -> Math.max(
      file.length() + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
      (1 + file.deletes().size()) * openFileCost);

  return CloseableIterable.transform(
      CloseableIterable.combine(
          new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, weightFunc, true),
          splitFiles),
      BaseCombinedScanTask::new);
}
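Since the split count is driven by targetSplitSize, splitLookback and splitOpenFileCost, it (and therefore the inferred parallelism) can be tuned through the table's read.split.* properties, the same ones the test at the end of this article tweaks. A minimal sketch, assuming an already-loaded Table handle; the concrete values are illustrative only:

import org.apache.iceberg.Table;

public class SplitPlanningTuningSketch {
  // Tune the split planning properties that feed splitNum and therefore the
  // inferred source parallelism (values below are just examples).
  static void tuneSplitPlanning(Table table) {
    table.updateProperties()
        .set("read.split.target-size", String.valueOf(128L * 1024 * 1024))   // target bytes per combined split
        .set("read.split.open-file-cost", String.valueOf(4L * 1024 * 1024))  // minimum weight charged per file
        .set("read.split.planning-lookback", "10")                           // bin-packing lookback
        .commit();
  }
}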

Once the number of splits for the table's files is known, the parallelism is decided:

parallelism = Math.min(splitNum, maxInferParallelism);

The parallelism is the minimum of the configured maximum and the split count. For example, if the max source parallelism is set to 50 but the table's files yield 40 splits, the source parallelism becomes 40.

If table.exec.iceberg.infer-source-parallelism is set to false, the parallelism falls back to table.exec.resource.default-parallelism (whose default value is -1).

Continuing with the next block:

if (context.limit() > 0) {
  int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
  parallelism = Math.min(parallelism, limit);
}

limit is the LIMIT value from the SQL query. If it is set, it also participates in the calculation: with LIMIT 1, for instance, the parallelism becomes the minimum of the previously computed parallelism and 1.

// parallelism must be positive.
parallelism = Math.max(1, parallelism);
return parallelism;

Finally, the code guarantees a minimum parallelism of 1.
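Putting the whole decision together, here is a simplified standalone restatement of the logic above (not the Iceberg class itself, just a sketch for reasoning about concrete numbers):

// Simplified restatement of inferParallelism's decision logic (illustrative only).
static int decideParallelism(int defaultParallelism, boolean inferEnabled,
                             int maxInferParallelism, int splitNum, long limit) {
  int parallelism = defaultParallelism;                // table.exec.resource.default-parallelism (default -1)
  if (inferEnabled) {
    parallelism = Math.min(splitNum, maxInferParallelism);
  }
  if (limit > 0) {
    parallelism = Math.min(parallelism, (int) Math.min(limit, Integer.MAX_VALUE));
  }
  return Math.max(1, parallelism);                     // never less than 1
}

// decideParallelism(-1, true, 100, 40, -1)  -> 40  (split count wins over max = 100)
// decideParallelism(-1, true,  50, 40, -1)  -> 40  (split count wins over max = 50)
// decideParallelism(-1, true, 100, 40,  1)  ->  1  (LIMIT 1 caps it)
// decideParallelism(-1, false, 100, 40, -1) ->  1  (falls back to -1, clamped to 1)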

Now let's look at where inferParallelism is called; its only call site is the build() method of FlinkSource's Builder:

public DataStream<RowData> build() {
  Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
  FlinkInputFormat format = buildFormat();

  ScanContext context = contextBuilder.build();
  TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));

  if (!context.isStreaming()) {
    // Only takes effect in batch mode
    int parallelism = inferParallelism(format, context);
    return env.createInput(format, typeInfo).setParallelism(parallelism);
  } else {
    StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);

    String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
    String readerOperatorName = String.format("Iceberg table (%s) reader", table);

    return env.addSource(function, monitorFunctionName)
        .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
  }
}
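For context, a minimal sketch of the batch read path that ends up in the non-streaming branch above. The table location is a placeholder and the exact builder methods may differ slightly across Iceberg versions:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class BatchReadSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Placeholder warehouse location; batch (non-streaming) reads take the
    // branch above and therefore get an inferred source parallelism.
    TableLoader loader = TableLoader.fromHadoopTable("hdfs:///warehouse/db/t");
    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(loader)
        .streaming(false)
        .build();
    stream.print();
    env.execute("iceberg-batch-read-sketch");
  }
}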

You can look at the testInferedParallelism method of the TestFlinkScanSql test class to verify this behaviour:

@Test
public void testInferedParallelism() throws IOException {
  Table table = catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC);

  TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
  FlinkInputFormat flinkInputFormat = FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();
  ScanContext scanContext = ScanContext.builder().build();

  // Empty table, infer parallelism should be at least 1
  int parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext);
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
  DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0),
      RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
  DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0),
      RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
  helper.appendToTable(dataFile1, dataFile2);

  // Make sure to generate 2 CombinedScanTasks
  long maxFileLen = Math.max(dataFile1.fileSizeInBytes(), dataFile2.fileSizeInBytes());
  sql("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen);

  // 2 splits (max infer is the default value 100 , max > splits num), the parallelism is splits num : 2
  parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext);
  Assert.assertEquals("Should produce the expected parallelism.", 2, parallelism);

  // 2 splits and limit is 1 , max infer parallelism is default 100,
  // which is greater than splits num and limit, the parallelism is the limit value : 1
  parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, ScanContext.builder().limit(1).build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  // 2 splits and max infer parallelism is 1 (max < splits num), the parallelism is 1
  Configuration configuration = new Configuration();
  configuration.setInteger(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 1);
  parallelism = FlinkSource.forRowData()
      .flinkConf(configuration)
      .inferParallelism(flinkInputFormat, ScanContext.builder().build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  // 2 splits, max infer parallelism is 1, limit is 3, the parallelism is max infer parallelism : 1
  parallelism = FlinkSource.forRowData()
      .flinkConf(configuration)
      .inferParallelism(flinkInputFormat, ScanContext.builder().limit(3).build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);

  // 2 splits, infer parallelism is disabled, the parallelism is flink default parallelism 1
  configuration.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
  parallelism = FlinkSource.forRowData()
      .flinkConf(configuration)
      .inferParallelism(flinkInputFormat, ScanContext.builder().limit(3).build());
  Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
}

Note ⚠️: this code lives in the flink module of the Iceberg project.

Streaming reads from Iceberg

No source parallelism inference is applied to streaming reads.
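Compared with the batch sketch above, a streaming read differs only in streaming(true), and, based on the build() code shown earlier, the reader operator sets no explicit parallelism, so the environment default is the knob to turn. A minimal sketch reusing the same imports and placeholder location as the batch example:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// No inference in streaming mode; the downstream reader operator picks up the
// environment default parallelism set here.
env.setParallelism(4);

DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)
    .tableLoader(TableLoader.fromHadoopTable("hdfs:///warehouse/db/t"))
    .streaming(true)
    .build();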


