宕机一台机器,结果一百多个 Flink 作业挂了

背景

因宕机了一台物理机器,实时集群不少作业发生 failover,其中大部分作业都能 failover 成功,某个部门的部分作业一直在 failover,始终未成功,到 WebUI 查看作业异常日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2021-11-09 16:01:11
java.util.concurrent.CompletionException: java.lang.reflect.UndeclaredThrowableException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.UndeclaredThrowableException
at com.sun.proxy.$Proxy54.submitTask(Unknown Source)
at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:72)
at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$10(Execution.java:756)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 7 more
Caused by: java.io.IOException: The rpc invocation size 56424326 exceeds the maximum akka framesize.
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
... 11 more

解决异常过程

从上面的异常日志中我们提取到关键信息:

1
Caused by: java.io.IOException: The rpc invocation size 56424326 exceeds the maximum akka framesize.

看起来是 RPC 的消息大小超过了默认的 akka framesize 的最大值了,所以我们来了解一下这个值的默认值,从 官网 我们可以看的到该值的默认大小为 “10485760b”,并且该参数的描述为:

1
Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier.

翻译过来的意思就是:这个参数是 JobManager 和 TaskManagers 之间通信允许的最大消息大小,如果 Flink 作业因为通信消息大小超过了该值,你可以通过增加该值的大小来解决,该参数需要指定一个单位。

分析原因

Flink 使用 Akka 作为组件(JobManager/TaskManager/ResourceManager)之间的 RPC 框架,在 JobManager 和 TaskManagers 之间发送的消息的最大大小默认为 10485760b,如果消息超过这个限制就会失败,报错。这个可以看下抛出异常处的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected RpcInvocation createRpcInvocationMessage(String methodName, Class<?>[] parameterTypes, Object[] args) throws IOException {
Object rpcInvocation;
if (this.isLocal) {
rpcInvocation = new LocalRpcInvocation(methodName, parameterTypes, args);
} else {
try {
RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(methodName, parameterTypes, args);
if (remoteRpcInvocation.getSize() > this.maximumFramesize) {
// 异常所在位置
throw new IOException("The rpc invocation size exceeds the maximum akka framesize.");
}

rpcInvocation = remoteRpcInvocation;
} catch (IOException var6) {
LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", var6);
throw var6;
}
}

return (RpcInvocation)rpcInvocation;
}

至于为什么 JobManager 和 TaskManager 之间的 RPC 消息大小会如此之大,初步的解释是在 task 出现异常之后,它需要调用 updateTaskExecutionState(TaskExecutionState,taskExecutionState) 这个 RPC 接口去通知 Flink Jobmanager 去改变对应 task 的状态并且重启 task。但是呢,taskExecutionState 这个参数里面有个 error 属性,当我的 task 打出来的错误栈太多的时候,在序列化的之后超过了 rpc 接口要求的最大数据大小(也就是 maximum akka framesize),导致调用 updateTaskExecutionState 这个 rpc 接口失败,Jobmanager 无法获知这个 task 已经处于 fail 的状态,也无法重启,然后就导致了一系列连锁反应。

解决办法

任务停止,在 flink-conf.yaml 中加入 akka.framesize 参数,调大该值。

1
akka.framesize: "62914560b"

然后将任务重启,可以观察 Jobmanager Configration 看看参数是否生效。

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. 背景
  2. 2. 解决异常过程
  3. 3. 分析原因
  4. 4. 解决办法