publicstaticfinal ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM = ConfigOptions.key("table.exec.iceberg.infer-source-parallelism") .booleanType() .defaultValue(true) .withDescription("If is false, parallelism of source are set by config.\n" + "If is true, source parallelism is inferred according to splits number.\n");
publicstaticfinal ConfigOption<Integer> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX = ConfigOptions.key("table.exec.iceberg.infer-source-parallelism.max") .intType() .defaultValue(100) .withDescription("Sets max infer parallelism for source operator.");
public DataStream<RowData> build(){ Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); FlinkInputFormat format = buildFormat();
// Empty table, infer parallelism should be at least 1 int parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext); Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
// Make sure to generate 2 CombinedScanTasks long maxFileLen = Math.max(dataFile1.fileSizeInBytes(), dataFile2.fileSizeInBytes()); sql("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen);
// 2 splits (max infer is the default value 100 , max > splits num), the parallelism is splits num : 2 parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext); Assert.assertEquals("Should produce the expected parallelism.", 2, parallelism);
// 2 splits and limit is 1 , max infer parallelism is default 100, // which is greater than splits num and limit, the parallelism is the limit value : 1 parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, ScanContext.builder().limit(1).build()); Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
// 2 splits and max infer parallelism is 1 (max < splits num), the parallelism is 1 Configuration configuration = new Configuration(); configuration.setInteger(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 1); parallelism = FlinkSource.forRowData() .flinkConf(configuration) .inferParallelism(flinkInputFormat, ScanContext.builder().build()); Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
// 2 splits, max infer parallelism is 1, limit is 3, the parallelism is max infer parallelism : 1 parallelism = FlinkSource.forRowData() .flinkConf(configuration) .inferParallelism(flinkInputFormat, ScanContext.builder().limit(3).build()); Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
// 2 splits, infer parallelism is disabled, the parallelism is flink default parallelism 1 configuration.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false); parallelism = FlinkSource.forRowData() .flinkConf(configuration) .inferParallelism(flinkInputFormat, ScanContext.builder().limit(3).build()); Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism); }