
v1.1.3-alpha-3: HA still has problems (switch-back fails, messages are duplicated)

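Environment

canal version 1.1.3-alpha-3, mysql version 5.7.22

Problem description

Conclusions first.
1. Simulating a failure of the active server with ./bin/stop.sh: the HA switchover succeeds and Kafka receives data, but switching back produces duplicate data.
2. Simulating a failure of the active server with mv conf/instance/instance.properties conf/instance/instance.properties.bak: the HA switchover succeeds, but switching back fails and Kafka receives no data.

Steps to reproduce

Configuration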

[root@node004035 conf]# cat canal.properties
#################################################
######### common argument #############
#################################################
canal.id = 004035
canal.ip = 192.168.x.35
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers = 192.168.x.108:2181,192.168.x.110:2181,192.168.x.112:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = true
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
# memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
# memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
# meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
# detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
# concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
# disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun ak/sk , support rds/mq
canal.aliyun.accesskey =
canal.aliyun.secretkey =
#################################################
######### destinations #############
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ #############
##################################################
canal.mq.servers = 192.168.x.108:9092,192.168.x.110:9092,192.168.x.112:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
canal.mq.extraParams.enable.idempotence=true
canal.mq.extraParams.max.in.flight.requests.per.connection=1

[root@node004035 conf]# cat data_checksum/instance.properties
#################################################

# mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
canal.instance.mysql.slaveId=1234
# enable gtid use true/false
canal.instance.gtidon=true
# position info
canal.instance.master.address=192.168.x.112:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
#canal.instance.rds.accesskey=
#canal.instance.rds.secretkey=
#canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.defaultDatabaseName=information_schema
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.\..
#canal.instance.filter.regex=.*\.t_time
# table black regex
canal.instance.filter.black.regex=.\._._new
# mq config
canal.mq.topic=data_checksum2
# dynamic topic route by table regex
#canal.mq.dynamicTopic=.,mytest\..,mytest2.user
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.\..
#################################################

[root@node004035 conf]# cat fanboshi/instance.properties
#################################################

# mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
canal.instance.mysql.slaveId=1234
# enable gtid use true/false
canal.instance.gtidon=true
# position info
canal.instance.master.address=192.168.x.112:3307
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
#canal.instance.rds.accesskey=
#canal.instance.rds.secretkey=
#canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.defaultDatabaseName=information_schema
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.\..
#canal.instance.filter.regex=.*\.t_time
# table black regex
canal.instance.filter.black.regex=.\._._new
# mq config
canal.mq.topic=fanboshi
# dynamic topic route by table regex
#canal.mq.dynamicTopic=.,mytest\..,mytest2.user
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.\..
#################################################

Testing switchover with stop.sh

Start canal on 35 and 36, then check ZooKeeper:

[zk: localhost:2181(CONNECTED) 10] ls /otter/canal/destinations/data_checksum/cluster
[192.168.x.36:11111, 192.168.x.35:11111]
[zk: localhost:2181(CONNECTED) 11] get /otter/canal/destinations/data_checksum/running
{"active":true,"address":"192.168.x.35:11111","cid":4035}

[zk: localhost:2181(CONNECTED) 12] ls /otter/canal/destinations/fanboshi/cluster
[192.168.x.36:11111, 192.168.x.35:11111]
[zk: localhost:2181(CONNECTED) 13] get /otter/canal/destinations/fanboshi/running
{"active":true,"address":"192.168.x.35:11111","cid":4035}
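For readers who want to script this check, here is a minimal sketch in Python of how the running payload can be read to tell the active server from the standby. It only parses the JSON text copied from the output above; it does not talk to ZooKeeper, and the helper name active_server is made up for this illustration.

```python
import json


def active_server(running_json: str) -> str:
    """Return the address stored in a canal `running` znode payload."""
    node = json.loads(running_json)
    if not node.get("active"):
        raise ValueError("running node is not marked active")
    return node["address"]


# Payload and cluster list copied from the zkCli output above.
payload = '{"active":true,"address":"192.168.x.35:11111","cid":4035}'
cluster = ["192.168.x.36:11111", "192.168.x.35:11111"]

active = active_server(payload)
# Every cluster member that is not the active one is a standby.
standbys = [addr for addr in cluster if addr != active]
print("active:", active, "standby:", standbys)
```

Comparing this result before and after each stop or start makes it easy to see which server owns an instance at every step of the test.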

Both instances, data_checksum and fanboshi, accept writes. Generate some test data:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.x.112:9092 --topic data_checksum2 {"data":[{"id":"10","sname":"node00x112---3306"}],"database":"sysbench","es":1553655037000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553655038027,"type":"INSERT"}

bin/kafka-console-consumer.sh --bootstrap-server 192.168.x.112:9092 --topic fanboshi {"data":[{"id":"10","sname":"node00x112---3307"}],"database":"sysbench","es":1553655039000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553655039829,"type":"INSERT"}

Stop 35:

[root@node004035 conf]# ../bin/stop.sh
node004035: stopping canal 19046 ... Oook! cost:2

2019-03-27 10:55:19.736 [Thread-10] INFO com.alibaba.otter.canal.deployer.CanalStater - ## stop the canal server
2019-03-27 10:55:19.736 [Thread-11] INFO com.alibaba.otter.canal.kafka.CanalKafkaProducer - ## stop the kafka producer
2019-03-27 10:55:19.747 [Thread-11] INFO com.alibaba.otter.canal.kafka.CanalKafkaProducer - ## kafka producer is down.
2019-03-27 10:55:20.850 [Thread-10] INFO com.alibaba.otter.canal.deployer.CanalController - ## stop the canal server[192.168.x.35:11111]
2019-03-27 10:55:20.852 [Thread-10] INFO com.alibaba.otter.canal.deployer.CanalStater - ## canal server is down.

Meanwhile, the log on 36:

2019-03-27 10:55:24.791 [pool-2-thread-1] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^...$
2019-03-27 10:55:24.791 [pool-2-thread-1] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^.._.new$
2019-03-27 10:55:24.852 [destination = data_checksum , address = /192.168.x.112:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-03-27 10:55:24.876 [destination = data_checksum , address = /192.168.x.112:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=0041123306-mysql-bin.000001,position=317607200,serverId=41123306,gtid=81904578-3eea-11e9-8755-20040fefc7f8:1-165022,timestamp=1553655037000] cost : 10ms , the next step is binlog dump
2019-03-27 10:55:25.044 [pool-5-thread-1] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^...$
2019-03-27 10:55:25.045 [pool-5-thread-1] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^.*..*_new$
2019-03-27 10:55:25.060 [destination = fanboshi , address = /192.168.x.112:3307 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-03-27 10:55:25.063 [destination = fanboshi , address = /192.168.x.112:3307 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=0041123307-mysql-bin.000001,position=15439,serverId=41123307,gtid=81904578-3eea-11e9-8755-20040fefc7f8:1-7799,13116acc-3eeb-11e9-831b-20040fefc7f8:1-23,timestamp=1553655039000] cost : 3ms , the next step is binlog dump

Check ZooKeeper:

[zk: localhost:2181(CONNECTED) 14] ls /otter/canal/destinations/data_checksum/cluster
[192.168.x.36:11111]
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/data_checksum/running
{"active":true,"address":"192.168.x.36:11111","cid":4036}

[zk: localhost:2181(CONNECTED) 16] ls /otter/canal/destinations/fanboshi/cluster
[192.168.x.36:11111]
[zk: localhost:2181(CONNECTED) 17] get /otter/canal/destinations/fanboshi/running
{"active":true,"address":"192.168.x.36:11111","cid":4036}

Generate data:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.x.112:9092 --topic data_checksum2 {"data":[{"id":"11","sname":"node00x112---3306"}],"database":"sysbench","es":1553655416000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553655416543,"type":"INSERT"}

bin/kafka-console-consumer.sh --bootstrap-server 192.168.x.112:9092 --topic fanboshi {"data":[{"id":"11","sname":"node00x112---3307"}],"database":"sysbench","es":1553655417000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553655417643,"type":"INSERT"}

After stopping 35, the instances fail over to 36 as expected and data still reaches Kafka.

Next, test switching back: start 35, wait until the cluster members are back to [192.168.x.36:11111, 192.168.x.35:11111], then stop 36 and see whether 35 takes over.

Start 35:

[zk: localhost:2181(CONNECTED) 18] ls /otter/canal/destinations/data_checksum/cluster
[192.168.x.36:11111, 192.168.x.35:11111]
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/data_checksum/running
{"active":true,"address":"192.168.x.36:11111","cid":4036}

[zk: localhost:2181(CONNECTED) 20] ls /otter/canal/destinations/fanboshi/cluster
[192.168.x.36:11111, 192.168.x.35:11111]
[zk: localhost:2181(CONNECTED) 21] get /otter/canal/destinations/fanboshi/running
{"active":true,"address":"192.168.x.36:11111","cid":4036}

The cluster members are back to [192.168.x.36:11111, 192.168.x.35:11111] and 36 is still the active server. Generate two more rows:

data_checksum2 {"data":[{"id":"12","sname":"node00x112---3306"}],"database":"sysbench","es":1553655721000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553655721920,"type":"INSERT"}

fanboshi {"data":[{"id":"12","sname":"node00x112---3307"}],"database":"sysbench","es":1553655723000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553655723548,"type":"INSERT"}

Stop 36 and test whether the instances fail over to 35. Something interesting happens at this point: my two consumer terminals, bin/kafka-console-consumer.sh --bootstrap-server 192.168.x.112:9092 --topic data_checksum2 and bin/kafka-console-consumer.sh --bootstrap-server 192.168.x.112:9092 --topic fanboshi, each consume two messages.

data_checksum2
{"data":[{"id":"11","sname":"node00x112---3306"}],"database":"sysbench","es":1553655416000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553655817027,"type":"INSERT"}
{"data":[{"id":"12","sname":"node00x112---3306"}],"database":"sysbench","es":1553655721000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553655817030,"type":"INSERT"}

fanboshi
{"data":[{"id":"11","sname":"node00x112---3307"}],"database":"sysbench","es":1553655417000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553655817027,"type":"INSERT"}
{"data":[{"id":"12","sname":"node00x112---3307"}],"database":"sysbench","es":1553655723000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553655817030,"type":"INSERT"}

These two messages are the rows I inserted by hand earlier, while 36 was the active server. After stopping 36 and failing over to 35, they were produced a second time.
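One way to confirm replays like this without eyeballing the consumer output is to group the flat messages by a key that does not change between deliveries. The sketch below is an illustration in Python, not part of canal: the key choice (database, table, type, es and primary-key values) is an assumption that holds for this test but could merge two genuine changes to the same row within the same second, and the sample messages are trimmed copies of the data_checksum2 output above.

```python
import json
from collections import Counter


def event_key(msg: dict) -> tuple:
    """Key a change by table, type, event time and primary-key values.

    The message-level `id` and `ts` are left out because they differ
    between the first delivery and the replay in the output above.
    """
    pks = tuple(tuple(row[k] for k in msg["pkNames"]) for row in msg["data"])
    return (msg["database"], msg["table"], msg["type"], msg["es"], pks)


def find_duplicates(lines):
    """Return the keys that appear more than once, in first-seen order."""
    counts = Counter(event_key(json.loads(line)) for line in lines)
    return [key for key, n in counts.items() if n > 1]


# Trimmed copies of the data_checksum2 messages: first delivery, then replay.
consumed = [
    '{"data":[{"id":"11"}],"database":"sysbench","es":1553655416000,"id":1,"pkNames":["id"],"table":"t_test","ts":1553655416543,"type":"INSERT"}',
    '{"data":[{"id":"12"}],"database":"sysbench","es":1553655721000,"id":2,"pkNames":["id"],"table":"t_test","ts":1553655721920,"type":"INSERT"}',
    '{"data":[{"id":"11"}],"database":"sysbench","es":1553655416000,"id":1,"pkNames":["id"],"table":"t_test","ts":1553655817027,"type":"INSERT"}',
    '{"data":[{"id":"12"}],"database":"sysbench","es":1553655721000,"id":1,"pkNames":["id"],"table":"t_test","ts":1553655817030,"type":"INSERT"}',
]

duplicates = find_duplicates(consumed)
for key in duplicates:
    print("delivered more than once:", key)
```

Feeding the whole console-consumer output of a test round through find_duplicates gives the full list of replayed rows for that round.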


Now check ZooKeeper:

[zk: localhost:2181(CONNECTED) 22] ls /otter/canal/destinations/data_checksum/cluster
[192.168.x.35:11111]
[zk: localhost:2181(CONNECTED) 23] get /otter/canal/destinations/data_checksum/running
{"active":true,"address":"192.168.x.35:11111","cid":4035}

[zk: localhost:2181(CONNECTED) 24] ls /otter/canal/destinations/fanboshi/cluster
[192.168.x.35:11111]
[zk: localhost:2181(CONNECTED) 25] get /otter/canal/destinations/fanboshi/running
{"active":true,"address":"192.168.x.35:11111","cid":4035}

35 is now the active server. Generate data and see whether Kafka receives it:

data_checksum2 {"data":[{"id":"13","sname":"node00x112---3306"}],"database":"sysbench","es":1553656076000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553656076239,"type":"INSERT"}

fanboshi {"data":[{"id":"13","sname":"node00x112---3307"}],"database":"sysbench","es":1553656077000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553656077640,"type":"INSERT"}

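The data is received as expected.

I then ran several more rounds (stop 35, start 35, stop 36, start 36, and so on), and every round produced duplicate data: the server that takes over sends again the data that was already produced while the previous active server was alive. I believe this causes both out-of-order and duplicate messages.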
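The reordering can be made visible by looking at the es field (the source event time) in delivery order: in an ordered stream it should never decrease. The following Python sketch is only an illustration; the list holds the es values of the data_checksum2 messages for ids 10, 11, 12, the replayed 11 and 12, and 13, copied from the consumer output in this report.

```python
def es_regressions(es_values):
    """Return (index, previous, current) wherever es goes backwards."""
    found = []
    for i in range(1, len(es_values)):
        if es_values[i] < es_values[i - 1]:
            found.append((i, es_values[i - 1], es_values[i]))
    return found


# es of ids 10, 11, 12, replayed 11, replayed 12, 13 on topic data_checksum2.
stream = [
    1553655037000,
    1553655416000,
    1553655721000,
    1553655416000,
    1553655721000,
    1553656076000,
]

regressions = es_regressions(stream)
for index, previous, current in regressions:
    print(f"message {index}: es went back from {previous} to {current}")
```

The single regression it reports is the point where 35 took over and id=11 was delivered again after id=12.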

Stop 35 and 36, then start 35 and 36:

[zk: localhost:2181(CONNECTED) 26] ls /otter/canal/destinations/data_checksum/cluster
[192.168.x.36:11111, 192.168.x.35:11111]
[zk: localhost:2181(CONNECTED) 27] get /otter/canal/destinations/data_checksum/running
{"active":true,"address":"192.168.x.35:11111","cid":4035}

[zk: localhost:2181(CONNECTED) 28] ls /otter/canal/destinations/fanboshi/cluster
[192.168.x.36:11111, 192.168.x.35:11111]
[zk: localhost:2181(CONNECTED) 29] get /otter/canal/destinations/fanboshi/running
{"active":true,"address":"192.168.x.35:11111","cid":4035}

35 and 36 both start normally, the cluster members are back to [192.168.x.36:11111, 192.168.x.35:11111], and running points to 35. Generate data:

data_checksum2 {"data":[{"id":"14","sname":"node00x112---3306"}],"database":"sysbench","es":1553656399000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553656399968,"type":"INSERT"}

fanboshi {"data":[{"id":"14","sname":"node00x112---3307"}],"database":"sysbench","es":1553656401000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553656401376,"type":"INSERT"}

The data is received as expected.

On 35, manually run mv data_checksum/instance.properties data_checksum/instance.properties.bak. Log on 35:

2019-03-27 11:14:22.816 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify stop data_checksum successful.

Log on 36:

2019-03-27 11:14:27.889 [pool-2-thread-1] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^...$
2019-03-27 11:14:27.889 [pool-2-thread-1] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^.._._new$
2019-03-27 11:14:27.950 [destination = data_checksum , address = /192.168.x.112:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-03-27 11:14:27.974 [destination = data_checksum , address = /192.168.x.112:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=0041123306-mysql-bin.000001,position=317608344,serverId=41123306,gtid=81904578-3eea-11e9-8755-20040fefc7f8:1-165026,timestamp=1553656399000] cost : 10ms , the next step is binlog dump

ZooKeeper:

[zk: localhost:2181(CONNECTED) 31] ls /otter/canal/destinations/data_checksum/cluster
[192.168.x.36:11111]
[zk: localhost:2181(CONNECTED) 32] get /otter/canal/destinations/data_checksum/running
{"active":true,"address":"192.168.x.36:11111","cid":4036}

Generate data for data_checksum; it is received as expected:

{"data":[{"id":"15","sname":"node00x112---3306"}],"database":"sysbench","es":1553656500000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553656500533,"type":"INSERT"}

Restore 35 with mv data_checksum/instance.properties.bak data_checksum/instance.properties. Log on 35:

2019-03-27 11:16:47.837 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start data_checksum successful.

36 logs nothing.

ZooKeeper:

[zk: localhost:2181(CONNECTED) 33] ls /otter/canal/destinations/data_checksum/cluster
[192.168.x.36:11111, 192.168.x.35:11111]
[zk: localhost:2181(CONNECTED) 34] get /otter/canal/destinations/data_checksum/running
{"active":true,"address":"192.168.x.36:11111","cid":4036}

Generate data for data_checksum; it is received as expected:

{"data":[{"id":"16","sname":"node00x112---3306"}],"database":"sysbench","es":1553656672000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553656672219,"type":"INSERT"}

On 36, manually run mv data_checksum/instance.properties data_checksum/instance.properties.bak. Log on 36:

2019-03-27 11:18:33.042 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify stop data_checksum successful.

Log on 35:

2019-03-27 11:18:38.057 [pool-2-thread-1] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^...$
2019-03-27 11:18:38.057 [pool-2-thread-1] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^.._._new$
2019-03-27 11:18:38.073 [destination = data_checksum , address = /192.168.x.112:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-03-27 11:18:38.075 [destination = data_checksum , address = /192.168.x.112:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=0041123306-mysql-bin.000001,position=317608344,serverId=41123306,gtid=81904578-3eea-11e9-8755-20040fefc7f8:1-165026,timestamp=1553656399000] cost : 3ms , the next step is binlog dump
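The start positions in these logs are worth comparing. The Python sketch below pulls the EntryPosition fields out of the two "find start position successfully" lines for data_checksum (36 taking over at 11:14:27 and 35 taking over at 11:18:38, both shortened here). The parsing helper is written for this illustration and only assumes the key=value layout visible in the log text.

```python
import re

ENTRY_RE = re.compile(r"EntryPosition\[(?P<body>[^\]]*)\]")


def parse_entry_position(log_line: str) -> dict:
    """Extract the key=value fields of an EntryPosition[...] log fragment."""
    match = ENTRY_RE.search(log_line)
    if match is None:
        raise ValueError("no EntryPosition found in line")
    fields = {}
    # A gtid set can itself contain commas, so split only where a comma
    # is followed by "name=" rather than on every comma.
    for part in re.split(r",(?=\w+=)", match.group("body")):
        key, _, value = part.partition("=")
        fields[key] = value
    return fields


# Shortened copies of the two log lines for destination data_checksum.
takeover_by_36 = (
    "11:14:27.974 find start position successfully, EntryPosition[included=false,"
    "journalName=0041123306-mysql-bin.000001,position=317608344,serverId=41123306,"
    "gtid=81904578-3eea-11e9-8755-20040fefc7f8:1-165026,timestamp=1553656399000]"
)
takeover_by_35 = (
    "11:18:38.075 find start position successfully, EntryPosition[included=false,"
    "journalName=0041123306-mysql-bin.000001,position=317608344,serverId=41123306,"
    "gtid=81904578-3eea-11e9-8755-20040fefc7f8:1-165026,timestamp=1553656399000]"
)

first = parse_entry_position(takeover_by_36)
second = parse_entry_position(takeover_by_35)
same_start = first == second
print("same start position on both takeovers:", same_start)
print("position:", second["position"], "timestamp:", second["timestamp"])
```

Both takeovers resume from position 317608344 with timestamp 1553656399000, which is the es of the id=14 insert, even though ids 15 and 16 were delivered between the two takeovers. One possible reading is that the stored position was not advanced while 36 was active, which would match the replay of ids 15 and 16 seen later; this is an inference from the logs, not something confirmed in this thread.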

ZooKeeper:

[zk: localhost:2181(CONNECTED) 35] ls /otter/canal/destinations/data_checksum/cluster
[192.168.x.35:11111]
[zk: localhost:2181(CONNECTED) 36] get /otter/canal/destinations/data_checksum/running
{"active":true,"address":"192.168.x.35:11111","cid":4035}

Generate rows id=17 and id=18 for data_checksum: no data is received!

Stop 35 and 36, then start 35 and 36: duplicate data is produced again.

{"data":[{"id":"15","sname":"node00x112---3306"}],"database":"sysbench","es":1553656500000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553656899655,"type":"INSERT"}
{"data":[{"id":"16","sname":"node00x112---3306"}],"database":"sysbench","es":1553656672000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553656899657,"type":"INSERT"}

The next two messages are the ones that were not received earlier:

{"data":[{"id":"17","sname":"node00x112---3306"}],"database":"sysbench","es":1553656753000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553656899658,"type":"INSERT"}
{"data":[{"id":"18","sname":"node00x112---3306"}],"database":"sysbench","es":1553656877000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","sname":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"sname":12},"table":"t_test","ts":1553656899658,"type":"INSERT"}

Stop 35 and 36, then start 35 and 36.

I did not test any further. In short:
stop.sh and kill -9 can switch over, but they produce duplicate data.
mv data_checksum/instance.properties data_checksum/instance.properties.bak can switch only once and cannot switch back.

Original question by GitHub user Fanduzi

Posted by 古拉古拉, 2023-05-08 13:39:28
1 answer
  • Duplicate data is unavoidable; consider making the processing idempotent on the business side.

    Original answer by GitHub user DinoZhang

    2023-05-09 17:42:44
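As a rough illustration of what idempotent handling on the consumer side could look like, here is a minimal Python sketch that applies flat messages to an in-memory table keyed by primary key. The class name IdempotentApplier and the rule of skipping messages whose es is older than the last applied one for the same row are assumptions made for this example, not something canal or the answer prescribes; a real consumer would keep this state in its target database.

```python
import json


class IdempotentApplier:
    """Apply canal flat messages so that replays leave the state unchanged."""

    def __init__(self):
        self.rows = {}     # (database, table, pk values) -> latest row
        self.last_es = {}  # (database, table, pk values) -> es last applied

    def apply(self, raw: str) -> int:
        """Apply one message and return how many rows actually changed."""
        msg = json.loads(raw)
        changed = 0
        for row in msg["data"] or []:
            pk = tuple(row[name] for name in msg["pkNames"])
            key = (msg["database"], msg["table"], pk)
            if msg["es"] < self.last_es.get(key, -1):
                continue  # older than what was already applied: stale replay
            self.last_es[key] = msg["es"]
            if msg["type"] == "DELETE":
                if self.rows.pop(key, None) is not None:
                    changed += 1
            elif self.rows.get(key) != row:
                # INSERT and UPDATE are both treated as an upsert.
                self.rows[key] = row
                changed += 1
        return changed


# Trimmed copies of the id=11 message: first delivery, then the replay.
first = '{"data":[{"id":"11","sname":"node00x112---3306"}],"database":"sysbench","es":1553655416000,"pkNames":["id"],"table":"t_test","type":"INSERT"}'
replay = '{"data":[{"id":"11","sname":"node00x112---3306"}],"database":"sysbench","es":1553655416000,"pkNames":["id"],"table":"t_test","type":"INSERT"}'

applier = IdempotentApplier()
first_changed = applier.apply(first)
replay_changed = applier.apply(replay)
print("first delivery changed", first_changed, "row(s); replay changed", replay_changed)
```

Treating inserts as upserts means a replayed message rewrites the same values, and the es guard keeps an older replay from overwriting a newer change to the same row.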