在项目运行启动订阅时有遇到一个异常(项目采用的是CanalServerWithEmbedded内嵌,关键的配置MetaMode.MIXED & IndexMode.MEMORY_META_FAILBACK),栈输出如下:
dump address /xxx.xx.xx.xxx:3306 has an error, retrying. caused by
java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) at java.util.ArrayList$Itr.next(ArrayList.java:851) at com.alibaba.otter.canal.parse.index.MetaLogPositionManager.getLatestIndexBy(MetaLogPositionManager.java:56) at com.alibaba.otter.canal.parse.index.FailbackLogPositionManager.getLatestIndexBy(FailbackLogPositionManager.java:68) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.findStartPositionInternal(MysqlEventParser.java:567) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.findStartPosition(MysqlEventParser.java:509) at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:167) at java.lang.Thread.run(Thread.java:745)
看代码发现CanalServerWithEmbedded中的subscribe方法中有调用MetaManager的subscribe方法,最终会调用到MemoryMetaManager的subscribe方法,在得到destinations中key对应的list后,修改list(向list里add元素)。代码:
public synchronized void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
List<ClientIdentity> clientIdentitys = destinations.get(clientIdentity.getDestination());
if (clientIdentitys.contains(clientIdentity)) {
clientIdentitys.remove(clientIdentity);
}
clientIdentitys.add(clientIdentity);
}
而MemoryMetaManager类中还有一个方法listAllSubscribeInfo会直接返回destinations中key对应的list。代码:
public synchronized List<ClientIdentity> listAllSubscribeInfo(String destination) throws CanalMetaManagerException {
return destinations.get(destination);
}
我遇到的问题是在MetaLogPositionManager的getLatestIndexBy方法遍历metaManager.listAllSubscribeInfo调用结果时,业务代码线程调用了subscribe方法修改了遍历的list集合,导致了ConcurrentModificationException。
public LogPosition getLatestIndexBy(String destination) {
List<ClientIdentity> clientIdentities = metaManager.listAllSubscribeInfo(destination);
LogPosition result = null;
if (!CollectionUtils.isEmpty(clientIdentities)) {
// 尝试找到一个最小的logPosition
for (ClientIdentity clientIdentity : clientIdentities) { //在此处遍历时链表被其它线程修改
LogPosition position = (LogPosition) metaManager.getCursor(clientIdentity);
if (position == null) {
continue;
}
if (result == null) {
result = position;
} else {
result = CanalEventUtils.min(result, position);
}
}
}
return result;
}
这里感觉在MemoryMetaManager中的listAllSubscribeInfo方法有比较大的隐患,外部拿到引用以后可能在不同的线程中做遍历或者修改。 如果这里每次都拷贝一个新list返回的话不知道对效率上的损失是否太大,如果不希望过多拷贝的话可以返回unmodifiableList,阻止其他类对这个集合的修改,但是这样在其它类遍历这个集合时还是可能出现我这种ConcurrentModificationException。
想问一下你对这个问题是怎么考虑的呢
原提问者GitHub用户zwangbo
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。