Flink on k8s native application模式踩坑记录
flink on k8s application模式部署踩坑
·
启动命令
bin/flink run-application \
--target kubernetes-application \
-c com.mypackage.MyFlinkJob \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=myflinkimage/flink-k8s:1.13.1.1 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dclassloader.resolve-order=parent-first \
local:///opt/flink/usrlib/my-flink-job.jar \
--offset 1661356800000 --startOffset timestamp --flag test --servers 127.0.0.1:9092 --group my_group1 --enable_checkpoint false
Dockerfile
FROM flink:1.13.6-scala_2.12-java8
RUN mkdir -p $FLINK_HOME/usrlib
COPY lib/flink-connector-jdbc_2.12-1.13.1.jar $FLINK_HOME/lib
COPY lib/flink-connector-kafka_2.12-1.13.1.jar $FLINK_HOME/lib
COPY lib/kafka_2.12-2.4.1.jar $FLINK_HOME/lib
COPY lib/kafka-clients-2.4.1.jar $FLINK_HOME/lib
COPY my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
COPY flink-conf.yaml $FLINK_HOME/conf
CMD ["/bin/bash"]
几个问题
- 镜像问题,官方文档中使用的基础镜像是
FROM flink
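,不带tag时拉取的是latest镜像,当时对应的版本是1.14.2,因为代码使用的是1.13.1,故此需要显式指定基础镜像版本。而且flink镜像是多架构镜像,同一个tag下存在多个平台版本,例如一个linux/arm64/v8
一个linux/amd64
,在ARM架构的机器上(例如Apple Silicon的Mac)build时,默认拉取的是linux/arm64/v8
,用该版本打包好的镜像提交到x86_64(amd64)节点的k8s中,会因为CPU架构不匹配而有如下问题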
standard_init_linux.go:207: exec user process caused "exec format error"
解决方式是在build镜像的时候加上参数--platform linux/amd64
,强制使用amd64平台的基础镜像。另外,还是推荐使用指定版本的基础镜像
- 依赖jar包问题
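在打包镜像的时候,除了日常依赖的jar包 flink-connector-jdbc_2.12-1.13.1.jar
、flink-connector-kafka_2.12-1.13.1.jar
,还需要增加kafka-clients-2.4.1.jar
(flink-connector-kafka的jar包本身不包含kafka-clients中的类),不然JobManager在启动任务时会因为找不到org.apache.kafka.common.serialization.ByteArrayDeserializer而报如下错误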
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_302]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_302]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_302]
at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.1.jar:1.13.1]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
... 11 more
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:139) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:108) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at com.aibee.stream.bi.common.source.KafkaSourceV2.getConsumer(KafkaSourceV2.scala:25) ~[?:?]
at com.aibee.stream.bi.common.source.KafkaSourceV2.getConsumer(KafkaSourceV2.scala:44) ~[?:?]
at com.aibee.stream.bi.mall.task.BaseCorrectionStatistic.getOnlineCorrPersonTimesStream(BaseCorrectionStatistic.scala:105) ~[?:?]
at com.aibee.stream.bi.mall.task.DailyActionPersonTimesStatistic$.main(DailyActionPersonTimesStatistic.scala:53) ~[?:?]
at com.aibee.stream.bi.mall.task.DailyActionPersonTimesStatistic.main(DailyActionPersonTimesStatistic.scala) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_302]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_302]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_302]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_302]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_302]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_302]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_302]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_302]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:139) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:108) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at com.aibee.stream.bi.common.source.KafkaSourceV2.getConsumer(KafkaSourceV2.scala:25) ~[?:?]
at com.aibee.stream.bi.common.source.KafkaSourceV2.getConsumer(KafkaSourceV2.scala:44) ~[?:?]
at com.aibee.stream.bi.mall.task.BaseCorrectionStatistic.getOnlineCorrPersonTimesStream(BaseCorrectionStatistic.scala:105) ~[?:?]
at com.aibee.stream.bi.mall.task.DailyActionPersonTimesStatistic$.main(DailyActionPersonTimesStatistic.scala:53) ~[?:?]
at com.aibee.stream.bi.mall.task.DailyActionPersonTimesStatistic.main(DailyActionPersonTimesStatistic.scala) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_302]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_302]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_302]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_302]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 10 more
2022-09-06 03:12:57,645 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: Application failed unexpectedly.
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAndShutdownClusterAsync$0(ApplicationDispatcherBootstrap.java:170) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_302]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_302]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_302]
at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.1.jar:1.13.1]
Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_302]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_302]
... 13 more
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
... 11 more
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:139) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:108) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at com.aibee.stream.bi.common.source.KafkaSourceV2.getConsumer(KafkaSourceV2.scala:25) ~[?:?]
at com.aibee.stream.bi.common.source.KafkaSourceV2.getConsumer(KafkaSourceV2.scala:44) ~[?:?]
at com.aibee.stream.bi.mall.task.BaseCorrectionStatistic.getOnlineCorrPersonTimesStream(BaseCorrectionStatistic.scala:105) ~[?:?]
at com.aibee.stream.bi.mall.task.DailyActionPersonTimesStatistic$.main(DailyActionPersonTimesStatistic.scala:53) ~[?:?]
at com.aibee.stream.bi.mall.task.DailyActionPersonTimesStatistic.main(DailyActionPersonTimesStatistic.scala) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_302]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_302]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_302]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_302]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_302]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_302]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_302]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_302]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:139) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:108) ~[flink-connector-kafka_2.12-1.13.1.jar:1.13.1]
at com.aibee.stream.bi.common.source.KafkaSourceV2.getConsumer(KafkaSourceV2.scala:25) ~[?:?]
at com.aibee.stream.bi.common.source.KafkaSourceV2.getConsumer(KafkaSourceV2.scala:44) ~[?:?]
at com.aibee.stream.bi.mall.task.BaseCorrectionStatistic.getOnlineCorrPersonTimesStream(BaseCorrectionStatistic.scala:105) ~[?:?]
at com.aibee.stream.bi.mall.task.DailyActionPersonTimesStatistic$.main(DailyActionPersonTimesStatistic.scala:53) ~[?:?]
at com.aibee.stream.bi.mall.task.DailyActionPersonTimesStatistic.main(DailyActionPersonTimesStatistic.scala) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_302]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_302]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_302]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_302]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 10 more
- 提交参数问题
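任务提交到k8s后,任务读取的flink-conf.yaml并不是镜像中conf/flink-conf.yaml的内容,而是由提交端的配置加上提交命令中的-D参数重新生成、再挂载到容器中的,故此一些配置需要在启动命令中通过-D的方式传递给任务。例如,任务在启动时会报如下错误(ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap ...),原因是同一个类被不同的类加载器各加载了一次(默认的类加载顺序是child-first),解决该问题需要在启动命令中用-D方式增加配置classloader.resolve-order=parent-first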
2022-09-05 10:35:40,668 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job mall-full-dailyactionpersontimesstatistic-prod-wanda (64a0e7c3f32da11ad514877659408034) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_342]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_342]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_342]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.6.jar:1.13.6]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.6.jar:1.13.6]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.6.jar:1.13.6]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.6.jar:1.13.6]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.6.jar:1.13.6]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.6.jar:1.13.6]
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:338) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:159) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) ~[?:1.8.0_342]
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) ~[?:1.8.0_342]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2437) ~[?:1.8.0_342]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[?:1.8.0_342]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[?:1.8.0_342]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[?:1.8.0_342]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[?:1.8.0_342]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[?:1.8.0_342]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[?:1.8.0_342]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[?:1.8.0_342]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) ~[?:1.8.0_342]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) ~[?:1.8.0_342]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) ~[?:1.8.0_342]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) ~[?:1.8.0_342]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) ~[?:1.8.0_342]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) ~[?:1.8.0_342]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:159) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
更多推荐
已为社区贡献2条内容
所有评论(0)