如何提高 Flink K8s 集群资源使用率?

问题

在 flink on k8s 默认提交作业的命令下,我们会指定作业的 JM/TM 的 CPU 和 Memory,最后作业生成的 pod 它的 CPU/Memory 的 request/limit 都是一样的资源,但是作业真实运行时使用的资源远达不到 limit 的值,这样就会造成机器资源浪费(水位不高,但是机器又不能再申请 pod)。

比如下面命令:(指定了 TM 的资源,未指定 JM 资源)

1
2
3
4
5
6
7
8
./bin/flink run-application -p 1 -t kubernetes-application  -c com.zhisheng.Test \
-Dkubernetes.cluster-id=flink-log-alert-test1 \
-Dtaskmanager.memory.process.size=6g \
-Djobmanager.memory.process.size=2g \
-Dkubernetes.jobmanager.cpu=0.5 \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.numberOfTaskSlots=1 \
....

JM:

TM:

解决方案

1、分别为 JM/TM 的 内存和 CPU 添加参数设置 request 和 limit,如果行得通的话,这种方式要增加 8 个参数才能满足需求,但因 Flink 内存模型使得单独设置内存的 request/limit 变得非常复杂,只能设置 CPU 参数,而且之前的参数也将变得不可以使用。

2、分别为 JM/TM 的 内存和 CPU 添加参数 limit 因子,用户配置的内存或者 CPU 的值默认为 request 的值,limit 因子必须 >= 1,这种方式需要增加四个参数,相比第一种方法这种方法较为简单,但是目前 YARN 集群用户的资源配置,大多数作业已经是有一定的资源浪费(申请的资源远大于实际使用的资源),如果使用该方式,用户作业无感迁移到 K8s 集群后,其实资源浪费问题并没有解决

3、分别为 JM/TM 的 内存和 CPU 添加参数 request 因子,用户配置的内存或者 CPU 的值默认为 limit 的值,request 因子必须 <= 1,我们可以根据生产的数据配置一个合理的值,比如为 0.5。这种方式同样需要增加四个参数,但是这种方法对比第二种带来的好处是,大多数用户作业的资源配置将会更合理,机器同等资源能运行更多的 pod,从而可以提高机器的资源水位

代码开发

1、在 KubernetesConfigOptions 增加配置

KubernetesConfigOptions.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// jobmanager
public static final ConfigOption<Double> JOB_MANAGER_CPU_REQUEST_FACTOR =
key("kubernetes.jobmanager.cpu.request-factor")
.doubleType()
.defaultValue(1.0)
.withDescription(
"The request factor of cpu used by job manager. "
+ "The resources request cpu will be set to cpu * request-factor.");

public static final ConfigOption<Double> JOB_MANAGER_MEMORY_REQUEST_FACTOR =
key("kubernetes.jobmanager.memory.request-factor")
.doubleType()
.defaultValue(1.0)
.withDescription(
"The request factor of memory used by job manager. "
+ "The resources request memory will be set to memory * request-factor.");


// taskmanager
public static final ConfigOption<Double> TASK_MANAGER_CPU_REQUEST_FACTOR =
key("kubernetes.taskmanager.cpu.request-factor")
.doubleType()
.defaultValue(1.0)
.withDescription(
"The request factor of cpu used by task manager. "
+ "The resources request cpu will be set to cpu * request-factor.");

public static final ConfigOption<Double> TASK_MANAGER_MEMORY_REQUEST_FACTOR =
key("kubernetes.taskmanager.memory.request-factor")
.doubleType()
.defaultValue(1.0)
.withDescription(
"The request factor of memory used by task manager. "
+ "The resources request memory will be set to memory * request-factor.");

2、在 KubernetesJobManagerParameters 和 KubernetesTaskManagerParameters 中分别提供获取参数的方法

KubernetesJobManagerParameters.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public double getJobManagerCPURequestFactor() {
final double requestFactor =
flinkConfig.getDouble(KubernetesConfigOptions.JOB_MANAGER_CPU_REQUEST_FACTOR);
checkArgument(
requestFactor <= 1,
"%s should be less than or equal to 1.",
KubernetesConfigOptions.JOB_MANAGER_CPU_REQUEST_FACTOR.key());
return requestFactor;
}

public double getJobManagerMemoryRequestFactor() {
final double requestFactor =
flinkConfig.getDouble(KubernetesConfigOptions.JOB_MANAGER_MEMORY_REQUEST_FACTOR);
checkArgument(
requestFactor <= 1,
"%s should be less than or equal to 1.",
KubernetesConfigOptions.JOB_MANAGER_MEMORY_REQUEST_FACTOR.key());
return requestFactor;
}

KubernetesTaskManagerParameters.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public double getTaskManagerCPURequestFactor() {
final double requestFactor =
flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU_REQUEST_FACTOR);
checkArgument(
requestFactor <= 1,
"%s should be less than or equal to 1.",
KubernetesConfigOptions.TASK_MANAGER_CPU_REQUEST_FACTOR.key());
return requestFactor;
}

public double getTaskManagerMemoryRequestFactor() {
final double requestFactor =
flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_MEMORY_REQUEST_FACTOR);
checkArgument(
requestFactor <= 1,
"%s should be less than or equal to 1.",
KubernetesConfigOptions.TASK_MANAGER_MEMORY_REQUEST_FACTOR.key());
return requestFactor;
}

3、KubernetesUtils.getResourceRequirements() 方法做如下改变,增加 request 因子参数

KubernetesUtils.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Get resource requirements from memory and cpu.
*
* @param mem Memory in mb.
* @param memoryRequestFactor Memory request factor.
* @param cpu cpu.
* @param cpuRequestFactor cpu request factor.
* @param externalResources external resources
* @return KubernetesResource requirements.
*/
public static ResourceRequirements getResourceRequirements(
int mem,
double memoryRequestFactor,
double cpu,
double cpuRequestFactor,
Map<String, Long> externalResources) {

//todo:cpu 和 内存分别设置一个因子,默认是 0.5,用户设置的资源配置为 limit;request = limit * 因子
final Quantity cpuQuantity = new Quantity(String.valueOf(cpu));
final Quantity cpuRequestQuantity = new Quantity(String.valueOf(cpu * cpuRequestFactor));
final Quantity memQuantity = new Quantity(mem + Constants.RESOURCE_UNIT_MB);
final Quantity memRequestQuantity =
new Quantity(((int) (mem * memoryRequestFactor)) + Constants.RESOURCE_UNIT_MB);

ResourceRequirementsBuilder resourceRequirementsBuilder = new ResourceRequirementsBuilder()
.addToRequests(Constants.RESOURCE_NAME_MEMORY, memRequestQuantity)
.addToRequests(Constants.RESOURCE_NAME_CPU, cpuRequestQuantity)
.addToLimits(Constants.RESOURCE_NAME_MEMORY, memQuantity)
.addToLimits(Constants.RESOURCE_NAME_CPU, cpuQuantity);

// Add the external resources to resource requirement.
for (Map.Entry<String, Long> externalResource: externalResources.entrySet()) {
final Quantity resourceQuantity = new Quantity(String.valueOf(externalResource.getValue()));
resourceRequirementsBuilder
.addToRequests(externalResource.getKey(), resourceQuantity)
.addToLimits(externalResource.getKey(), resourceQuantity);
LOG.info("Request external resource {} with config key {}.", resourceQuantity.getAmount(), externalResource.getKey());
}
return resourceRequirementsBuilder.build();
}

4、在 InitJobManagerDecorator 和 InitTaskManagerDecorator 调用上面方法的地方做相应的修改

最终效果

可以在 flink-conf.yaml 中定义配置如下:

1
2
3
4
kubernetes.jobmanager.cpu.request-factor: 0.5
kubernetes.jobmanager.memory.request-factor: 0.8
kubernetes.taskmanager.cpu.request-factor: 0.5
kubernetes.taskmanager.memory.request-factor: 0.8

当然,用户的作业提交参数中也可以使用上面的参数进行覆盖,最终效果如下:

关注我

微信公众号:zhisheng

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号(zhisheng)了,你可以回复关键字:Flink 即可无条件获取到。另外也可以加我微信 你可以加我的微信:yuanblog_tzs,探讨技术!

更多私密资料请加入知识星球!

Github 代码仓库

https://github.com/zhisheng17/flink-learning/

以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. 问题
  2. 2. 解决方案
  3. 3. 最终效果
  4. 4. 关注我
  5. 5. Github 代码仓库