运行环境
MySQL:5.7x
StarRocks:2.5
FLink:1.18.0
FLink-CDC:3.0
配置文件如下
source:
type: mysql
hostname: xxx
port: 3306
username: xxx
password: xxx
tables: xxx_db.\.*
server-id: 1
server-time-zone: Asia/Shanghai
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://xxx:9030
load-url: xxx:8030
username: root
password: ""
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to StarRocks
parallelism: 1
DDL问题:
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: temp of AddColumnEvent is already existed
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at com.ververica.cdc.runtime.operators.schema.SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:123)
... 30 more
Caused by: java.lang.IllegalArgumentException: temp of AddColumnEvent is already existed
at com.ververica.cdc.common.utils.SchemaUtils.applyAddColumnEvent(SchemaUtils.java:73)
at com.ververica.cdc.common.utils.SchemaUtils.applySchemaChangeEvent(SchemaUtils.java:53)
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager.applySchemaChange(SchemaManager.java:113)
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.handleSchemaChangeRequest(SchemaRegistryRequestHandler.java:102)
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry.handleCoordinationRequest(SchemaRegistry.java:157)
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(DefaultOperatorCoordinatorHandler.java:143)
at org.apache.flink.runtime.scheduler.SchedulerBase.deliverCoordinationRequestToCoordinator(SchedulerBase.java:1070)
at org.apache.flink.runtime.jobmaster.JobMaster.sendRequestToCoordinator(JobMaster.java:616)
at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
DML问题:
由于您没有提供具体的问题描述,我无法为您提供针对性的解决方案。但是,我可以给您一些建议来解决FLink CDC 3.0同步MySQL数据到StarRocks时可能遇到的问题:
确保您的MySQL和StarRocks版本与FLink CDC 3.0兼容。根据官方文档,FLink CDC 3.0支持的MySQL版本为5.7.x,而StarRocks 2.5应该与FLink CDC 3.0兼容。
检查您的Flink配置文件(flink-conf.yaml)中是否包含正确的连接器配置。例如,确保您已经添加了以下配置:
table.exec.source.cdc-events-duplicate: true
CREATE TABLE mysql_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'user'
);
CREATE TABLE starrocks_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'starrocks',
'fenodes' = 'localhost:8030',
'table.identifier' = 'test_db.user',
'username' = 'root',
'password' = '123456'
);
如果您能提供更多关于问题的详细信息,我将更好地帮助您解决问题。
这个问题是因为任务没有使用 savepoint 重启。因此重启后会再次进行全量同步,此时insert、update 的操作数据都能够读取到最新的结果,而 delete 数据则因源表不再包含这条数据,下游不会做对应处理。数据仍然保存。
解决办法是通过 flink savepoint 停止原来的任务,如执行下面命令:
flink stop [--savepointPath <pathToSavepoint>] <jobID>
并且在 flink-conf.yaml 里设置 execution.savepoint.path 指定 savepoint 路径,重新启动 cdc 任务。这样 cdc 任务会从重启前的位点,逐条处理binlog,正确处理 delete 事件。
Flink 3.0 还不支持通过 -s/--savepoint 参数指定savepoint 路径。
这个问题可能是由于Flink CDC在处理删除字段时出现了问题。你可以尝试以下方法解决这个问题:
检查FLink CDC的版本,确保它是最新的。如果不是,请升级到最新版本,看看问题是否得到解决。
如果问题仍然存在,你可以尝试在FLink CDC的GitHub仓库中提交一个issue,详细描述你遇到的问题。这样,FLink团队可能会关注这个问题,并在后续版本中修复它。
作为临时解决方案,你可以尝试在同步任务中使用DELETE
操作符,而不是直接删除数据。这样,FLink CDC应该能够正确处理删除操作。例如:
DELETE FROM your_table WHERE some_condition;
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。