maven
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>${flinkcdc.vsersion}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc10</artifactId>
<version>19.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.vsersion}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
</dependencies>
代码
```public static void main(String[] args) throws Exception {
//反序列化定义
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("log.mining.strategy","online_catalog");
//Oracle数据源定义(ip、username、password、、、、、)
JdbcIncrementalSource oracleChangeEventSource = new OracleSourceBuilder()
.hostname("192.168.1.253")
.port(1521)
.databaseList("newdb")
.schemaList("payment")
.tableList("payment.order")
.username("payment")
.password("payment")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true)
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.enableCheckpointing(3000L);
environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
environment.fromSource(oracleChangeEventSource,
WatermarkStrategy.noWatermarks(),
"OracleParallelSource")
.print()
.setParallelism(1);
environment.execute("Print Oracle Snapshot + RedoLog");
}
maven打包后,在webUI提交jar,并submit,执行失败,
![image.png](https://ucc.alicdn.com/pic/developer-ecology/m666eo5xcszli_2622060522c744c9b7b655e0ac915da7.png)
失败日志
```2023-10-24 20:07:25
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=2000)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:102)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:301)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:618)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: OracleParallelSource -> Sink: Print to Std. Out' (operator cbc357ccb763df2852fee8c4fc7d55f2).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600)
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:226)
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:405)
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:150)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:151)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:222)
... 8 more
Caused by: java.lang.RuntimeException: Failed to resolve Oracle database version
at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:177)
at io.debezium.connector.oracle.OracleConnection.<init>(OracleConnection.java:87)
at io.debezium.connector.oracle.OracleConnection.<init>(OracleConnection.java:82)
at com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection(OracleConnectionUtils.java:61)
at com.ververica.cdc.connectors.oracle.source.OracleDialect.openJdbcConnection(OracleDialect.java:90)
at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:107)
at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:51)
at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:139)
... 9 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:oracle:thin:@192.168.1.253:1521/newecpss
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$0(JdbcConnection.java:192)
at io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:129)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883)
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:636)
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510)
at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:147)
... 16 more
看起来像是Flink执行失败了,原因是出现了全局失败。具体原因是"Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=2000)",表示恢复被固定延迟重试策略抑制,最大重试次数为3次,每次重试之间的间隔为2000毫秒。这可能是由于某些作业在执行过程中出现了严重的问题,导致无法恢复。建议你可以检查一下你的Flink配置文件,看看是否有错误的地方。另外,你也可以检查一下你的作业逻辑,看看是否有潜在的问题。
从日志信息中可以看出,Job执行失败的原因是由于执行失败后采用了固定延迟重试策略,并且已经达到了最大重试次数,因此恢复被抑制。建议检查您的代码逻辑,确保源数据的可用性,以及设置合理的重试策略。
从你提供的依赖来看,你已经包含了所有必要的依赖。Flink CDC Oracle的错误可能由多种原因引起,包括Oracle版本不匹配、Oracle配置问题、Flink配置问题等。
以下是一些可能的解决方案:
检查Oracle的版本是否与Flink CDC Oracle的版本相匹配。例如,如果你的Oracle是12c,那么你需要使用Flink CDC Oracle 12c的版本。
检查Oracle的配置,确保CDC功能已经启用。
检查Flink的配置,确保Flink正确连接到Oracle,并且Flink的并行度设置得当。
检查Flink的任务管理器和工作节点的资源分配,确保它们有足够的资源来处理CDC数据。
如果问题仍然存在,你可能需要查看Flink的日志,以获取更多的错误信息。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。