问题概述 线上环境发现有些表部分增量数据一致未同步,目前定位到com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry)中的rowChange = RowChange.parseFrom(entry.getStoreValue());为空导致数据直接丢弃。
日志 com.alibaba.otter.node.etl.select.selector.MessageParser#parse调整: 代码调整 public List parse(Long pipelineId, List datas) throws SelectException { List eventDatas = new ArrayList(); Pipeline pipeline = configClientService.findPipeline(pipelineId); List transactionDataBuffer = new ArrayList(); // hz为主站点,us->hz的数据,需要回环同步会us。并且需要开启回环补救算法 PipelineParameter pipelineParameter = pipeline.getParameters(); boolean enableLoopbackRemedy = pipelineParameter.isEnableRemedy() && pipelineParameter.isHome() && pipelineParameter.getRemedyAlgorithm().isLoopback(); boolean isLoopback = false; boolean needLoopback = false; // 判断是否属于需要loopback处理的类型,只处理正常otter同步产生的回环数据,因为会有业务方手工屏蔽同步的接口,避免回环
String randStr = "";
if(pipelineId == 23){
randStr = RandomStringUtils.randomAlphanumeric(30);
}
long now = new Date().getTime();
try {
for (Entry entry : datas) {
switch (entry.getEntryType()) {
case TRANSACTIONBEGIN:
isLoopback = false;
break;
case ROWDATA:
String tableName = entry.getHeader().getTableName();
// 判断是否是回环表retl_mark
boolean isMarkTable = tableName.equalsIgnoreCase(pipeline.getParameters().getSystemMarkTable());
if (isMarkTable) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
if (!rowChange.getIsDdl()) {
// int loopback = checkLoopback(pipeline, rowChange.getRowDatas(0)); int loopback = 0; if (rowChange.getRowDatasCount() > 0) { loopback = checkLoopback(pipeline, rowChange.getRowDatas(0)); } if (loopback == 2) { needLoopback |= true; // 只处理正常同步产生的回环数据 }
isLoopback |= loopback > 0;
}
}
// 检查下otter3.0的回环表,对应的schmea会比较随意,所以不做比较
boolean isCompatibleLoopback = tableName.equalsIgnoreCase(compatibleMarkTable);
if (isCompatibleLoopback) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
if (!rowChange.getIsDdl()) {
// int loopback = checkCompatibleLoopback(pipeline, rowChange.getRowDatas(0)); int loopback = 0; if (rowChange.getRowDatasCount() > 0) { loopback = checkCompatibleLoopback(pipeline, rowChange.getRowDatas(0)); } if (loopback == 2) { needLoopback |= true; // 只处理正常同步产生的回环数据 } isLoopback |= loopback > 0; } }
// debug调试日志代码
if(pipelineId == 23 && ("global_drug_relate_indication".equalsIgnoreCase(tableName) || "drug_indications_relation".equalsIgnoreCase(tableName))){
logger.warn("binlogdebug-messageRowParser randStr:{},entry:{},"
+ "isLoopback:{},enableLoopbackRemedy:{},needLoopback:{},isMarkTable:{},isCompatibleLoopback:{}",randStr,entry.getHeader().getLogfileOffset(),
isLoopback,enableLoopbackRemedy,needLoopback,isMarkTable,isCompatibleLoopback);
}
if ((!isLoopback || (enableLoopbackRemedy && needLoopback)) && !isMarkTable
&& !isCompatibleLoopback) {
transactionDataBuffer.add(entry);
}
break;
case TRANSACTIONEND:
if (!isLoopback || (enableLoopbackRemedy && needLoopback)) {
// 添加数据解析
for (Entry bufferEntry : transactionDataBuffer) {
List<EventData> parseDatas = internParse(pipeline, bufferEntry);
if (CollectionUtils.isEmpty(parseDatas)) {// 可能为空,针对ddl返回时就为null
continue;
}
// 初步计算一下事件大小
long totalSize = bufferEntry.getHeader().getEventLength();
long eachSize = totalSize / parseDatas.size();
for (EventData eventData : parseDatas) {
if (eventData == null) {
continue;
}
eventData.setSize(eachSize);// 记录一下大小
if (needLoopback) {// 针对需要回环同步的
// 如果延迟超过指定的阀值,则设置为需要反查db
if (now - eventData.getExecuteTime() > 1000 * pipeline.getParameters()
.getRemedyDelayThresoldForMedia()) {
eventData.setSyncConsistency(SyncConsistency.MEDIA);
} else {
eventData.setSyncConsistency(SyncConsistency.BASE);
}
eventData.setRemedy(true);
}
eventDatas.add(eventData);
}
}
if(pipeline.getId() == 23){
logger.warn("binlogdebug-messageRowInteralParser1 randStr:{} transactionDataBuffer:{},eventDatas:{}",randStr,transactionDataBuffer.size(),eventDatas.size());
}
}
isLoopback = false;
needLoopback = false;
transactionDataBuffer.clear();
break;
default:
break;
}
}
// 添加最后一次的数据,可能没有TRANSACTIONEND
if (!isLoopback || (enableLoopbackRemedy && needLoopback)) {
// 添加数据解析
for (Entry bufferEntry : transactionDataBuffer) {
List<EventData> parseDatas = internParse(pipeline, bufferEntry);
if (CollectionUtils.isEmpty(parseDatas)) {// 可能为空,针对ddl返回时就为null
continue;
}
// 初步计算一下事件大小
long totalSize = bufferEntry.getHeader().getEventLength();
long eachSize = totalSize / parseDatas.size();
for (EventData eventData : parseDatas) {
if (eventData == null) {
continue;
}
eventData.setSize(eachSize);// 记录一下大小
if (needLoopback) {// 针对需要回环同步的
// 如果延迟超过指定的阀值,则设置为需要反查db
if (now - eventData.getExecuteTime() > 1000 * pipeline.getParameters()
.getRemedyDelayThresoldForMedia()) {
eventData.setSyncConsistency(SyncConsistency.MEDIA);
} else {
eventData.setSyncConsistency(SyncConsistency.BASE);
}
}
eventDatas.add(eventData);
}
}
if(pipeline.getId() == 23){
logger.warn("binlogdebug-messageRowInteralParser2 randStr:{} transactionDataBuffer:{},eventDatas:{}",randStr,transactionDataBuffer.size(),eventDatas.size());
}
}
// debug调试日志代码
if(pipelineId == 23){
if(datas.size() == 2 && EntryType.TRANSACTIONBEGIN.getNumber() == datas.get(0).getEntryType().getNumber() &&
EntryType.TRANSACTIONEND.getNumber() == datas.get(1).getEntryType().getNumber()){
}else{
logger.warn("binlogdebug-messageParser randStr:{},pid:{},size:{},binlog:{}",randStr,pipelineId,datas.size(),datas.toString());
logger.warn("binlogdebug-messageParser randStr:{},eventDatas:{}",randStr,null == eventDatas?"":eventDatas.toString());
}
}
} catch (Exception e) {
throw new SelectException(e);
}
return eventDatas;
日志:
Sep 15 07:01:18 otter2 otter-node-prd[397902]: entryType: ROWDATA Sep 15 07:01:18 otter2 otter-node-prd[397902]: storeValue: "\020\aZ\027SET INSERT_ID = 1014152" Sep 15 07:01:18 otter2 otter-node-prd[397902]: , header { Sep 15 07:01:18 otter2 otter-node-prd[397902]: version: 1 Sep 15 07:01:18 otter2 otter-node-prd[397902]: logfileName: "mysql-bin-3306.020092" Sep 15 07:01:18 otter2 otter-node-prd[397902]: logfileOffset: 924171204 Sep 15 07:01:18 otter2 otter-node-prd[397902]: serverId: 67 Sep 15 07:01:18 otter2 otter-node-prd[397902]: serverenCode: "UTF-8" Sep 15 07:01:18 otter2 otter-node-prd[397902]: executeTime: 1600124478000 Sep 15 07:01:18 otter2 otter-node-prd[397902]: sourceType: MYSQL Sep 15 07:01:18 otter2 otter-node-prd[397902]: schemaName: "drug" Sep 15 07:01:18 otter2 otter-node-prd[397902]: tableName: "drug_indications_relation" Sep 15 07:01:18 otter2 otter-node-prd[397902]: eventLength: 177 Sep 15 07:01:18 otter2 otter-node-prd[397902]: eventType: INSERT Sep 15 07:01:18 otter2 otter-node-prd[397902]: } Sep 15 07:01:18 otter2 otter-node-prd[397902]: entryType: ROWDATA Sep 15 07:01:18 otter2 otter-node-prd[397902]: storeValue: "\020\001Z_insert into drug_indications_relation (drug_id, indications_id, type) values (2099547, 1630, 6)r\004drug" 解释:插入表drug_indications_relation的一条数据,主键为1014152,偏移量为924171204。
com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry) 代码调整:
private List internParse(Pipeline pipeline, Entry entry) { RowChange rowChange = null; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new SelectException("parser of canal-event has an error , data:" + entry.toString(), e); }
if(23 == pipeline.getId() && ("global_drug_relate_indication".equalsIgnoreCase(entry.getHeader().getTableName())
|| "drug_indications_relation".equalsIgnoreCase(entry.getHeader().getTableName()) )){
logger.warn("binlogdebug-interparse entryposition:{} entryType:{}",entry.getHeader().getLogfileOffset(),
null==rowChange?"null":rowChange.getEventType());
if(null != rowChange.getRowDatasList() && rowChange.getRowDatasList().size()>0){
for(RowData rowData:rowChange.getRowDatasList()){
logger.warn("binlogdebug-interparse entryposition:{} rowData:{}",entry.getHeader().getLogfileOffset(),
rowData.toString());
}
}else{
logger.warn("binlogdebug-interparse entryposition:{} rowDatasList is null",entry.getHeader().getLogfileOffset());
}
}
if (rowChange == null) {
return null;
}
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
EventType eventType = EventType.valueOf(rowChange.getEventType().name());
// 处理下DDL操作
if (eventType.isQuery()) {
// 直接忽略query事件
return null;
}
// 首先判断是否为系统表
if (StringUtils.equalsIgnoreCase(pipeline.getParameters().getSystemSchema(), schemaName)) {
// do noting
if (eventType.isDdl()) {
return null;
}
if (StringUtils.equalsIgnoreCase(pipeline.getParameters().getSystemDualTable(), tableName)) {
// 心跳表数据直接忽略
return null;
}
} else {
if (eventType.isDdl()) {
boolean notExistReturnNull = false;
if (eventType.isRename()) {
notExistReturnNull = true;
}
DataMedia dataMedia = ConfigHelper.findSourceDataMedia(pipeline,
schemaName,
tableName,
notExistReturnNull);
// 如果EventType是CREATE/ALTER,需要reload
// DataMediaInfo;并且把CREATE/ALTER类型的事件丢弃掉.
if (dataMedia != null && (eventType.isCreate() || eventType.isAlter() || eventType.isRename())) {
DbDialect dbDialect = dbDialectFactory.getDbDialect(pipeline.getId(),
(DbMediaSource) dataMedia.getSource());
dbDialect.reloadTable(schemaName, tableName);// 更新下meta信息
}
boolean ddlSync = pipeline.getParameters().getDdlSync();
if (ddlSync) {
// 处理下ddl操作
EventData eventData = new EventData();
eventData.setSchemaName(schemaName);
eventData.setTableName(tableName);
eventData.setEventType(eventType);
eventData.setExecuteTime(entry.getHeader().getExecuteTime());
eventData.setSql(rowChange.getSql());
eventData.setDdlSchemaName(rowChange.getDdlSchemaName());
eventData.setTableId(dataMedia.getId());
return Arrays.asList(eventData);
} else {
return null;
}
}
}
List<EventData> eventDatas = new ArrayList<EventData>();
for (RowData rowData : rowChange.getRowDatasList()) {
EventData eventData = internParse(pipeline, entry, rowChange, rowData);
if (eventData != null) {
eventDatas.add(eventData);
}
if(23 == pipeline.getId() && ("global_drug_relate_indication".equalsIgnoreCase(entry.getHeader().getTableName())
|| "drug_indications_relation".equalsIgnoreCase(entry.getHeader().getTableName()) )){
logger.warn("binlogdebug-interparse entryposition:{} eventData:{}",entry.getHeader().getLogfileOffset(),null==eventData?"null":eventData.getKeys().toString());
}
}
return eventDatas;
}
根据定位到上个insert的偏移量为924171204定位到解析rowChange日志如下:
Sep 15 07:01:18 otter2 otter-node-prd[397902]: 2020-09-15 07:01:18.065 [pipelineId = 23,taskName = ProcessSelect] WARN com.alibaba.otter.node.etl.select.selector.MessageParser - binlogdebug-messageRowParser randStr:oTQo55Qa3d5Yyq0uVkzKcG4uCf4hp7,entry:924171204,isLoopback:false,enableLoopbackRemedy:false,needLoopback:false,isMarkTable:false,isCompatibleLoopback:false Sep 15 07:01:18 otter2 otter-node-prd[397902]: 2020-09-15 07:01:18.065 [pipelineId = 23,taskName = ProcessSelect] WARN com.alibaba.otter.node.etl.select.selector.MessageParser - binlogdebug-interparse entryposition:924171204 entryType:INSERT Sep 15 07:01:18 otter2 otter-node-prd[397902]: 2020-09-15 07:01:18.065 [pipelineId = 23,taskName = ProcessSelect] WARN com.alibaba.otter.node.etl.select.selector.MessageParser - binlogdebug-interparse entryposition:924171204 rowDatasList is null 发现没有解析到rowchange,导致binlog直接被过滤而造成数据丢失!!!
已确认点 本来以为是反序列化rowchange造成的数据丢失,但是发现但序列化的时候做了异常捕获和抛出,日志中也没有发现异常日志,所以应该不是反序列化的问题。 otter的版本:4.1.17版本基础的定制版本,但是我们messageparse这块没有修改调整过。 canal版本升级至1.1.4版本 mysql数据库版本5.6 疑问点 为什么会出现拿到binlog,然后解析rowchange为空?? 会不会是canal中的bug?
原提问者GitHub用户wuqiu-ai
是binlog-format格式的问题,由于DBA修改了ROW之后,由于是线上环境,不能做mysql重启,在加上业务也没重新部署,导致部分数据库连接的format格式还是之前的statement格式,最终导致增量失败
原回答者Github用户 wuqiu-ai
问题可能是由于Canal日志中缺少特定的RowChange数据导致的。具体来说,当Canal解析binlog并将数据转换为RowChange对象时,如果缺少特定的数据,则可能会导致RowChange对象为空,从而导致数据丢失。
为了解决这个问题,您可以尝试以下方法:
检查Canal Server的配置:请检查您的Canal Server的配置,确保它已正确配置,并且Canal日志可以正确解析binlog数据。您可以尝试重新配置Canal Server,以确保它已正确设置。
检查Canal日志文件:请检查Canal日志文件,以查看是否有任何错误或警告消息,这些消息可能会提供更多关于数据丢失的信息。您可以尝试使用Canal提供的命令行工具来查看Canal日志文件。
检查Canal版本:请检查您使用的Canal版本是否与您的数据库版本兼容。如果版本不兼容,可能会导致数据丢失或解析错误。
检查数据库的写入:请检查您的数据库是否在写入数据。如果您的数据库在写入数据时发生了故障或错误,可能会导致数据丢失。
检查Otter的配置:请检查您的Otter配置,确保它已正确设置,并且可以正确处理Canal解析的数据。您可以尝试重新配置Otter,以确保它已正确设置。
根据问题描述,该问题是在线上环境中发现有些表的部分增量数据没有同步,而在定位问题时发现是由于com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry)方法中的rowChange = RowChange.parseFrom(entry.getStoreValue());返回为空导致数据丢失。为了解决该问题,日志com.alibaba.otter.node.etl.select.selector.MessageParser#parse进行了调整。
然而,该问题的具体原因仍需要进一步排查,例如是否由于数据源结构变更、网络传输问题等原因导致数据丢失。建议对具体情况进行更深入的分析和排查,以确定问题的根本原因并采取相应措施进行修复。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。