现网数据变更较大,发现数据存在遗漏的情况,定位在binlog未接收到事件信息。
对比发现binlog 29827255 未知的信息丢失
dump binlog
canal log
日志丢失的问题看源码找到原因了,但是binlog接收处确实未接收到,log每次会将接收到的binlog打印出来,29827255未被接收,处理代码如下
void execute() { long batchId; LOGGER.debug("execute destination : " + destination); while (true) { try { connector.connect(); connector.subscribe(filter); while (true) { Message message = connector.getWithoutAck(BATCH_SIZE); batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException ignored) { // ignored } } else { process(message.getEntries()); } connector.ack(batchId); } } catch (Exception e) { LOGGER.error("canal connect error, destination : " + destination, e); AlarmUtil.dcAlarm(App.DC_ID, "canal_connect_error", "canal connect error, destination : " + destination); } finally { connector.disconnect(); }
try {
Thread.sleep(1000 * 60);
} catch (InterruptedException e) {
// ignored
}
}
}
private void process(List<CanalEntry.Entry> entryList) {
for (CanalEntry.Entry entry : entryList) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == CanalEntry.EventType.DELETE) {
return;
}
if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE) {
return;
}
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
if (rowDataList == null || rowDataList.size() == 0) {
continue;
}
List<BinlogColumnDTO> columns = new ArrayList<>();
try {
for (CanalEntry.RowData rowData : rowDataList) {
columns = convertColumnList(rowData.getAfterColumnsList());
long updatedCount = columns.stream().filter(BinlogColumnDTO::getUpdated).count();
if (updatedCount < 1) {
return;
}
String binlogFileOffset = entry.getHeader().getLogfileName() + ":"
+ entry.getHeader().getLogfileOffset() + ":"
+ DateUtil.timestamp2DateTime(entry.getHeader().getExecuteTime());
LOGGER.log(ACCESS, binlogFileOffset + ", eventType : {}, data : {}",
eventType, toJsonString(columns));
if (eventType == CanalEntry.EventType.INSERT) {
syncService.insert(columns, binlogFileOffset);
}
else if (eventType == CanalEntry.EventType.UPDATE){
syncService.update(columns, binlogFileOffset);
}
}
}
catch (Exception e) {
LOGGER.error("binlog handle error, data : " + JSONArray.toJSONString(columns), e);
AlarmUtil.dcAlarm(App.DC_ID, "binlog_handle_error", e.toString());
}
}
}
跟踪源码,canal server打印debug日志,发现client发请求过来的时候是有读取到对应postion的,但就是没有返回事件内容过去。
另一个案例的截图
binlog file
canal server log
原提问者GitHub用户xiaopan0513
1、你这最后的日志是在位点定位的时候得,不是数据读取的日志
2、canal log里记录的位点是以批次为单位,并没有精确到一条记录
建议你们先用标准的example工程打印接收到的数据,如果有能复现的方式最好能提供一下
原回答者GitHub用户agapple
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。