《Flink 实战与性能优化》—— Flink Table & SQL 概念与通用 API

第五章 —— Table API & SQL

Flink 中除了 DataStream 和 DataSet API,还有比较高级的 Table API & SQL,它可以帮助我们简化开发的过程,能够快读的运用 Flink 去完成一些需求,本章将对 Flink Table API & SQL 进行讲解,并将与其他的 API 结合对比分析。

前面的内容都是讲解 DataStream 和 DataSet API 相关的,在 1.2.5 节中讲解 Flink API 时提及到 Flink 的高级 API —— Table API & SQL,本节将开始 Table & SQL 之旅。

在 Flink 1.9 版本中,合进了阿里巴巴开源的 Blink 版本中的大量代码,其中最重要的贡献就是 Blink SQL 了。在 Blink 捐献给 Apache Flink 之后,社区就致力于为 Table API & SQL 集成 Blink 的查询优化器和 runtime。先来看下 1.8 版本的 Flink Table 项目结构如下图所示。

1.9 版本的 Flink Table 项目结构图如下图所示:

可以发现新增了 flink-sql-parserflink-table-planner-blinkflink-table-runtime-blinkflink-table-uber-blink 模块,对 Flink Table 模块的重构详细内容可以参考 FLIP-32。这样对于 Java 和 Scala API 模块、优化器以及 runtime 模块来说,分层更清楚,接口更明确。

另外 flink-table-planner-blink 模块中实现了新的优化器接口,所以现在有两个插件化的查询处理器来执行 Table API & SQL:1.9 以前的 Flink 处理器和新的基于 Blink 的处理器。基于 Blink 的查询处理器提供了更好的 SQL 覆盖率、支持更广泛的查询优化、改进了代码生成机制、通过调优算子的实现来提升批处理查询的性能。除此之外,基于 Blink 的查询处理器还提供了更强大的流处理能力,包括了社区一些非常期待的新功能(如维表 Join、TopN、去重)和聚合场景缓解数据倾斜的优化,以及内置更多常用的函数,具体可以查看 flink-table-runtime-blink 代码。目前整个模块的结构如下图所示:

注意:两个查询处理器之间的语义和功能大部分是一致的,但未完全对齐,因为基于 Blink 的查询处理器还在优化中,所以在 1.9 版本中默认查询处理器还是 1.9 之前的版本。如果你想使用 Blink 处理器的话,可以在创建 TableEnvironment 时通过 EnvironmentSettings 配置启用。被选择的处理器必须要在正在执行的 Java 进程的类路径中。对于集群设置,默认两个查询处理器都会自动地加载到类路径中。如果要在 IDE 中运行一个查询,需要在项目中添加 planner 依赖。

5.1.2 为什么选择 Table API & SQL?

在 1.2 节中介绍了 Flink 的 API 是包含了 Table API & SQL,在 1.3 节中也介绍了在 Flink 1.9 中阿里开源的 Blink 分支中的很强大的 SQL 功能合并进 Flink 主分支,另外通过阿里 Blink 相关的介绍,可以知道阿里在 SQL 功能这块是做了很多的工作。从前面章节的内容可以发现 Flink 的 DataStream/DataSet API 的功能已经很全并且很强大了,常见复杂的数据处理问题也都可以处理,那么社区为啥还在一直推广 Table API & SQL 呢?

其实通过观察其它的大数据组件,就不会好奇了,比如 Spark、Storm、Beam、Hive 、KSQL(面向 Kafka 的 SQL 引擎)、Elasticsearch、Phoenix(使用 SQL 进行 HBase 数据的查询)等,可以发现 SQL 已经成为各个大数据组件必不可少的数据查询语言,那么 Flink 作为一个大数据实时处理引擎,笔者对其支持 SQL 查询流数据也不足为奇了,但是还是来稍微介绍一下 Table API & SQL。

Table API & SQL 是一种关系型 API,用户可以像操作数据库一样直接操作流数据,而不再需要通过 DataStream API 来写很多代码完成计算需求,更不用手动去调优你写的代码,另外 SQL 最大的优势在于它是一门学习成本很低的语言,普及率很高,用户基数大,和其他的编程语言相比,它的入门相对简单。

除了上面的原因,还有一个原因是:可以借助 Table API & SQL 统一流处理和批处理,因为在 DataStream/DataSet API 中,用户开发流作业和批作业需要去了解两种不同的 API,这对于公司有些开发能力不高的数据分析师来说,学习成本有点高,他们其实更擅长写 SQL 来分析。Table API & SQL 做到了批与流上的查询具有同样的语法语义,因此不用改代码就能同时在批和流上执行。

总结来说,为什么选择 Table API & SQL:

  • 声明式语言表达业务逻辑
  • 无需代码编程 —— 易于上手
  • 查询能够被有效的优化
  • 查询可以高效的执行

在上文中提及到 Flink Table 在 1.8 和 1.9 的区别,这里还是要再讲解一下这几个依赖,因为只有了解清楚了之后,我们在后面开发的时候才能够清楚挑选哪种依赖。它有如下几个模块:

  • flink-table-common:table 中的公共模块,可以用于通过自定义 function,format 等来扩展 Table 生态系统
  • flink-table-api-java:支持使用 Java 语言,纯 Table&SQL API
  • flink-table-api-scala:支持使用 Scala 语言,纯 Table&SQL API
  • flink-table-api-java-bridge:支持使用 Java 语言,包含 DataStream/DataSet API 的 Table&SQL API(推荐使用)
  • flink-table-api-scala-bridge:支持使用 Scala 语言,带有 DataStream/DataSet API 的 Table&SQL API(推荐使用)
  • flink-sql-parser:SQL 语句解析层,主要依赖 calcite
  • flink-table-planner:Table 程序的 planner 和 runtime
  • flink-table-uber:将上诉模块打成一个 fat jar,在 lib 目录下
  • flink-table-planner-blink:Blink 的 Table 程序的 planner(阿里开源的版本)
  • flink-table-runtime-blink:Blink 的 Table 程序的 runtime(阿里开源的版本)
  • flink-table-uber-blink:将 Blink 版本的 planner 和 runtime 与前面模块(除 flink-table-planner 模块)打成一个 fat jar,在 lib 目录下,如下图所示。

  • flink-sql-client:SQL 客户端

5.1.4 两种 planner 之间的区别

上面讲了两种不同的 planner 之间包含的模块有点区别,但是具体有什么区别如下所示:

  • Blink planner 将批处理作业视为流的一种特殊情况。因此不支持 Table 和 DataSet 之间的转换,批处理作业会转换成 DataStream 程序,而不会转换成 DataSet 程序,流作业还是转换成 DataStream 程序。
  • Blink planner 不支持 BatchTableSource,而是使用有界的(bounded) StreamTableSource 代替它。
  • Blink planner 仅支持全新的 Catalog,不支持已经废弃的 ExternalCatalog。
  • 以前的 planner 中 FilterableTableSource 的实现与现在的 Blink planner 有冲突,在以前的 planner 中是叠加 PlannerExpressions(在未来的版本中会移除),而在 Blink planner 中是 Expressions。
  • 基于字符串的 KV 键值配置选项仅可以在 Blink planner 中使用。
  • PlannerConfig 的实现(CalciteConfig)在两种 planner 中不同。
  • Blink planner 会将多个 sink 优化在同一个 DAG 中(只在 TableEnvironment 中支持,StreamTableEnvironment 中不支持),而以前的 planner 是每个 sink 都有一个 DAG 中,相互独立的。
  • 以前的 planner 不支持 catalog 统计,而 Blink planner 支持。

在了解到了两种 planner 的区别后,接下来开始 Flink Table API & SQL 之旅。

5.1.5 添加项目依赖

因为在 Flink 1.9 版本中有两个 planner,所以得根据你使用的 planner 来选择对应的依赖,假设你选择的是最新的 Blink 版本,那么添加下面的依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

如果是以前的 planner,则使用下面这个依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

如果要自定义 format 格式或者自定义 function,则需要添加 flink-table-common 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>

5.1.6 创建一个 TableEnvironment

TableEnvironment 是 Table API 和 SQL 的统称,它负责的内容有:

  • 在内部的 catalog 注册 Table
  • 注册一个外部的 catalog
  • 执行 SQL 查询
  • 注册用户自定义的 function
  • 将 DataStream 或者 DataSet 转换成 Table
  • 保持对 ExecutionEnvironment 和 StreamExecutionEnvironment 的引用

Table 总是会绑定在一个指定的 TableEnvironment,不能在同一个查询中组合不同 TableEnvironment 的 Table,比如 join 或 union 操作。你可以使用下面的几种静态方法创建 TableEnvironment。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//创建 StreamTableEnvironment
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, EnvironmentSettings.newInstance().build());
}

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
return StreamTableEnvironmentImpl.create(executionEnvironment, settings, new TableConfig());
}

/** @deprecated */
@Deprecated
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
return StreamTableEnvironmentImpl.create(executionEnvironment, EnvironmentSettings.newInstance().build(), tableConfig);
}

//创建 BatchTableEnvironment
static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, new TableConfig());
}

static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
//
}

你需要根据你的程序来使用对应的 TableEnvironment,是 BatchTableEnvironment 还是 StreamTableEnvironment。默认两个 planner 都是在 Flink 的安装目录下 lib 文件夹中存在的,所以应该在你的程序中指定使用哪种 planner。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Flink Streaming query
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
//或者 TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// Flink Batch query
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// Blink Streaming query
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
//或者 TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// Blink Batch query
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

如果在 lib 目录下只存在一个 planner,则可以使用 useAnyPlanner 来创建指定的 EnvironmentSettings。

5.1.7 Table API & SQL 应用程序的结构

批处理和流处理的 Table API & SQL 作业都有相同的模式,它们的代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//根据前面内容创建一个 TableEnvironment,指定是批作业还是流作业
TableEnvironment tableEnv = ...;

//用下面的其中一种方式注册一个 Table
tableEnv.registerTable("table1", ...)
tableEnv.registerTableSource("table2", ...);
tableEnv.registerExternalCatalog("extCat", ...);

//注册一个 TableSink
tableEnv.registerTableSink("outputTable", ...);

//根据一个 Table API 查询创建一个 Table
Table tapiResult = tableEnv.scan("table1").select(...);
//根据一个 SQL 查询创建一个 Table
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

//将 Table API 或者 SQL 的结果发送给 TableSink
tapiResult.insertInto("outputTable");

//运行
tableEnv.execute("java_job");

5.1.8 Catalog 中注册 Table

Table 有两种类型,输入表和输出表,可以在 Table API & SQL 查询中引用输入表并提供输入数据,输出表可以用于将 Table API & SQL 的查询结果发送到外部系统。输出表可以通过 TableSink 来注册,输入表可以从各种数据源进行注册:

  • 已经存在的 Table 对象,通过是 Table API 或 SQL 查询的结果
  • 连接了外部系统的 TableSource,比如文件、数据库、MQ
  • 从 DataStream 或 DataSet 程序中返回的 DataStream 和 DataSet

注册 Table

在 TableEnvironment 中可以像下面这样注册一个 Table:

1
2
3
4
5
6
7
8
//创建一个 TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

//projTable 是一个简单查询的结果
Table projTable = tableEnv.scan("X").select(...);

//将 projTable 表注册为 projectedTable 表
tableEnv.registerTable("projectedTable", projTable);

注册 TableSource

注册 TableSink

5.1.9 注册外部的 Catalog

5.1.10 查询 Table

Table API

SQL

Table API & SQL

5.1.11 提交 Table

5.1.12 翻译并执行查询

加入知识星球可以看到上面文章:https://t.zsxq.com/MNBEYvf

5.1.13 小结与反思

本节介绍了 Flink 新的 planner,然后详细的和之前的 planner 做了对比,然后对 Table API & SQL 中的概念做了介绍,还通过样例去介绍了它们的通用 API。

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. 第五章 —— Table API & SQL
    1. 1.1. 5.1 Flink Table & SQL 概念与通用 API
      1. 1.1.1. 5.1.1 新增 Blink SQL 查询处理器
      2. 1.1.2. 5.1.2 为什么选择 Table API & SQL?
      3. 1.1.3. 5.1.3 Flink Table 项目模块
      4. 1.1.4. 5.1.4 两种 planner 之间的区别
      5. 1.1.5. 5.1.5 添加项目依赖
      6. 1.1.6. 5.1.6 创建一个 TableEnvironment
      7. 1.1.7. 5.1.7 Table API & SQL 应用程序的结构
      8. 1.1.8. 5.1.8 Catalog 中注册 Table
        1. 1.1.8.1. 注册 Table
        2. 1.1.8.2. 注册 TableSource
        3. 1.1.8.3. 注册 TableSink
      9. 1.1.9. 5.1.9 注册外部的 Catalog
      10. 1.1.10. 5.1.10 查询 Table
        1. 1.1.10.1. Table API
        2. 1.1.10.2. SQL
        3. 1.1.10.3. Table API & SQL
      11. 1.1.11. 5.1.11 提交 Table
      12. 1.1.12. 5.1.12 翻译并执行查询
      13. 1.1.13. 5.1.13 小结与反思