How to Collect Flink Job Logs in Yarn/K8s Clusters?

Starting from a small need (troubleshooting is inconvenient when a job fails or throws exceptions), this article surveys the common industry solutions, picks the one that fits our requirements, and then walks through the implementation and the results.

Background

Whether Flink runs on Yarn or on K8s, as long as a job is running we can read the JobManager and TaskManager logs through the Flink Web UI. Finding a log across many TaskManagers is admittedly painful when the volume is large (how do you quickly tell which TaskManager holds the log you need; a TaskManager may have several rolled log files, so how do you quickly spot the root-cause exception; and if a TaskManager is OOM-killed, that container's logs are gone entirely), but at least there is a way to look at them.

Anyone familiar with Flink on Yarn knows that after a job finishes or fails, only the JobManager log remains visible in the Yarn UI; the TaskManager logs cannot be seen, which sometimes makes it hard to investigate why a job failed (though in most cases the root-cause log also shows up on the JobManager side).

Anyone familiar with Flink on K8s feels the pain even more: once a job fails or finishes, all of its Pods exit, and without log collection it is nearly impossible to tell why the job failed.

The Flink History Server, unlike the Spark History Server, does not let you see the logs of all the Executors a job ran on, so for pinpointing a Flink job's abnormal logs the built-in experience is not very friendly. Hence this article: how to collect Flink job logs under both deployment modes, and so remove the inconvenience of diagnosing abnormal job failures.

Naturally, once this log data is collected, it can drive abnormal-log alerts that notify job owners of failures (worth a dedicated article later), and it can be stored in ES so that users can search a job's abnormal logs themselves.

Choosing a Solution

There are two common log-collection approaches:

1. A unified LogAgent. Whether Flink runs on Yarn or on K8s, the logs can be written to a path that follows a fixed pattern, and a dedicated LogAgent (e.g. Filebeat) is deployed on every compute node to collect them (see the sketch after this list). K8s logs are a bit more complicated to collect than Yarn logs: the Flink job has to mount a volume so that the log path stays stable; otherwise the files live inside the Pod and vanish with its lifecycle. This approach requires a collection agent on every machine, plus the extra work of keeping it healthy, or some job logs will silently be missed.

2. A custom Kafka Appender. Here you implement a custom Appender for the logging framework in use, package it, drop it into Flink's lib directory, and wire it up in the log4j configuration; the job loads the dependency at startup and streams its logs to Kafka in real time. The Appender can be quite flexible (see the code below), for example adding filters so that only WARN-and-above records are collected (with many jobs, shipping every level produces a huge data volume that contributes little to troubleshooting). This approach works the same on Yarn and on K8s, needs no per-environment setup and no extra component to babysit; the only drawback is that already-running jobs must be restarted before their logs are collected. Personally I think this is the better fit.
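
For approach 1, a minimal Filebeat sketch could look like the following. The log path pattern matches the Yarn layout shown later in this article; the broker addresses are placeholders for illustration, not our production values:

filebeat.inputs:
  - type: log
    # Yarn container log layout: .../<application_id>/<container_id>/*.log
    paths:
      - /data/HDATA/yarn/logs/application_*/container_*/*.log
output.kafka:
  # placeholder brokers; point these at your own cluster
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  topic: yarn_flink_log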

The overall architecture includes three Reporters: a custom Kafka Appender, a custom Kafka Metrics Reporter, and an internally modified version of the official Prometheus PushGateway Metrics Reporter. The first is what this article covers; the other two deserve articles of their own later.

Custom Kafka Appender

Requirements

We need a custom Kafka Appender that ships log records to Kafka. To identify where each record came from, it has to be tagged with the job's application id, container id, host, and jobmanager/taskmanager role, all of which can be derived from the log.file system property:

2021-07-27 20:39:55,777 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -Dout.file=/data/HDATA/yarn/logs/application_1596708697506_12674/container_e11_1596708697506_12674_01_000005/taskmanager.out
2021-07-27 20:39:55,777 INFO org.apache.flink.yarn.YarnTaskExecutorRunner [] - -Dlog.file=/data/HDATA/yarn/logs/application_1596708697506_12674/container_e11_1596708697506_12674_01_000005/taskmanager.log
2021-07-27 20:39:55,777 INFO org.apache.flink.yarn.YarnTaskExecutorRunner [] - -Derr.file=/data/HDATA/yarn/logs/application_1596708697506_12674/container_e11_1596708697506_12674_01_000005/taskmanager.err

The above covers Yarn, where the log.file system property yields the job's dimension values. On K8s the same information can be obtained from environment variables, at the cost of a small change to the Flink-K8s module (essentially injecting a few extra container environment variables, such as the job's ClusterId, the IP of the physical node, and the Pod IP).
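
As an illustration of the Yarn case, here is a minimal sketch of extracting the application id and container id from the log.file path, mirroring what the appender below does; the class and method names are mine, not part of the project:

import java.io.File;

// Minimal sketch: derive job dimensions from the log.file system property, e.g.
// /data/HDATA/yarn/logs/application_.../container_.../taskmanager.log
public final class LogFileParser {
    public static void main(String[] args) {
        String logFile = "/data/HDATA/yarn/logs/application_1596708697506_12674/"
                + "container_e11_1596708697506_12674_01_000005/taskmanager.log";
        String[] parts = logFile.split(File.separator);
        String appId = parts[parts.length - 3];        // application_1596708697506_12674
        String containerId = parts[parts.length - 2];  // container_e11_..._01_000005
        String role = parts[parts.length - 1].contains("jobmanager") ? "jobmanager" : "taskmanager";
        System.out.println(appId + " / " + containerId + " / " + role);
    }
}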

Modifying the Framework Source

In Flink 1.10 and 1.12, the log record emitted when a job transitions to FAILED is, surprisingly, at INFO level, even though it carries the root cause of the failure. Since we want to collect a job's abnormal logs, the source has to be patched to raise the level in those places.

With those levels raised, the abnormal logs emitted when a job goes FAILED or RESTARTING are collected, which makes it easy for users to locate the root cause. (This patch is icing on the cake: even without it, the custom collector can still pick these records up, as sketched below.)
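
For instance, here is a minimal sketch of such a collector-side check, assuming the "switched from X to FAILED" wording used by the ExecutionGraph state-transition logs (visible in the sample Kafka record later in this article); the class and method names are mine, not from the project:

// Hypothetical filter: also collect INFO records whose message marks a FAILED/RESTARTING
// transition, so the appender catches the root cause even on an unpatched Flink.
public final class StateTransitionFilter {

    private static final java.util.regex.Pattern TRANSITION =
            java.util.regex.Pattern.compile("switched from \\w+ to (FAILED|RESTARTING)");

    private StateTransitionFilter() {
    }

    public static boolean shouldCollect(String level, String formattedMessage) {
        if ("ERROR".equals(level) || "WARN".equals(level)) {
            return true; // always ship warnings and errors
        }
        // additionally ship the INFO-level state-transition lines that carry the root cause
        return "INFO".equals(level) && TRANSITION.matcher(formattedMessage).find();
    }
}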

When this was first built, Flink 1.10 and 1.12 coexisted inside the company; 1.10 uses log4j while 1.12 uses log4j2, so the custom Kafka Appender differs slightly between the two and was developed in two versions. Both are covered below; read whichever one you need.

The code is on GitHub if you want to follow along: https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-extends/FlinkLogKafkaAppender (a star would be appreciated if it helps you). The code is laid out as follows:

The FlinkLogKafkaAppender parent module declares the shared base dependencies:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>flink-learning-extends</artifactId>
        <groupId>com.zhisheng.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>FlinkLogKafkaAppender</artifactId>
    <packaging>pom</packaging>

    <name>FlinkLogKafkaAppender</name>

    <modules>
        <module>Log4jKafkaAppender</module>
        <module>Log4j2KafkaAppender</module>
        <module>KafkaAppenderCommon</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <kafka.version>2.4.1</kafka.version>
        <scope>provided</scope>
        <jackson.version>2.11.0</jackson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
  • KafkaAppenderCommon module: the shared log event class and utility classes
  • Log4j2KafkaAppender module: for Flink versions that use log4j2
  • Log4jKafkaAppender module: for Flink versions that use log4j
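
Note the scope property set to provided in the parent POM: the child modules apply it via ${scope} to the log4j/slf4j and flink-shaded-jackson dependencies, so everything Flink already ships stays out of the final jar, while kafka-clients (declared without a scope) is compiled in and bundled by the shade plugin's org.apache.kafka:* include.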

KafkaAppenderCommon

The event class describing what gets sent to Kafka:

import lombok.Data;

import java.util.HashMap;
import java.util.Map;

@Data // Lombok-generated getters/setters assumed here; Jackson and the appenders' setX calls rely on them
public class LogEvent {

    private String source; // default is flink; other systems may reuse this kafka appender in the future

    private String id; // log id, defaults to a UUID

    private Long timestamp;

    private String content; // log message

    private Map<String, String> tags = new HashMap<>(); // log tags, e.g. host_name, application_id, job_name

}

The serialization utility:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonUtil {

    private final static ObjectMapper mapper = new ObjectMapper();

    /**
     * Serialize an object to compact JSON.
     */
    public static String toJson(Object value) throws JsonProcessingException {
        return mapper.writeValueAsString(value);
    }

    /**
     * Serialize an object to pretty-printed JSON.
     */
    public static String toFormatJson(Object value) throws JsonProcessingException {
        return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(value);
    }
}

A utility for processing exception stack traces:

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;

public class ExceptionUtil {

    private static final char TAB = '\t';
    private static final char CR = '\r';
    private static final char LF = '\n';
    private static final String SPACE = " ";
    private static final String EMPTY = "";

    /**
     * Flatten a stack trace into a single-line string.
     *
     * @param throwable the exception
     * @param limit     maximum length of the result
     * @return the stack trace as one line
     */
    public static String stacktraceToOneLineString(Throwable throwable, int limit) {
        Map<Character, String> replaceCharToStrMap = new HashMap<>();
        replaceCharToStrMap.put(CR, SPACE);
        replaceCharToStrMap.put(LF, SPACE);
        replaceCharToStrMap.put(TAB, SPACE);
        return stacktraceToString(throwable, limit, replaceCharToStrMap);
    }

    public static String stacktraceToString(Throwable throwable) {
        final OutputStream baos = new ByteArrayOutputStream();
        throwable.printStackTrace(new PrintStream(baos));
        return baos.toString();
    }

    /**
     * Render a full stack trace as a string.
     *
     * @param throwable           the exception
     * @param limit               maximum length of the result
     * @param replaceCharToStrMap characters to replace with the given strings
     * @return the stack trace as a string
     */
    private static String stacktraceToString(Throwable throwable, int limit, Map<Character, String> replaceCharToStrMap) {
        final OutputStream baos = new ByteArrayOutputStream();
        throwable.printStackTrace(new PrintStream(baos));
        String exceptionStr = baos.toString();
        int length = exceptionStr.length();
        if (limit > 0 && limit < length) {
            length = limit;
        }

        if (!replaceCharToStrMap.isEmpty()) {
            final StringBuilder sb = new StringBuilder();
            char c;
            String value;
            for (int i = 0; i < length; i++) {
                c = exceptionStr.charAt(i);
                value = replaceCharToStrMap.get(c);
                if (null != value) {
                    sb.append(value);
                } else {
                    sb.append(c);
                }
            }
            return sb.toString();
        } else {
            return sub(exceptionStr, 0, limit);
        }
    }

    /**
     * An improved JDK substring.<br>
     * Indices start at 0; the last character is at -1.<br>
     * If from equals to, "" is returned.<br>
     * Negative from/to count from the end of the string; if the absolute value is
     * larger than the length, from is clamped to 0 and to is clamped to length.<br>
     * If the adjusted from is greater than to, the two are swapped. Examples:<br>
     * abcdefgh 2 3 =&gt; c<br>
     * abcdefgh 2 -3 =&gt; cde<br>
     *
     * @param str       the string
     * @param fromIndex start index (inclusive)
     * @param toIndex   end index (exclusive)
     * @return the substring
     */
    private static String sub(CharSequence str, int fromIndex, int toIndex) {
        if (isEmpty(str)) {
            return str(str);
        }
        int len = str.length();

        if (fromIndex < 0) {
            fromIndex = len + fromIndex;
            if (fromIndex < 0) {
                fromIndex = 0;
            }
        } else if (fromIndex > len) {
            fromIndex = len;
        }

        if (toIndex < 0) {
            toIndex = len + toIndex;
            if (toIndex < 0) {
                toIndex = len;
            }
        } else if (toIndex > len) {
            toIndex = len;
        }

        if (toIndex < fromIndex) {
            int tmp = fromIndex;
            fromIndex = toIndex;
            toIndex = tmp;
        }

        if (fromIndex == toIndex) {
            return EMPTY;
        }

        return str.toString().substring(fromIndex, toIndex);
    }

    /**
     * {@link CharSequence} to String, null-safe.
     *
     * @param cs {@link CharSequence}
     * @return the string, or null
     */
    private static String str(CharSequence cs) {
        return null == cs ? null : cs.toString();
    }

    /**
     * Whether the string is empty, where empty means:<br>
     * 1. null<br>
     * 2. ""<br>
     *
     * @param str the string to check
     * @return whether it is empty
     */
    private static boolean isEmpty(CharSequence str) {
        return str == null || str.length() == 0;
    }
}
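
A quick usage sketch (my own example, not from the project), showing how a throwable is flattened so it fits into a single Kafka record field:

import com.zhisheng.flink.util.ExceptionUtil;

public class ExceptionUtilDemo {
    public static void main(String[] args) {
        try {
            throw new IllegalStateException("boom");
        } catch (Exception e) {
            // CR/LF/TAB become spaces and the result is capped at 2048 chars
            System.out.println(ExceptionUtil.stacktraceToOneLineString(e, 2048));
        }
    }
}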

Custom Log4j2 Kafka Appender

Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>FlinkLogKafkaAppender</artifactId>
        <groupId>com.zhisheng.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Log4j2KafkaAppender</artifactId>

    <properties>
        <log4j.version>2.12.1</log4j.version>
        <flink.shaded.version>12.0</flink.shaded.version>
        <jackson.version>2.10.1</jackson.version>
    </properties>

    <dependencies>
        <!-- shared LogEvent/JacksonUtil/ExceptionUtil classes -->
        <dependency>
            <groupId>com.zhisheng.flink</groupId>
            <artifactId>KafkaAppenderCommon</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-jackson</artifactId>
            <version>${jackson.version}-${flink.shaded.version}</version>
            <scope>${scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <includes>
                                    <include>org.apache.kafka:*</include>
                                    <!-- bundle the shared classes as well, or LogEvent & co. are missing at runtime -->
                                    <include>com.zhisheng.flink:KafkaAppenderCommon</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The custom Kafka Appender class:

package com.zhisheng.log.appender;

import com.zhisheng.flink.model.LogEvent;
import com.zhisheng.flink.util.ExceptionUtil;
import com.zhisheng.flink.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;

import java.io.File;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

@Slf4j
@Plugin(name = "KafkaLog4j2Appender", category = "Core", elementType = "appender", printObject = true)
public class KafkaLog4j2Appender extends AbstractAppender {

    private final String source;
    private final String topic;
    private final String level;
    private final Producer<String, String> producer;
    private String appId;
    private String containerId;
    private String containerType;
    private final String taskName;
    private final String taskId;
    private String nodeIp;

    protected KafkaLog4j2Appender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, Property[] properties, String source, String bootstrapServers, String topic, String level) {
        super(name, filter, layout, ignoreExceptions, properties);
        this.source = source;
        this.topic = topic;
        this.level = level;

        Properties envProperties = System.getProperties();
        Map<String, String> envs = System.getenv();
        String clusterId = envs.get("CLUSTER_ID");
        if (clusterId != null) {
            // K8s cluster
            appId = clusterId;
            containerId = envs.get("HOSTNAME");
            if (envs.get("HOSTNAME").contains("taskmanager")) {
                containerType = "taskmanager";
            } else {
                containerType = "jobmanager";
            }
            // IP of the physical node hosting the Pod
            if (envs.get("_HOST_IP_ADDRESS") != null) {
                nodeIp = envs.get("_HOST_IP_ADDRESS");
            }
        } else {
            // Yarn cluster
            String logFile = envProperties.getProperty("log.file");
            String[] values = logFile.split(File.separator);
            if (values.length >= 3) {
                appId = values[values.length - 3];
                containerId = values[values.length - 2];
                String log = values[values.length - 1];
                if (log.contains("jobmanager")) {
                    containerType = "jobmanager";
                } else if (log.contains("taskmanager")) {
                    containerType = "taskmanager";
                } else {
                    containerType = "others";
                }
            } else {
                log.error("log.file property ({}) doesn't contain a yarn application id or container id", logFile);
            }
        }

        taskName = envProperties.getProperty("taskName", null);
        taskId = envProperties.getProperty("taskId", null);

        Properties props = new Properties();
        for (Property property : properties) {
            props.put(property.getName(), property.getValue());
        }

        if (bootstrapServers != null) {
            props.setProperty("bootstrap.servers", bootstrapServers);
        } else {
            throw new ConfigException("The bootstrap servers property must be specified");
        }
        if (this.topic == null) {
            throw new ConfigException("Topic must be specified by the Kafka log4j appender");
        }

        String clientIdPrefix = taskId != null ? taskId : appId;
        if (clientIdPrefix != null) {
            props.setProperty("client.id", clientIdPrefix + "_log");
        }

        if (props.getProperty("acks") == null) {
            props.setProperty("acks", "0");
        }
        if (props.getProperty("retries") == null) {
            props.setProperty("retries", "0");
        }
        if (props.getProperty("batch.size") == null) {
            props.setProperty("batch.size", "16384");
        }
        if (props.getProperty("linger.ms") == null) {
            props.setProperty("linger.ms", "5");
        }
        if (props.getProperty("compression.type") == null) {
            props.setProperty("compression.type", "lz4");
        }

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(props);
    }

    @Override
    public void append(org.apache.logging.log4j.core.LogEvent event) {
        try {
            // only ship the configured levels; "xxx" controls which loggers are excluded from collection
            if (level.contains(event.getLevel().toString().toUpperCase()) && !event.getLoggerName().contains("xxx")) {
                producer.send(new ProducerRecord<>(topic, appId, subAppend(event)));
            }
        } catch (Exception e) {
            log.warn("Parsing the log event or send log event to kafka has exception", e);
        }
    }

    private String subAppend(org.apache.logging.log4j.core.LogEvent event) throws JsonProcessingException {
        LogEvent logEvent = new LogEvent();
        Map<String, String> tags = new HashMap<>();
        String logMessage = null;
        try {
            InetAddress inetAddress = InetAddress.getLocalHost();
            tags.put("host_name", inetAddress.getHostName());
            tags.put("host_ip", inetAddress.getHostAddress());
        } catch (Exception e) {
            log.error("Error getting the ip and host name of the node where the job({}) is running", appId, e);
        } finally {
            try {
                logMessage = ExceptionUtil.stacktraceToString(event.getThrown());
                logEvent.setContent(logMessage);
            } catch (Exception e) {
                if (logMessage != null) {
                    logMessage = logMessage + "\n\t" + e.getMessage();
                }
                logEvent.setContent(logMessage);
            } finally {
                logEvent.setId(UUID.randomUUID().toString());
                logEvent.setTimestamp(event.getTimeMillis());
                logEvent.setSource(source);
                if (logMessage != null) {
                    logMessage = event.getMessage().getFormattedMessage() + "\n" + logMessage;
                } else {
                    logMessage = event.getMessage().getFormattedMessage();
                }
                logEvent.setContent(logMessage);

                StackTraceElement eventSource = event.getSource();
                tags.put("class_name", eventSource.getClassName());
                tags.put("method_name", eventSource.getMethodName());
                tags.put("file_name", eventSource.getFileName());
                tags.put("line_number", String.valueOf(eventSource.getLineNumber()));

                tags.put("logger_name", event.getLoggerName());
                tags.put("level", event.getLevel().toString());
                tags.put("thread_name", event.getThreadName());
                tags.put("app_id", appId);
                tags.put("container_id", containerId);
                tags.put("container_type", containerType);
                if (taskId != null) {
                    tags.put("task_id", taskId);
                }
                if (taskName != null) {
                    tags.put("task_name", taskName);
                }
                if (nodeIp != null) {
                    tags.put("node_ip", nodeIp);
                }
                logEvent.setTags(tags);
            }
        }
        return JacksonUtil.toJson(logEvent);
    }

    @PluginFactory
    public static KafkaLog4j2Appender createAppender(@PluginElement("Layout") final Layout<? extends Serializable> layout,
                                                     @PluginElement("Filter") final Filter filter,
                                                     @Required(message = "No name provided for KafkaLog4j2Appender") @PluginAttribute("name") final String name,
                                                     @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
                                                     @Required(message = "No bootstrapServers provided for KafkaLog4j2Appender") @PluginAttribute("bootstrapServers") final String bootstrapServers,
                                                     @Required(message = "No source provided for KafkaLog4j2Appender") @PluginAttribute("source") final String source,
                                                     @Required(message = "No topic provided for KafkaLog4j2Appender") @PluginAttribute("topic") final String topic,
                                                     @Required(message = "No level provided for KafkaLog4j2Appender") @PluginAttribute("level") final String level,
                                                     @PluginElement("Properties") final Property[] properties) {
        return new KafkaLog4j2Appender(name, filter, layout, ignoreExceptions, properties, source, bootstrapServers, topic, level);
    }

    @Override
    public void stop() {
        super.stop();
        if (producer != null) {
            producer.close();
        }
    }
}

Updating the Log4j2 Configuration

################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# monitorInterval=30

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender
rootLogger.appenderRef.kafka.ref = KafkaLog4j2Appender


# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}


appender.kafka.name = KafkaLog4j2Appender
appender.kafka.type = KafkaLog4j2Appender
appender.kafka.source = flink-1.12.0
appender.kafka.bootstrapServers = fat-kafka1.com.cn:9092,fat-kafka2.com.cn:9092,fat-kafka3.com.cn:9092
appender.kafka.topic = yarn_flink_log
appender.kafka.level = ERROR,WARN

logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

Note: on K8s it is log4j-console.properties that has to be changed for this to take effect.

The structure of the data sent to Kafka:

{
  "source": "flink-1.12.0",
  "id": "855986dd-993a-4efe-99ab-658fe6bf1683",
  "timestamp": 1628131862944,
  "content": "Source: Custom Source (1/2) (6302f688e5405815b719bb236634f341) switched from RUNNING to FAILED on container_e12_1626247520347_2269_01_000002 @ uat-hadoopuat-dc01-025187.vm.dc01.tech (dataPort=9916).\njava.lang.Exception: [2021-08-05 10:51:00.641]Container killed on request. Exit code is 137\n[2021-08-05 10:51:00.641]Container exited with a non-zero exit code 137. \n[2021-08-05 10:51:00.643]Killed by external signal\n\n\tat org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.onWorkerTerminated(ActiveResourceManager.java:219)\n\tat org.apache.flink.yarn.YarnResourceManagerDriver$YarnContainerEventHandler.lambda$onContainersCompleted$0(YarnResourceManagerDriver.java:522)\n\tat org.apache.flink.yarn.YarnResourceManagerDriver$YarnContainerEventHandler.lambda$runAsyncWithFatalHandler$2(YarnResourceManagerDriver.java:549)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)\n\tat org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat akka.actor.Actor$class.aroundReceive(Actor.scala:517)\n\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n",
  "tags": {
    "host_ip": "10.69.1.10",
    "method_name": "transitionState",
    "level": "WARN",
    "file_name": "Execution.java",
    "line_number": "1608",
    "thread_name": "flink-akka.actor.default-dispatcher-3",
    "container_type": "jobmanager",
    "logger_name": "org.apache.flink.runtime.executiongraph.ExecutionGraph",
    "class_name": "org.apache.flink.runtime.executiongraph.Execution",
    "app_id": "application_1626247520347_2269",
    "host_name": "FAT-hadoopuat-69110.vm.dc01.tech",
    "container_id": "container_e12_1626247520347_2269_01_000001"
  }
}
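
One detail worth noting: the appender uses the application id as the Kafka record key (producer.send(new ProducerRecord<>(topic, appId, ...))), so all records of one job hash to the same partition and arrive in order.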

Custom Log4j Kafka Appender

Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>FlinkLogKafkaAppender</artifactId>
        <groupId>com.zhisheng.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Log4jKafkaAppender</artifactId>

    <properties>
        <slf4j.version>1.7.15</slf4j.version>
        <flink.shaded.version>9.0</flink.shaded.version>
        <jackson.version>2.10.1</jackson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.zhisheng.flink</groupId>
            <artifactId>KafkaAppenderCommon</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-jackson</artifactId>
            <version>${jackson.version}-${flink.shaded.version}</version>
            <scope>${scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <includes>
                                    <include>org.apache.kafka:*</include>
                                    <!-- bundle the shared classes as well, or LogEvent & co. are missing at runtime -->
                                    <include>com.zhisheng.flink:KafkaAppenderCommon</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

KafkaLog4jAppender, which extends the AppenderSkeleton abstract class:

package com.zhisheng.log.appender;

import com.zhisheng.flink.model.LogEvent;
import com.zhisheng.flink.util.ExceptionUtil;
import com.zhisheng.flink.util.JacksonUtil;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LocationInfo;
import org.apache.log4j.spi.LoggingEvent;

import java.io.File;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

@Slf4j  // supplies the log field used below (assumed; the original snippet omits the class header)
@Setter // Log4j injects the appender options (bootstrapServers, topic, level, ...) through setters; Lombok assumed here
public class KafkaLog4jAppender extends AppenderSkeleton {

    private String bootstrapServers;
    private String source;
    private String topic;
    private String level;
    private String acks;
    private String compressionType;
    private String retries;
    private String batchSize;
    private String lingerMs;
    private String maxRequestSize;
    private String requestTimeoutMs;
    private Producer<String, String> producer;
    private String appId;
    private String containerId;
    private String containerType;
    private String taskId;
    private String taskName;

    @Override
    public void activateOptions() {
        super.activateOptions();

        Properties envProperties = System.getProperties();

        String logFile = envProperties.getProperty("log.file");
        String[] values = logFile.split(File.separator);
        if (values.length >= 3) {
            appId = values[values.length - 3];
            containerId = values[values.length - 2];
            String log = values[values.length - 1];
            if (log.contains("jobmanager")) {
                containerType = "jobmanager";
            } else if (log.contains("taskmanager")) {
                containerType = "taskmanager";
            } else {
                containerType = "others";
            }
        } else {
            log.error("log.file property ({}) doesn't contain a yarn application id or container id", logFile);
        }

        taskId = envProperties.getProperty("taskId", null);
        taskName = envProperties.getProperty("taskName", null);

        Properties props = new Properties();
        if (this.bootstrapServers != null) {
            props.setProperty("bootstrap.servers", this.bootstrapServers);
        } else {
            throw new ConfigException("The bootstrap servers property must be specified");
        }
        if (this.topic == null) {
            throw new ConfigException("Topic must be specified by the Kafka log4j appender");
        }
        if (this.source == null) {
            throw new ConfigException("Source must be specified by the Kafka log4j appender");
        }

        String clientIdPrefix = taskId != null ? taskId : appId;
        if (clientIdPrefix != null) {
            props.setProperty("client.id", clientIdPrefix + "_log");
        }

        if (this.acks != null) {
            props.setProperty("acks", this.acks);
        } else {
            props.setProperty("acks", "0");
        }

        if (this.retries != null) {
            props.setProperty("retries", this.retries);
        } else {
            props.setProperty("retries", "0");
        }

        if (this.batchSize != null) {
            props.setProperty("batch.size", this.batchSize);
        } else {
            props.setProperty("batch.size", "16384");
        }

        if (this.lingerMs != null) {
            props.setProperty("linger.ms", this.lingerMs);
        } else {
            props.setProperty("linger.ms", "5");
        }

        if (this.compressionType != null) {
            props.setProperty("compression.type", this.compressionType);
        } else {
            props.setProperty("compression.type", "lz4");
        }

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(props);
    }

    @Override
    protected void append(LoggingEvent loggingEvent) {
        try {
            // only ship the configured levels; "xxx" controls which loggers are excluded from collection
            if (level.contains(loggingEvent.getLevel().toString().toUpperCase()) && !loggingEvent.getLoggerName().contains("xxx")) {
                producer.send(new ProducerRecord<>(topic, appId, subAppend(loggingEvent)));
            }
        } catch (Exception e) {
            log.warn("Parsing the log event or send log event to kafka has exception", e);
        }
    }

    private String subAppend(LoggingEvent event) throws JsonProcessingException {
        LogEvent logEvent = new LogEvent();
        Map<String, String> tags = new HashMap<>();
        String logMessage = null;
        try {
            InetAddress inetAddress = InetAddress.getLocalHost();
            tags.put("host_name", inetAddress.getHostName());
            tags.put("host_ip", inetAddress.getHostAddress());
        } catch (Exception e) {
            log.error("Error getting the ip and host name of the node where the job({}) is running", appId, e);
        } finally {
            try {
                logMessage = ExceptionUtil.stacktraceToString(event.getThrowableInformation().getThrowable());
                logEvent.setContent(logMessage);
            } catch (Exception e) {
                if (logMessage != null) {
                    logMessage = logMessage + "\n\t" + e.getMessage();
                }
                logEvent.setContent(logMessage);
            } finally {
                logEvent.setId(UUID.randomUUID().toString());
                logEvent.setTimestamp(event.getTimeStamp());
                logEvent.setSource(source);
                if (logMessage != null) {
                    logMessage = event.getMessage().toString() + "\n" + logMessage;
                } else {
                    logMessage = event.getMessage().toString();
                }
                logEvent.setContent(logMessage);
                LocationInfo locationInformation = event.getLocationInformation();
                tags.put("class_name", locationInformation.getClassName());
                tags.put("method_name", locationInformation.getMethodName());
                tags.put("file_name", locationInformation.getFileName());
                tags.put("line_number", locationInformation.getLineNumber());
                tags.put("logger_name", event.getLoggerName());
                tags.put("level", event.getLevel().toString());
                tags.put("thread_name", event.getThreadName());
                tags.put("app_id", appId);
                tags.put("container_id", containerId);
                tags.put("container_type", containerType);
                if (taskName != null) {
                    tags.put("task_name", taskName);
                }
                if (taskId != null) {
                    tags.put("task_id", taskId);
                }
                logEvent.setTags(tags);
            }
        }
        return JacksonUtil.toJson(logEvent);
    }

    @Override
    public void close() {
        if (!this.closed) {
            this.closed = true;
            this.producer.close();
        }
    }

    @Override
    public boolean requiresLayout() {
        return false;
    }

}

Put the jar built from the above into Flink's lib directory and update log4j.properties:

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, RFA, kafka

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${log.file}
log4j.appender.RFA.MaxFileSize=256MB
log4j.appender.RFA.Append=true
log4j.appender.RFA.MaxBackupIndex=10
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %t %-5p %-60c %x - %m%n

# Avoids the Kafka producer / log4j deadlock described below
log4j.logger.org.apache.kafka.clients.Metadata=WARN, kafka
log4j.appender.kafka=com.zhisheng.log.appender.KafkaLog4jAppender
log4j.appender.kafka.source=flink-1.10.0
log4j.appender.kafka.bootstrapServers=kafka1.com.cn:9092,kafka2.com.cn:9092,kafka3.com.cn:9092
log4j.appender.kafka.topic=yarn_flink_log
log4j.appender.kafka.level=ERROR,WARN

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, RFA

Note the deadlock-avoidance setting above: we ran into that deadlock in production ourselves, and the jstack output looked like this:

2021-08-20 01:19:24
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.201-b09 mixed mode):

"Attach Listener" #14 daemon prio=9 os_prio=0 tid=0x00007f061c001800 nid=0x728 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"kafka-producer-network-thread | application_1610962534438_1256_log" #11 daemon prio=5 os_prio=0 tid=0x00007f06394ad800 nid=0x27e waiting for monitor entry [0x00007f060be16000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.log4j.Category.callAppenders(Category.java:205)
- waiting to lock <0x00000000ffad7510> (a org.apache.log4j.spi.RootLogger)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
at org.apache.kafka.clients.Metadata.update(Metadata.java:379)
- locked <0x00000000ff1421d8> (a org.apache.kafka.clients.Metadata)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1039)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:822)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
at java.lang.Thread.run(Thread.java:748)

Locked ownable synchronizers:
- None

"Service Thread" #9 daemon prio=9 os_prio=0 tid=0x00007f063865c800 nid=0x27c runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"C1 CompilerThread2" #8 daemon prio=9 os_prio=0 tid=0x00007f063862f000 nid=0x27b waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"C2 CompilerThread1" #7 daemon prio=9 os_prio=0 tid=0x00007f063862d000 nid=0x27a waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"C2 CompilerThread0" #6 daemon prio=9 os_prio=0 tid=0x00007f063862a800 nid=0x279 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"Signal Dispatcher" #5 daemon prio=9 os_prio=0 tid=0x00007f0638628800 nid=0x278 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"Surrogate Locker Thread (Concurrent GC)" #4 daemon prio=9 os_prio=0 tid=0x00007f0638626800 nid=0x277 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
- None

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f06385f3800 nid=0x276 in Object.wait() [0x00007f0617c07000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000fff08ed0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
- locked <0x00000000fff08ed0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:216)

Locked ownable synchronizers:
- None

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f06385f1000 nid=0x275 in Object.wait() [0x00007f0617d08000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000fff06bf8> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000000fff06bf8> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

Locked ownable synchronizers:
- None

"main" #1 prio=5 os_prio=0 tid=0x00007f0638011800 nid=0x253 waiting for monitor entry [0x00007f063e7b7000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.kafka.clients.Metadata.fetch(Metadata.java:129)
- waiting to lock <0x00000000ff1421d8> (a org.apache.kafka.clients.Metadata)
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:960)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:866)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:733)
at com.zhisheng.log.appender.KafkaLog4jAppender.append(KafkaLog4jAppender.java:147)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
- locked <0x00000000ff542900> (a com.zhisheng.log.appender.KafkaLog4jAppender)
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
- locked <0x00000000ffad7510> (a org.apache.log4j.spi.RootLogger)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.warn(Log4jLoggerAdapter.java:401)
at org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)
at org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.<init>(JniBasedUnixGroupsMappingWithFallback.java:38)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.hadoop.security.Groups.<init>(Groups.java:106)
at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:449)
- locked <0x00000000fec67710> (a java.lang.Class for org.apache.hadoop.security.Groups)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:327)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:294)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:854)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:824)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:693)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.util.EnvironmentInformation.getHadoopUser(EnvironmentInformation.java:96)
at org.apache.flink.runtime.util.EnvironmentInformation.logEnvironmentInfo(EnvironmentInformation.java:293)
at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:96)

Locked ownable synchronizers:
- None

"VM Thread" os_prio=0 tid=0x00007f06385e7000 nid=0x274 runnable

"Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007f0638029000 nid=0x256 runnable

"Gang worker#1 (Parallel GC Threads)" os_prio=0 tid=0x00007f063802a800 nid=0x257 runnable

"Gang worker#2 (Parallel GC Threads)" os_prio=0 tid=0x00007f063802c800 nid=0x258 runnable

"Gang worker#3 (Parallel GC Threads)" os_prio=0 tid=0x00007f063802e800 nid=0x259 runnable

"Gang worker#4 (Parallel GC Threads)" os_prio=0 tid=0x00007f0638030000 nid=0x25a runnable

"Gang worker#5 (Parallel GC Threads)" os_prio=0 tid=0x00007f0638032000 nid=0x25b runnable

"Gang worker#6 (Parallel GC Threads)" os_prio=0 tid=0x00007f0638034000 nid=0x25c runnable

"Gang worker#7 (Parallel GC Threads)" os_prio=0 tid=0x00007f0638035800 nid=0x25d runnable

"G1 Main Concurrent Mark GC Thread" os_prio=0 tid=0x00007f063805e000 nid=0x26b runnable

"Gang worker#0 (G1 Parallel Marking Threads)" os_prio=0 tid=0x00007f0638060000 nid=0x26f runnable

"Gang worker#1 (G1 Parallel Marking Threads)" os_prio=0 tid=0x00007f0638061800 nid=0x270 runnable

"Gang worker#2 (G1 Parallel Marking Threads)" os_prio=0 tid=0x00007f0638063800 nid=0x272 runnable

"Gang worker#3 (G1 Parallel Marking Threads)" os_prio=0 tid=0x00007f0638065800 nid=0x273 runnable

"G1 Concurrent Refinement Thread#0" os_prio=0 tid=0x00007f0638047800 nid=0x26a runnable

"G1 Concurrent Refinement Thread#1" os_prio=0 tid=0x00007f0638045800 nid=0x265 runnable

"G1 Concurrent Refinement Thread#2" os_prio=0 tid=0x00007f0638043800 nid=0x264 runnable

"G1 Concurrent Refinement Thread#3" os_prio=0 tid=0x00007f0638042000 nid=0x263 runnable

"G1 Concurrent Refinement Thread#4" os_prio=0 tid=0x00007f0638040000 nid=0x262 runnable

"G1 Concurrent Refinement Thread#5" os_prio=0 tid=0x00007f063803e000 nid=0x261 runnable

"G1 Concurrent Refinement Thread#6" os_prio=0 tid=0x00007f063803c000 nid=0x260 runnable

"G1 Concurrent Refinement Thread#7" os_prio=0 tid=0x00007f063803a800 nid=0x25f runnable

"G1 Concurrent Refinement Thread#8" os_prio=0 tid=0x00007f0638038800 nid=0x25e runnable

"VM Periodic Task Thread" os_prio=0 tid=0x00007f0638660000 nid=0x27d waiting on condition

JNI global references: 652


Found one Java-level deadlock:
=============================
"kafka-producer-network-thread | application_1610962534438_1256_log":
waiting to lock monitor 0x00007f0624005f18 (object 0x00000000ffad7510, a org.apache.log4j.spi.RootLogger),
which is held by "main"
"main":
waiting to lock monitor 0x00007f06240047b8 (object 0x00000000ff1421d8, a org.apache.kafka.clients.Metadata),
which is held by "kafka-producer-network-thread | application_1610962534438_1256_log"

Java stack information for the threads listed above:
===================================================
"kafka-producer-network-thread | application_1610962534438_1256_log":
at org.apache.log4j.Category.callAppenders(Category.java:205)
- waiting to lock <0x00000000ffad7510> (a org.apache.log4j.spi.RootLogger)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
at org.apache.kafka.clients.Metadata.update(Metadata.java:379)
- locked <0x00000000ff1421d8> (a org.apache.kafka.clients.Metadata)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1039)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:822)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
at java.lang.Thread.run(Thread.java:748)
"main":
at org.apache.kafka.clients.Metadata.fetch(Metadata.java:129)
- waiting to lock <0x00000000ff1421d8> (a org.apache.kafka.clients.Metadata)
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:960)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:866)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:733)
at com.zhisheng.log.appender.KafkaLog4jAppender.append(KafkaLog4jAppender.java:147)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
- locked <0x00000000ff542900> (a com.zhisheng.log.appender.KafkaLog4jAppender)
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
- locked <0x00000000ffad7510> (a org.apache.log4j.spi.RootLogger)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.warn(Log4jLoggerAdapter.java:401)
at org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)
at org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.<init>(JniBasedUnixGroupsMappingWithFallback.java:38)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.hadoop.security.Groups.<init>(Groups.java:106)
at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:449)
- locked <0x00000000fec67710> (a java.lang.Class for org.apache.hadoop.security.Groups)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:327)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:294)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:854)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:824)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:693)
- locked <0x00000000fec436d0> (a java.lang.Class for org.apache.hadoop.security.UserGroupInformation)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.util.EnvironmentInformation.getHadoopUser(EnvironmentInformation.java:96)
at org.apache.flink.runtime.util.EnvironmentInformation.logEnvironmentInfo(EnvironmentInformation.java:293)
at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:96)

Found 1 deadlock.
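
The cycle is plain in the dump: the main thread holds the RootLogger monitor (inside Category.callAppenders) and blocks in producer.send() waiting for the Kafka Metadata lock, while the producer's network thread holds the Metadata lock and blocks trying to log an INFO line through that same RootLogger. Raising org.apache.kafka.clients.Metadata to WARN removes the INFO call made while the Metadata lock is held, which breaks the cycle.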

The structure of the data sent to Kafka:

{
  "source": "flink-1.10.0",
  "id": "fc527ec2-0a95-4fe8-83d2-d3e889c03658",
  "timestamp": 1627886187629,
  "content": "Error registering AppInfo mbean\njavax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=application_1626247520347_2075\n\tat com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)\n\tat com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)\n\tat com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)\n\tat com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)\n\tat com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)\n\tat com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)\n\tat org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)\n\tat org.apache.flink.metrics.kafka.KafkaReporter.open(KafkaReporter.java:107)\n\tat org.apache.flink.runtime.metrics.ReporterSetup.createReporterSetup(ReporterSetup.java:130)\n\tat org.apache.flink.runtime.metrics.ReporterSetup.lambda$setupReporters$1(ReporterSetup.java:239)\n\tat java.util.Optional.ifPresent(Optional.java:159)\n\tat org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:236)\n\tat org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:148)\n\tat org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:160)\n\tat org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:338)\n\tat org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerSecurely$3(TaskManagerRunner.java:362)\n\tat java.security.AccessController.doPrivileged(Native Method)\n\tat javax.security.auth.Subject.doAs(Subject.java:422)\n\tat org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)\n\tat org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)\n\tat org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:361)\n\tat org.apache.flink.yarn.YarnTaskExecutorRunner.runTaskManagerSecurely(YarnTaskExecutorRunner.java:90)\n\tat org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:70)\n",
  "tags": {
    "host_ip": "10.xx.xx.17",
    "method_name": "checkState",
    "level": "ERROR",
    "file_name": "ConnectionState.java",
    "line_number": "288",
    "thread_name": "main-EventThread",
    "container_type": "taskmanager",
    "logger_name": "org.apache.flink.shaded.curator.org.apache.curator.ConnectionState",
    "class_name": "org.apache.flink.shaded.curator.org.apache.curator.ConnectionState",
    "app_id": "application_1626247520347_1831",
    "host_name": "FAT-hadoopuat-69117.vm.dc01.tech",
    "container_id": "container_e12_1626247520347_1831_01_000002"
  }
}

Storing Logs in ES with Flink SQL

Use Flink SQL to sink the abnormal log data into ES, with one index per day:

CREATE TABLE yarn_flink_warn_logs (
    `source` STRING,
    `id` STRING,
    `timestamp` BIGINT,
    `content` STRING,
    `tags` ROW<host_ip STRING, method_name STRING, `level` STRING, `file_name` STRING, line_number STRING, thread_name STRING, container_type STRING, logger_name STRING, class_name STRING, app_id STRING, `host_name` STRING, container_id STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'yarn_flink_log',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'fat-kafka1.com.cn:9092,fat-kafka2.com.cn:9092,fat-kafka3.com.cn:9092',
    'properties.group.id' = 'yarn_flink_warn_logs',
    'scan.topic-partition-discovery.interval' = '10000 ms',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'source.parallelism' = '1'
);


CREATE TABLE yarn_flink_warn_logs_es (
    `source` STRING,
    `id` STRING,
    `logTime` BIGINT,
    `log_time` TIMESTAMP(3),
    `content` STRING,
    `tags` ROW<host_ip STRING, method_name STRING, `level` STRING, `file_name` STRING, line_number STRING, thread_name STRING, container_type STRING, logger_name STRING, class_name STRING, app_id STRING, `host_name` STRING, container_id STRING>
) WITH (
    'connector' = 'elasticsearch-universal',
    'hosts' = 'http://fat-search-es.cn:9200',
    'index' = 'yarn_flink_warn_logs-{log_time|yyyy.MM.dd}',
    'username' = 'test-admin',
    'password' = 'test-admin',
    'sink.parallelism' = '2',
    'failure-handler' = 'ignore',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '5MB',
    'sink.bulk-flush.interval' = '10'
);


INSERT INTO yarn_flink_warn_logs_es
SELECT
    `source`,
    `id`,
    `timestamp` AS logTime,
    TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss')) AS log_time,
    content,
    tags
FROM yarn_flink_warn_logs;

To view a single job's logs, filter by application_id / container_type / container_id; as you can see in the appender code above, we also tag taskId and taskName (dimension values from our real-time computing platform), so filtering on those works as well.
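
For example, a Lucene-style filter in Kibana over these tags might look like the following (the application id comes from the sample record earlier; substitute your own):

tags.app_id:"application_1626247520347_2269" AND tags.container_type:"taskmanager"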

A single record can likewise be opened to view its full content.

Real-Time Monitoring of Per-Job Abnormal-Log Volume

The point is to know each job's abnormal-log situation: if a job suddenly emits a burst of abnormal logs, it is flapping or failing, and both the job owner and the cluster owner can be notified, so the data doubles as an early warning.

set table.exec.emit.early-fire.enabled=true;
set table.exec.emit.early-fire.delay=60000ms;
set table.exec.state.ttl=90000000ms;

CREATE TABLE yarn_flink_warn_logs (
    `source` STRING,
    `timestamp` BIGINT,
    `tags` ROW<app_id STRING>,
    windows_start_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss')),
    WATERMARK FOR windows_start_time AS windows_start_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'yarn_flink_log',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'logs-kafka1.com.cn:9092,logs-kafka2.com.cn:9092,logs-kafka3.com.cn:9092',
    'properties.group.id' = 'yarn_flink_warn_logs_monitor_board',
    'scan.topic-partition-discovery.interval' = '10000 ms',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'source.parallelism' = '6'
);

CREATE TABLE printSink (
    `source` STRING,
    app_id STRING,
    logCount BIGINT,
    `window_start_time` TIMESTAMP(3),
    PRIMARY KEY (app_id) NOT ENFORCED
) WITH (
    'connector' = 'print',
    'sink.parallelism' = '1'
);

INSERT INTO printSink
SELECT
    `source`,
    `tags`.app_id AS app_id,
    count(`tags`.app_id) AS logCount,
    TUMBLE_START(windows_start_time, INTERVAL '1' DAY) AS window_start_time
FROM yarn_flink_warn_logs
GROUP BY TUMBLE(windows_start_time, INTERVAL '1' DAY), `tags`.app_id, `source`;
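
A note on the set statements: with a one-day tumbling window, the count would normally only be emitted when the window closes, but table.exec.emit.early-fire flushes a partial count every 60 s, so a spike becomes visible within about a minute; the state TTL of 90,000,000 ms (25 hours) is just long enough to outlive the daily window's state.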

Summary

This article started from a small need (troubleshooting is inconvenient when a job fails or throws exceptions), surveyed the common industry solutions, picked the one that fit our requirements, and walked through the implementation and the results.
