环境信息
canal version latest release mysql version not relevant
问题描述
MemoryEventStoreWithBuffer的get和put方法,支持指定size。但是可能存在这样的case,导致notEmpty和notFull都无法被notify而出现死锁。假设buffer size 为16,首先put 6 events,然后尝试get 7 events,再尝试put 12 events。
步骤重现
执行下面代码
final MemoryEventStoreWithBuffer buffer = new MemoryEventStoreWithBuffer(BatchMode.ITEMSIZE); buffer.setBufferSize(16); buffer.start(); try { List events = new ArrayList<>(6); for (int i = 0; i < 6; ++i) { events.add(new Event()); } buffer.put(events); System.out.println("put 6 events");
new Thread(new Runnable() {
@Override
public void run() {
List<Event> events = new ArrayList<>(12);
for (int i = 0; i < 12; ++i) {
events.add(new Event());
}
System.out.println("trying to put 12 events");
try {
buffer.put(events);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("put 12 events");
}
}).start();
System.out.println("trying to get 7 events");
buffer.get(null, 7);
System.out.println("get 7 events");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
buffer.stop();
}
解决方案
我觉得get(Position start, int batchSize)的batchSize是期望读取的数量,因此在checkUnGetSlotAt改下判断条件,如果current < maxAbleSequence则返回true。
private boolean checkUnGetSlotAt(LogPosition startPosition, int batchSize) { if (batchMode.isItemSize()) { long current = getSequence.get(); long maxAbleSequence = putSequence.get(); long next = current; if (startPosition == null || !startPosition.getPostion().isIncluded()) { // 第一次订阅之后,需要包含一下start位置,防止丢失第一条记录 next = next + 1;// 少一条数据 }
//-- if (current < maxAbleSequence && next + batchSize - 1 <= maxAbleSequence) { if (current < maxAbleSequence) { return true; } else { return false; } ...
原提问者GitHub用户sosozhuang
如果bufferSize过小,设计上是有这么一个问题. 之前不这么做的一个考虑,主要是想避免产生很多小包
你们生产环境设置的bufferSize是多小?put写入目前最大是1024,理论上只要大于2 * 1024就不会遇上
原回答者GitHub用户agapple
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。