记录一次因内存设置导致flink运行时发生的错误

Apache Flink 是一个在有界数据流和无界数据流上进行有状态计算的分布式处理引擎和框架。Flink 设计旨在所有常见的集群环境中运行,以任意规模和内存级速度执行计算。Flink CDC 是一个基于流的数据集成工具,支持从数据库变更日志中(如binlog)读取记录进行无锁的增量数据处理。处理任务支持通过 Flink Connector 读取数据,也可以使用 Flink CDC Connector 读取数据。

错误场景描述:业务上对于开启了 binlog 的 MySQL 数据库默认使用 Flink CDC Connector 来读取数据,未开启 binlog 的数据库使用内置的 Flink Connector 读取数据。测试环境上的某个未开启 binlog 的 MySQL 数据库的任务平时可以正常运行,在没有修改代码的情况下突然无法启动了。

排查最后的结果:由于数据量较大,需要的内存较多,但 TaskManager 设置的内存过小,频繁GC导致任务无法正常运行。

排查过程

检查其他任务之后发现只有这个未开启 binlog 的数据库无法正常启动,于是第一时间到 Hadoop Yarn 上查看容器的运行日志,发现抛出的错误与 checkpoint 有关:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2024-03-27 16:37:06,439 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: ****[4] (3/4) (b32683d65134001706acbda4898a4828_5e1cbbbc1a9055ae0faa9b9a2f8da913_2_0) switched from INITIALIZING to RUNNING.
2024-03-27 16:37:07,878 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1711528627828 for job 5114bd1336c34f02ea59e8f48f7ad8c8.
2024-03-27 16:37:08,436 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 5114bd1336c34f02ea59e8f48f7ad8c8 (0 bytes, checkpointDuration=604 ms, finalizationTime=4 ms).
2024-03-27 16:37:12,826 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1711528632825 for job 5114bd1336c34f02ea59e8f48f7ad8c8.
2024-03-27 16:37:12,862 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 2 for job 5114bd1336c34f02ea59e8f48f7ad8c8 (0 bytes, checkpointDuration=35 ms, finalizationTime=2 ms).
2024-03-27 16:37:17,838 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 3 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1711528637825 for job 5114bd1336c34f02ea59e8f48f7ad8c8.
2024-03-27 16:37:25,669 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 3 for job 5114bd1336c34f02ea59e8f48f7ad8c8 (0 bytes, checkpointDuration=7843 ms, finalizationTime=1 ms).
2024-03-27 16:37:26,169 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 4 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1711528646169 for job 5114bd1336c34f02ea59e8f48f7ad8c8.
2024-03-27 16:37:35,268 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 4 for job 5114bd1336c34f02ea59e8f48f7ad8c8 (0 bytes, checkpointDuration=9098 ms, finalizationTime=0 ms).
2024-03-27 16:37:35,782 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 5 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1711528655772 for job 5114bd1336c34f02ea59e8f48f7ad8c8.
2024-03-27 16:37:45,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering Checkpoint 5 for job 5114bd1336c34f02ea59e8f48f7ad8c8 failed due to java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://flink@node01:3546/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
2024-03-27 16:37:45,811 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 5 for job 5114bd1336c34f02ea59e8f48f7ad8c8. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpointRequest$10(CheckpointCoordinator.java:706) ~[flink-dist-1.16.1.jar:1.16.1]
...
2024-03-27 16:37:47,676 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 7 for job 5114bd1336c34f02ea59e8f48f7ad8c8. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1926) ~[flink-dist-1.16.1.jar:1.16.1]
...
2024-03-27 16:37:57,219 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Received late message for now expired checkpoint attempt 7 from task b32683d65134001706acbda4898a4828_5e1cbbbc1a9055ae0faa9b9a2f8da913_3_0 of job 5114bd1336c34f02ea59e8f48f7ad8c8 at container_1710904673777_0069_01_000002 @ node01 (dataPort=14516).

从日志中可以看到在等待了相当长的一段时间后才收到了过期的 checkpoint ,任务中设置了每10秒执行一次 checkpoint 的保存,保存的超时时间为120秒,日志中保存 checkpoint 的时间虽然较长,但抛出异常的时间也远远没有达到设置的超时时间。调整代码后针对未开启数据库日志的数据源不主动开启 checkpoint。

1
2
3
4
5
6
7
8
9
10
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
if(enableBinlog) {
environment.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
checkpointConfig.setMinPauseBetweenCheckpoints(1000);
checkpointConfig.setCheckpointTimeout(120000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
// 取消flink任务时保留检查点状态,以便恢复到指定的检查点,最终需要手动删除检查点元数据和实际程序状态
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}

发布代码到测试环境后发现由于 flink 的配置文件(./conf/flink-conf.yaml)中声明了 checkpoint 的相关参数,即使代码中不主动声明开启 checkpoint,checkpoint 默认也是开启的,只是超时和保存策略设置的稍有不同,所以日志中仍然出现了 checkpoint 超时的异常。

1
2
3
4
5
6
7
8
9
10
11
12
execution.checkpointing.interval: 5000
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: hashmap
state.checkpoints.dir: hdfs://node01:8020/flink/checkpoints
state.savepoints.dir: hdfs://node01:8020/flink/checkpoints
execution.checkpointing.timeout: 600000
execution.checkpointing.min-pause: 500
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.unaligned: true
execution.checkpointing.aligned-checkpoint-timeout: 30 s
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

再次调整代码,主动禁用了 checkpoint。

1
2
3
4
5
6
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
if(enableBinlog) {
...
} else {
environment.getCheckpointConfig().disableCheckpointing();
}

调整完代码后再次执行,一切就变得奇怪起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
2024-03-27 17:38:00,868 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
...
2024-03-27 17:38:07,983 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: ****[4] (2/4) (25fd5975f0d7c9d7351722295793fee4_5e1cbbbc1a9055ae0faa9b9a2f8da913_1_0) switched from INITIALIZING to RUNNING.
2024-03-27 17:38:49,552 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job **** (5721b22dfe821f225c9d5b06996b7ebb) switched from state RUNNING to CANCELLING.
...
2024-03-27 17:38:59,415 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: ****M[4] (2/4) (25fd5975f0d7c9d7351722295793fee4_5e1cbbbc1a9055ae0faa9b9a2f8da913_1_0) switched from CANCELING to CANCELED.
...
2024-03-27 17:40:24,092 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Heartbeat of TaskManager with id container_1710904673777_0074_01_000002(node01:6683) timed out.
...
2024-03-27 17:40:24,106 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 5721b22dfe821f225c9d5b06996b7ebb reached terminal state CANCELED.
2024-03-27 17:40:24,494 INFO org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application CANCELED:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: CANCELED
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$7(ApplicationDispatcherBootstrap.java:403) ~[flink-dist-1.16.1.jar:1.16.1]
...

可以看到日志中输出 checkpoint 已经被禁用了,但任务从初始化 INITIALIZING 状态切换到运行 RUNNING 状态后,过了一段时间突然又被切换到取消 CANCELLING 状态了,这中间没有任何的异常输出,再等待一段时间的重试后,容器才抛出异常结束。

再多次的尝试执行任务,进入到 Flink Dashboard 查看运行状态,此时才真相大白,查看发现 TaskManager 占用的内存较高,同时在 JVM->TM GC count 中发现 TaskManager 的 GC 较为频繁。由于在执行 GC 时会出现 Stop-The-World 导致应用程序的其他所有线程都被挂起无法响应,flink 任务无法向 ResourceManager 发送心跳包证明自己是存活的,Yarn 认为应用可能意外退出了,在执行重试机制之后仍然没有改变就选择了取消任务,根本原因是随着时间推移,表内记录的数据增加,处理数据所需要的内存也在逐渐增加,直到今天发生GC后STW时间超出可接受的暂停时间。应谨慎设置在开启任务时传入的参数 -Dtaskmanager.memory.process.size

其实认真查看日志,也可以看到相关的提示 Heartbeat of TaskManager with id container_1710904673777_0074_01_000002(node01:6683) timed out. ,不过由于这条输出的日志不是以抛出异常的格式输出,在众多日志信息中被忽略了,导致排查花费了较多的时间。

另外,尝试将表结构+数据存放到开启了 binlog 的 MySQL 数据库上,使用 Flink CDC Connector 读取数据,即使配置的 TaskManager 内存较小任务也可以正常执行。在这个月 (2024.03) Apache Flink 宣布 Flink CDC 正式加入了社区的维护,如果不是无法修改数据库配置的话,服务器内存不足的情况下尽量开启日志使用 Flink CDC Connector 读取数据以保证任务可以正常运行。