Flink 从0到1学习 —— 如何使用 Side Output 来分流?

前言

之前在 Flink 从0到1学习—— Flink 不可以连续 Split(分流)? 讲过 Flink 使用连续的 Split 会有问题,当时提供了几种解决方法,有一种方法就是使用 Side Output 来进行,当时留了个余念,那么就在这篇文章详细的讲一波,教大家如何使用 Side Output 来分流。

Side Output

通常我们在处理数据的时候,有时候想对不同情况的数据进行不同的处理,那么就需要把数据流进行分流。比如我们在那篇文章里面的例子:需要将从 Kafka 过来的告警和恢复数据进行分类拆分,然后在对每种数据再分为告警数据和恢复数据。

如果是使用 filter 来进行拆分,也能满足我们的需求,但每次筛选过滤都要保留整个流,然后通过遍历整个流来获取相应的数据,显然很浪费性能。假如能够在一个流里面就进行多次输出就好了,恰好 Flink 的 Side Output 则提供了这样的功能。

如何使用?

要使用 Side Output 的话,你首先需要做的是定义一个 OutputTag 来标识 Side Output,代表这个 Tag 是要收集哪种类型的数据,如果是要收集多种不一样类型的数据,那么你就需要定义多种 OutputTag。例如:如果我要将告警/恢复的数据分为机器、容器、中间件等的数据,那么我们起码就得定义三个 OutputTag,如下:

1
2
3
4
5
6
private static final OutputTag<AlertEvent> middleware = new OutputTag<AlertEvent>("MIDDLEWARE") {
};
private static final OutputTag<AlertEvent> machine = new OutputTag<AlertEvent>("MACHINE") {
};
private static final OutputTag<AlertEvent> docker = new OutputTag<AlertEvent>("DOCKER") {
};

然后呢,你可以使用下面几种函数来处理数据,在处理数据的过程中,进行判断将不同种类型的数据存到不同的 OutputTag 中去。

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//dataStream 是总的数据流
SingleOutputStreamOperator<AlertEvent, AlertEvent> outputStream = dataStream.process(new ProcessFunction<AlertEvent, AlertEvent>() {
@Override
public void processElement(AlertEvent value, Context ctx, Collector<AlertEvent> out) throws Exception {
if ("MACHINE".equals(value.type)) {
ctx.output(machine, value);
} else if ("DOCKER".equals(value.type)) {
ctx.output(docker, value);
} else if ("MIDDLEWARE".equals(value.type)) {
ctx.output(middleware, value);
} else {
//其他的业务逻辑
out.collect(value);
}
}
})

好了,既然上面我们已经将不同类型的数据进行放到不同的 OutputTag 里面了,那么我们该如何去获取呢?你可以使用 getSideOutput 方法来获取不同 OutputTag 的数据,比如:

1
2
3
4
5
6
7
8
//机器相关的告警&恢复数据
outputStream.getSideOutput(machine).print();

//容器相关的告警&恢复数据
outputStream.getSideOutput(docker).print();

//中间件相关的告警&恢复数据
outputStream.getSideOutput(middleware).print();

这样你就可以获取到 Side Output 数据了。

另外你还可以看下我在 Github 放的一个完整 demo 代码: https://github.com/zhisheng17/flink-learning/blob/master/flink-learning-examples/src/main/java/com/zhisheng/examples/streaming/sideoutput/Main.java

总结

本文讲了如何使用 Side Output 来进行分流,比较简单,大家可以稍微阅读一下 demo 代码就可以很清楚了解。

本文地址是:http://www.54tianzhisheng.cn/2019/08/18/flink-side-output/

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号(zhisheng)了,你可以回复关键字:Flink 即可无条件获取到。另外也可以加我微信 你可以加我的微信:yuanblog_tzs,探讨技术!

知识星球

专栏介绍

首发地址:http://www.54tianzhisheng.cn/2019/11/15/flink-in-action/

专栏地址:https://gitbook.cn/gitchat/column/5dad4a20669f843a1a37cb4f

Github 代码仓库

https://github.com/zhisheng17/flink-learning/

以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客

博客

1、Flink 从0到1学习 —— Apache Flink 介绍

2、Flink 从0到1学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、Flink 从0到1学习 —— Flink 配置文件详解

4、Flink 从0到1学习 —— Data Source 介绍

5、Flink 从0到1学习 —— 如何自定义 Data Source ?

6、Flink 从0到1学习 —— Data Sink 介绍

7、Flink 从0到1学习 —— 如何自定义 Data Sink ?

8、Flink 从0到1学习 —— Flink Data transformation(转换)

9、Flink 从0到1学习 —— 介绍 Flink 中的 Stream Windows

10、Flink 从0到1学习 —— Flink 中的几种 Time 详解

11、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 ElasticSearch

12、Flink 从0到1学习 —— Flink 项目如何运行?

13、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Kafka

14、Flink 从0到1学习 —— Flink JobManager 高可用性配置

15、Flink 从0到1学习 —— Flink parallelism 和 Slot 介绍

16、Flink 从0到1学习 —— Flink 读取 Kafka 数据批量写入到 MySQL

17、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RabbitMQ

18、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HBase

19、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HDFS

20、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Redis

21、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Cassandra

22、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Flume

23、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 InfluxDB

24、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RocketMQ

25、Flink 从0到1学习 —— 你上传的 jar 包藏到哪里去了

26、Flink 从0到1学习 —— 你的 Flink job 日志跑到哪里去了

27、阿里巴巴开源的 Blink 实时计算框架真香

28、Flink 从0到1学习 —— Flink 中如何管理配置?

29、Flink 从0到1学习—— Flink 不可以连续 Split(分流)?

30、Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文

31、Flink 架构、原理与部署测试

32、为什么说流处理即未来?

33、OPPO 数据中台之基石:基于 Flink SQL 构建实时数据仓库

34、流计算框架 Flink 与 Storm 的性能对比

35、Flink状态管理和容错机制介绍

36、Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理

37、360深度实践:Flink与Storm协议级对比

38、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了

39、Apache Flink 1.9 重大特性提前解读

40、Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)

41、Flink 灵魂两百问,这谁顶得住?

42、Flink 从0到1学习 —— 如何使用 Side Output 来分流?

43、你公司到底需不需要引入实时计算引擎?

44、一文让你彻底了解大数据实时计算引擎 Flink

源码解析

1、Flink 源码解析 —— 源码编译运行

2、Flink 源码解析 —— 项目结构一览

3、Flink 源码解析—— local 模式启动流程

4、Flink 源码解析 —— standalone session 模式启动流程

5、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Job Manager 启动

6、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Task Manager 启动

7、Flink 源码解析 —— 分析 Batch WordCount 程序的执行过程

8、Flink 源码解析 —— 分析 Streaming WordCount 程序的执行过程

9、Flink 源码解析 —— 如何获取 JobGraph?

10、Flink 源码解析 —— 如何获取 StreamGraph?

11、Flink 源码解析 —— Flink JobManager 有什么作用?

12、Flink 源码解析 —— Flink TaskManager 有什么作用?

13、Flink 源码解析 —— JobManager 处理 SubmitJob 的过程

14、Flink 源码解析 —— TaskManager 处理 SubmitJob 的过程

15、Flink 源码解析 —— 深度解析 Flink Checkpoint 机制

16、Flink 源码解析 —— 深度解析 Flink 序列化机制

17、Flink 源码解析 —— 深度解析 Flink 是如何管理好内存的?

18、Flink Metrics 源码解析 —— Flink-metrics-core

19、Flink Metrics 源码解析 —— Flink-metrics-datadog

20、Flink Metrics 源码解析 —— Flink-metrics-dropwizard

21、Flink Metrics 源码解析 —— Flink-metrics-graphite

22、Flink Metrics 源码解析 —— Flink-metrics-influxdb

23、Flink Metrics 源码解析 —— Flink-metrics-jmx

24、Flink Metrics 源码解析 —— Flink-metrics-slf4j

25、Flink Metrics 源码解析 —— Flink-metrics-statsd

26、Flink Metrics 源码解析 —— Flink-metrics-prometheus

26、Flink Annotations 源码解析

27、Flink 源码解析 —— 如何获取 ExecutionGraph ?

28、大数据重磅炸弹——实时计算框架 Flink

29、Flink Checkpoint-轻量级分布式快照

30、Flink Clients 源码解析

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. 前言
  2. 2. Side Output
  3. 3. 如何使用?
  4. 4. 总结
  5. 5. 专栏介绍
  6. 6. Github 代码仓库
  7. 7. 博客
  8. 8. 源码解析