import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

case class TestClass2(name: String, money: Int, var count: Long, var flage: Boolean)

object CheckpointExample extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(10000)
  val checkpointConf = env.getCheckpointConfig
  // Retain externalized checkpoints when the job is cancelled, so they are not cleaned up.
  checkpointConf.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

  /* Kafka init */
  val orderProperties = new Properties()
  //orderProperties.setProperty("bootstrap.servers", "172.24.80.110:9092")
  orderProperties.setProperty("bootstrap.servers", "192.168.1.160:19092")
  orderProperties.setProperty("group.id", "id1")
  val orderKafkaConsumer = new FlinkKafkaConsumer011[String]("test_topic_h1", new SimpleStringSchema(), orderProperties)

  val source = env.addSource(orderKafkaConsumer).uid("source")
  val t1 = source.map(_.split(",")).map(x => TestClass2(x(0), x(1).toInt, x(2).toLong, false)).uid("map")
  val t4 = t1.keyBy(_.name).sum("money")
  t4.print()

  env.execute("10086")
}
With the code above, I feed the following records into Kafka:
hb,40,0
hb,40,0
The output is correct: 3> TestClass2(hb,80,0,false)
However, the job fails to come back up when restoring from the checkpoint.
Restore command (the variable values are set correctly): $FLINK_HOME/bin/flink run -s hdfs://hadoop:8020/flink/flink-checkpoints/b01fbd688ab62cb20e5fe75ca1b6fba0/chk-20/_metadata --class ${MAIN_CLASS} ${LIB_DIR}/${LIB_NAME}
Error output:
2019-01-02 21:53:39,859 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to airflow (auth:SIMPLE)
2019-01-02 21:53:39,911 INFO org.apache.flink.client.cli.CliFrontend - Running 'run' command.
2019-01-02 21:53:39,917 INFO org.apache.flink.client.cli.CliFrontend - Building program from JAR file
2019-01-02 21:53:40,392 WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
2019-01-02 21:53:41,384 INFO org.apache.flink.runtime.blob.FileSystemBlobStore - Creating highly available BLOB storage directory at hdfs://qa-bigdata001.ecs.east1-e:8020/flink/recovery//default/blob
2019-01-02 21:53:41,525 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Enforcing default ACL for ZK connections
2019-01-02 21:53:41,526 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Using '/flink/default' as Zookeeper namespace.
2019-01-02 21:53:41,608 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
2019-01-02 21:53:41,617 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
2019-01-02 21:53:41,618 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:host.name=qa-bigdata004.ecs.east1-e
2019-01-02 21:53:41,618 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_77
2019-01-02 21:53:41,618 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation
2019-01-02 21:53:41,618 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.home=/usr/java/jdk1.8.0_77/jre
2019-01-02 21:53:41,618 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.class.path=/home/airflow/flink-1.5.0/lib/flink-python_2.11-1.5.0.jar:/home/airflow/flink-1.5.0/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/home/airflow/flink-1.5.0/lib/log4j-1.2.17.jar:/home/airflow/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar:/home/airflow/flink-1.5.0/lib/flink-dist_2.11-1.5.0.jar::/etc/hadoop/conf:
2019-01-02 21:53:41,618 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2019-01-02 21:53:41,618 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/tmp
2019-01-02 21:53:41,618 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=<NA>
2019-01-02 21:53:41,618 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:os.name=Linux
2019-01-02 21:53:41,619 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:os.arch=amd64
2019-01-02 21:53:41,619 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:os.version=2.6.32-696.1.1.el6.x86_64
2019-01-02 21:53:41,619 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:user.name=airflow
2019-01-02 21:53:41,619 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:user.home=/home/airflow
2019-01-02 21:53:41,619 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/home/airflow/streaming-project/flink/flink-example
2019-01-02 21:53:41,620 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=172.24.80.87:2181,172.24.80.88:2181,172.24.80.89:2181, sessionTimeout=60000 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5e01a982
2019-01-02 21:53:41,636 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-2931078451485661056.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2019-01-02 21:53:41,639 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server 172.24.80.88/172.24.80.88:2181
2019-01-02 21:53:41,639 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
2019-01-02 21:53:41,640 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket connection established to 172.24.80.88/172.24.80.88:2181, initiating session
2019-01-02 21:53:41,649 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session establishment complete on server 172.24.80.88/172.24.80.88:2181, sessionid = 0x2653fad5ca288a9, negotiated timeout = 40000
2019-01-02 21:53:41,651 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
2019-01-02 21:53:41,935 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
2019-01-02 21:53:41,939 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-01-02 21:53:41,967 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-01-02 21:53:41,967 INFO org.apache.flink.client.cli.CliFrontend - Starting execution of program
2019-01-02 21:53:41,974 INFO org.apache.flink.client.program.rest.RestClusterClient - Starting program in interactive mode (detached: false)
2019-01-02 21:53:42,398 INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 64e1046a98a0be642d99e9e0a16bfb66 (detached: false).
2019-01-02 21:53:42,399 INFO org.apache.flink.client.program.rest.RestClusterClient - Requesting blob server port.
2019-01-02 21:54:12,443 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
2019-01-02 21:54:12,449 INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
2019-01-02 21:54:12,449 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-01-02 21:54:12,451 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-01-02 21:54:12,452 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
2019-01-02 21:54:12,455 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Session: 0x2653fad5ca288a9 closed
2019-01-02 21:54:12,455 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x2653fad5ca288a9
2019-01-02 21:54:12,456 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.dianwoda.bigdata.flink.example.CheckpointExample$.delayedEndpoint$com$dianwoda$bigdata$flink$example$CheckpointExample$1(CheckpointExample.scala:65)
at com.dianwoda.bigdata.flink.example.CheckpointExample$delayedInit$body.apply(CheckpointExample.scala:38)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.dianwoda.bigdata.flink.example.CheckpointExample$.main(CheckpointExample.scala:38)
at com.dianwoda.bigdata.flink.example.CheckpointExample.main(CheckpointExample.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$5(RestClusterClient.java:357)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1088)
at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
at java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1020)
... 10 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
... 10 more
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
... 10 more
Caused by: java.util.concurrent.TimeoutException
... 8 more
How can I get the restore to succeed?
It sounds like what you actually want is to save the position/state and then restore from it. For that, trigger a savepoint first; see https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#triggering-savepoints
bin/flink savepoint :jobId [:targetDirectory]
bin/flink run -s :savepointPath [:runArgs]
If you mean recovery from checkpoints, that happens automatically: as long as the checkpoint path is configured, there is no need for flink run -s.
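For reference, here is a minimal sketch of pointing a job at an explicit checkpoint directory via FsStateBackend, assuming the same Flink 1.5 / Scala APIs as in the question; the object name, HDFS path, and placeholder source are illustrative and not taken from the original post. Retained checkpoints (chk-N/_metadata) then end up under that directory.

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._

object CheckpointDirSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 10 seconds, as in the question.
    env.enableCheckpointing(10000)
    // Write checkpoint data (including chk-N/_metadata) under this directory;
    // the HDFS path here is a placeholder.
    env.setStateBackend(new FsStateBackend("hdfs://hadoop:8020/flink/flink-checkpoints"))
    // Placeholder source so the sketch runs on its own; in the real job this
    // would be the Kafka source from the code above.
    env.fromElements("hb,40,0", "hb,40,0")
      .map(_.split(","))
      .map(x => (x(0), x(1).toInt))
      .keyBy(_._1)
      .sum(1)
      .print()
    env.execute("checkpoint-dir-sketch")
  }
}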