文件格式概述
commitlog消息单元存储结构
commitlog中存储的是客户端发送的所有数据
ConsumeQueue消息单元存储结构
ConsumeQueue存的是主题的逻辑信息,如下图所示,代表一条记录。其中记录的信息存储在commitLog中,位置是CommitLog Offset。
流程图
源码跟踪(broker启动流程里)
入口方法
DefaultMessageStore###load
public boolean load() { boolean result = true; try { //省略 // 装载Commit Log result = result && this.commitLog.load(); if (result) { //省略 //确定Commit Log文件下一个写的位置 this.recover(lastExitOK); } } catch (Exception e) { } return result; }
装载commitlog:把commitlog中下的文件都映射成MappedFile,方便读写
CommitLog###load
public boolean load() { //跟进去,调用mappedFileQueue.load方法 boolean result = this.mappedFileQueue.load(); log.info("load commit log " + (result ? "OK" : "Failed")); return result; }
MappedFileQueue###load方法:在该方法中把commitlog下的文件映射成MappedFile
public boolean load() { //window上默认的目录:C:\Users\25682\store\commitlog File dir = new File(this.storePath); //上面目录下子文件 File[] files = dir.listFiles(); if (files != null) { // ascending order Arrays.sort(files); for (File file : files) { if (file.length() != this.mappedFileSize) { log.warn(file + "\t" + file.length() + " length not matched message store config value, please check it manually"); return false; } try { //把每一个文件映射成MappedFile对象,方便读取 MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); //此时wrotePosition设置的为mappedFileSize,不准确 mappedFile.setWrotePosition(this.mappedFileSize); mappedFile.setFlushedPosition(this.mappedFileSize); mappedFile.setCommittedPosition(this.mappedFileSize); this.mappedFiles.add(mappedFile); log.info("load " + file.getPath() + " OK"); } catch (IOException e) { log.error("load file " + file + " error", e); return false; } } } return true; }
此时CommitLog下的MappedFile的wrotePosition设置为mappedFileSize,但是最后这个MappedFile的wrotePosition还不对,因此下面需要修改
确定Commitlog要写的位置
DefaultMessageStore###recover
private void recover(final boolean lastExitOK) { //从ConsumeQueue中获取最大的物理偏移量 long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue(); if (lastExitOK) { this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue); } else { // this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue); } this.recoverTopicQueueTable(); }
DefaultMessageStore###recoverConsumeQueue:获取每一个主题里每一个队列里的最大commitlog偏移量
private long recoverConsumeQueue() { long maxPhysicOffset = -1; for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { logic.recover(); if (logic.getMaxPhysicOffset() > maxPhysicOffset) { maxPhysicOffset = logic.getMaxPhysicOffset(); } } } return maxPhysicOffset; }
CommitLog###recoverAbnormally
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { // recover by the minimum time stamp boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { int index = mappedFiles.size() - 1; //获取最后一个CommitLog的MapperFile MappedFile mappedFile = null; for (; index >= 0; index--) { mappedFile = mappedFiles.get(index); if (this.isMappedFileMatchedRecover(mappedFile)) { log.info("recover from this mapped file " + mappedFile.getFileName()); break; } } if (index < 0) { index = 0; mappedFile = mappedFiles.get(index); } ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; while (true) { //不断从MapperFile中根据CommitLog的数据单元格式读取数据,当读取到数据为0时,跳出循环,说明该位置为下个需要写的位置 DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { // Normal data if (size > 0) { mappedFileOffset += size; if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { this.defaultMessageStore.doDispatch(dispatchRequest); } } else { this.defaultMessageStore.doDispatch(dispatchRequest); } } else if (size == 0) { index++; if (index >= mappedFiles.size()) { log.info("recover physics file over, last mapped file " + mappedFile.getFileName()); break; } else { mappedFile = mappedFiles.get(index); byteBuffer = mappedFile.sliceByteBuffer(); processOffset = mappedFile.getFileFromOffset(); mappedFileOffset = 0; log.info("recover next physics file, " + mappedFile.getFileName()); } } } else { log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position()); break; } } processOffset += mappedFileOffset; this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); //该位置为真正要插入的位置,所以修正上面的设置的错误的wrotePosition this.mappedFileQueue.truncateDirtyFiles(processOffset); // Clear ConsumeQueue redundant data if (maxPhyOffsetOfConsumeQueue >= processOffset) { log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset); this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } } // Commitlog case files are deleted else { log.warn("The commitlog files are deleted, and delete the consume queue files"); this.mappedFileQueue.setFlushedWhere(0); this.mappedFileQueue.setCommittedWhere(0); this.defaultMessageStore.destroyLogics(); } }
结论
CommitLog一开始是把wrotePosition设置为CommitLog文件的大小,这样只有最后一个CommitLog的wrotePosition的数据是不正确的,所以后面在确定最后一个CommitLog的wrotePosition的时候是通过读取CommitLog文件里的数据来确定wrotePosition位置的,因为CommitLog里前四个字节代表这条消息的大小,这样我读取前四个字节以后就可以读取这一条数据,然后以此类推,当读取消息的大小为0时,代表此处没有消息,则确定wrotePosition的位置。