I run a standalone Flink (1.11.6) cluster with ZooKeeper high availability (HA).
First, I restarted the ZooKeeper cluster.
Second, I submitted 31 jobs from the command line with ./bin/flink run.
Then I got the following exception: 4 of the 31 jobs failed to submit.
18:55:20,243 - [Flink-DispatcherRestEndpoint-thread-2] - [WARN ] - [org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(line:290)] - Could not create remote rpc invocation message. Failing rpc invocation because...
java.io.IOException: The rpc invocation size 12532388 exceeds the maximum akka framesize.
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:283) ~[flink-dist_2.12-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:210) ~[flink-dist_2.12-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:133) ~[flink-dist_2.12-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:88) ~[flink-dist_2.12-1.11.6.jar:1.11.6]
at com.sun.proxy.$Proxy36.submitJob(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$handleRequest$0(JobSubmitHandler.java:123) ~[flink-dist_2.12-1.11.6.jar:1.11.6]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) [?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) [?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
18:55:20,244 - [Flink-DispatcherRestEndpoint-thread-2] - [ERROR] - [org.apache.flink.runtime.rest.handler.AbstractHandler.handleException(line:257)] - Unhandled exception.
java.lang.reflect.UndeclaredThrowableException: null
at com.sun.proxy.$Proxy36.submitJob(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$handleRequest$0(JobSubmitHandler.java:123) ~[flink-dist_2.12-1.11.6.jar:1.11.6]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) [?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.io.IOException: The rpc invocation size 12532388 exceeds the maximum akka framesize.
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:283) ~[flink-dist_2.12-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:210) ~[flink-dist_2.12-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:133) ~[flink-dist_2.12-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:88) ~[flink-dist_2.12-1.11.6.jar:1.11.6]
... 11 more
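For scale: assuming the cluster keeps Flink's default akka.framesize of 10485760 bytes (10 MiB), the 12532388-byte submission exceeds the limit by roughly 2 MB. The Java sketch below only reproduces that arithmetic. FrameSizeCheck and exceeds() are hypothetical names, not Flink API. If I read AkkaInvocationHandler correctly, this size check applies only to remote invocations ("Could not create remote rpc invocation message"). That would suggest the REST handler forwarded the job graph to a dispatcher in another JVM rather than a local one.

```java
// Sketch only: compares the size from the log with Flink's default
// akka.framesize, ASSUMING flink-conf.yaml does not override it.
// FrameSizeCheck and exceeds() are hypothetical names.
public class FrameSizeCheck {
    // Default akka.framesize in Flink 1.11 is "10485760b" (10 MiB).
    static final long DEFAULT_AKKA_FRAMESIZE = 10_485_760L;

    // True if a serialized RPC payload is larger than the frame size limit.
    static boolean exceeds(long payloadBytes, long frameSizeBytes) {
        return payloadBytes > frameSizeBytes;
    }

    public static void main(String[] args) {
        long payload = 12_532_388L; // from "The rpc invocation size 12532388 exceeds ..."
        System.out.printf("payload = %d bytes (%.2f MiB)%n", payload, payload / (1024.0 * 1024.0));
        System.out.printf("limit   = %d bytes%n", DEFAULT_AKKA_FRAMESIZE);
        System.out.printf("exceeds = %b, over by %d bytes%n",
                exceeds(payload, DEFAULT_AKKA_FRAMESIZE), payload - DEFAULT_AKKA_FRAMESIZE);
    }
}
```

Raising akka.framesize would probably let these 4 submissions through. It would not explain why they took the remote path in the first place.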
The logs of Node A and Node B look similar. I believe some jobs were submitted to Node A, whereas others were submitted to Node B:
18:54:51,441 - [main-EventThread] - [INFO ] - [org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.startJobMaster(line:321)] - JobManager runner for job PrepareMblQuotEvent (dfdcb0d6f20c83b0fe99473f3103278a) was granted leadership with session id c11603ca-1313-4066-9a21-51e02f0df057 at akka.tcp://flink@Node_A:39715/user/rpc/jobmanager_2.
18:54:51,632 - [main-EventThread] - [INFO ] - [org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.startJobMaster(line:321)] - JobManager runner for job PrepareErrOrderEvent (5918fedb51dd000c017db9cb381fbb1c) was granted leadership with session id 85aef144-05c2-473d-b479-0c8c74d089f9 at akka.tcp://flink@Node_B:46746/user/rpc/jobmanager_5.
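To check which node each job landed on, the "was granted leadership" lines from all nodes can be grouped by host. The following is a minimal Java sketch. It assumes the exact JobManagerRunnerImpl message format shown above. JobLeaderHosts and jobsByHost are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical diagnostic: groups job names by the host of their JobManager,
// ASSUMING the "was granted leadership ... at akka.tcp://flink@<host>:<port>/"
// message format seen in the Flink 1.11.6 logs.
public class JobLeaderHosts {
    static final Pattern GRANTED = Pattern.compile(
            "JobManager runner for job (\\S+) \\(([0-9a-f]{32})\\) was granted leadership"
            + " with session id \\S+ at akka\\.tcp://flink@([^:/]+):(\\d+)/");

    // Returns host -> job names, sorted by host; non-matching lines are ignored.
    static Map<String, List<String>> jobsByHost(List<String> lines) {
        Map<String, List<String>> result = new TreeMap<>();
        for (String line : lines) {
            Matcher m = GRANTED.matcher(line);
            if (m.find()) {
                result.computeIfAbsent(m.group(3), host -> new ArrayList<>()).add(m.group(1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "... JobManager runner for job PrepareMblQuotEvent (dfdcb0d6f20c83b0fe99473f3103278a) was granted leadership with session id c11603ca-1313-4066-9a21-51e02f0df057 at akka.tcp://flink@Node_A:39715/user/rpc/jobmanager_2.",
            "... JobManager runner for job PrepareErrOrderEvent (5918fedb51dd000c017db9cb381fbb1c) was granted leadership with session id 85aef144-05c2-473d-b479-0c8c74d089f9 at akka.tcp://flink@Node_B:46746/user/rpc/jobmanager_5.");
        jobsByHost(lines).forEach((host, jobs) -> System.out.println(host + " -> " + jobs));
    }
}
```

Fed with the full logs of all nodes, this would show how the 31 jobs are split between Node A and Node B.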
The logs of Node A indicate that Node A is the leader:
17:19:45,433 - [flink-akka.actor.default-dispatcher-22] - [INFO ] - [org.apache.flink.runtime.resourcemanager.ResourceManager.tryAcceptLeadership(line:1118)] - ResourceManager akka.tcp://flink@Node_A:46746/user/rpc/resourcemanager_0 was granted leadership with fencing token ad84d46e902e0cf6da92179447af4e00
17:19:45,434 - [main-EventThread] - [INFO ] - [org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.grantLeadership(line:931)] - http://Node_A:xxx was granted leadership with leaderSessionID=f60df688-372d-416b-a965-989a59b37feb
17:19:45,437 - [flink-akka.actor.default-dispatcher-22] - [INFO ] - [org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.start(line:287)] - Starting the SlotManager.
17:19:45,480 - [main-EventThread] - [INFO ] - [org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.startInternal(line:97)] - Start SessionDispatcherLeaderProcess.
17:19:45,489 - [cluster-io-thread-1] - [INFO ] - [org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(line:232)] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 .
17:19:45,495 - [flink-akka.actor.default-dispatcher-23] - [INFO ] - [org.apache.flink.runtime.resourcemanager.ResourceManager.registerTaskExecutorInternal(line:891)] - Registering TaskManager with ResourceID XXXXXX (akka.tcp://flink@XXXX:XX/user/rpc/taskmanager_0) at ResourceManager
The logs of Nodes A, B, C, and D all contain the message "Start SessionDispatcherLeaderProcess.", but only the logs of Nodes C and D also contain "Stopping SessionDispatcherLeaderProcess.", as in this excerpt:
17:19:44,577 - [main-EventThread] - [INFO ] - [org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.startInternal(line:97)] - Start SessionDispatcherLeaderProcess.
17:19:44,580 - [Curator-ConnectionStateManager-0] - [INFO ] - [org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.closeInternal(line:134)] - Stopping SessionDispatcherLeaderProcess.
17:19:44,590 - [cluster-io-thread-3] - [INFO ] - [org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(line:232)] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 .
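The same comparison can be scripted per node. This sketch makes one assumption: "Start SessionDispatcherLeaderProcess." is logged when a node is granted dispatcher leadership, and "Stopping SessionDispatcherLeaderProcess." when that leadership is revoked. DispatcherLeaderCheck and stillLeading are hypothetical names. Under that assumption, a node whose last such message is "Start" still considers itself the dispatcher leader. By the observation above, that would be true of both Node A and Node B.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical diagnostic, ASSUMING Start/Stopping messages mark dispatcher
// leadership being granted/revoked on that node.
public class DispatcherLeaderCheck {
    static final String START = "Start SessionDispatcherLeaderProcess";
    static final String STOP = "Stopping SessionDispatcherLeaderProcess";

    // True if the last Start message in this node's log is not followed by a Stopping message.
    static boolean stillLeading(List<String> nodeLog) {
        boolean leading = false;
        for (String line : nodeLog) {
            if (line.contains(START)) {
                leading = true;
            } else if (line.contains(STOP)) {
                leading = false;
            }
        }
        return leading;
    }

    public static void main(String[] args) {
        // Simplified excerpts reflecting the observation above, not real log files.
        Map<String, List<String>> logs = new TreeMap<>();
        logs.put("Node_A", List.of("... - Start SessionDispatcherLeaderProcess."));
        logs.put("Node_B", List.of("... - Start SessionDispatcherLeaderProcess."));
        logs.put("Node_C", List.of("... - Start SessionDispatcherLeaderProcess.",
                "... - Stopping SessionDispatcherLeaderProcess."));
        logs.put("Node_D", List.of("... - Start SessionDispatcherLeaderProcess.",
                "... - Stopping SessionDispatcherLeaderProcess."));
        logs.forEach((node, log) -> System.out.println(node + " still leading: " + stillLeading(log)));
    }
}
```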
I suspect that Node B mistakenly believes it is the leader, so that Nodes A and B are both modifying the same ZooKeeper node (rest_server_lock) at the same time.
Is this a bug in Flink?