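CorruptReplicasMap stores information about all corrupt blocks in the file system. A block is considered corrupt only if all of its replicas are corrupt. When the replicas of a block are reported, any corrupt copies are hidden. Once the block is found to have the expected number of good replicas, those corrupt copies are removed from CorruptReplicasMap.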
First, let's see which member variables CorruptReplicasMap has:
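```java
// Maps each corrupt Block to a map of (DataNode -> reason its replica there is corrupt)
private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
    new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
```

Yes, you read that right: corruptReplicasMap is the only field. It maps each corrupt Block instance to an inner map from DatanodeDescriptor (a DataNode holding a corrupt replica of that block) to the Reason that replica is considered corrupt.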
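To make the two-level shape concrete, here is a minimal standalone sketch. It is not HDFS code: Block is replaced by a long block ID, DatanodeDescriptor by a String node name, and Reason by a hypothetical ShapeReason enum. It shows the nested lookup and the fact that a TreeMap keeps its keys sorted, so iteration runs in ascending block-ID order regardless of insertion order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for HDFS's Reason enum (illustrative constants only)
enum ShapeReason { GENSTAMP_MISMATCH, SIZE_MISMATCH, CORRUPTION_REPORTED }

class CorruptMapShape {
  // Outer level: block ID -> inner map. TreeMap keeps block IDs in ascending order.
  // Inner level: node name -> reason that node's replica is considered corrupt.
  final SortedMap<Long, Map<String, ShapeReason>> corruptReplicasMap = new TreeMap<>();

  // Builds a small example: two corrupt replicas of block 42 and one of block 7,
  // deliberately inserted out of ID order.
  static CorruptMapShape example() {
    CorruptMapShape m = new CorruptMapShape();
    Map<String, ShapeReason> b42 = new HashMap<>();
    b42.put("dn-a", ShapeReason.GENSTAMP_MISMATCH);
    b42.put("dn-b", ShapeReason.CORRUPTION_REPORTED);
    m.corruptReplicasMap.put(42L, b42);

    Map<String, ShapeReason> b7 = new HashMap<>();
    b7.put("dn-c", ShapeReason.SIZE_MISMATCH);
    m.corruptReplicasMap.put(7L, b7);
    return m;
  }
}
```

Iterating example().corruptReplicasMap.keySet() yields 7 before 42. The paging logic in getCorruptReplicaBlockIds() depends on walking the keys in a stable order, which a sorted map provides.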
Next, let's look at the useful methods CorruptReplicasMap provides.
I. addToCorruptReplicasMap()
Marks the replica of a block held by the specified DataNode as corrupt.
```java
/**
 * Mark the block belonging to datanode as corrupt.
 *
 * @param blk Block to be added to CorruptReplicasMap
 * @param dn DatanodeDescriptor which holds the corrupt replica
 * @param reason a textual reason (for logging purposes)
 * @param reasonCode the enum representation of the reason
 */
void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
    String reason, Reason reasonCode) {
  // Look up the entry for block blk in corruptReplicasMap
  Map<DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
  // If absent, create a new HashMap<DatanodeDescriptor, Reason> and store it under blk
  if (nodes == null) {
    nodes = new HashMap<DatanodeDescriptor, Reason>();
    corruptReplicasMap.put(blk, nodes);
  }

  String reasonText;
  if (reason != null) {
    reasonText = " because " + reason;
  } else {
    reasonText = "";
  }

  // Log a different message depending on whether dn is already recorded for blk
  if (!nodes.keySet().contains(dn)) {
    NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
        blk.getBlockName() +
        " added as corrupt on " + dn +
        " by " + Server.getRemoteIp() +
        reasonText);
  } else {
    NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
        "duplicate requested for " +
        blk.getBlockName() + " to add as corrupt " +
        "on " + dn +
        " by " + Server.getRemoteIp() +
        reasonText);
  }
  // Add the node or update the reason.
  nodes.put(dn, reasonCode);
}
```

The logic is simple; roughly:
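1. Look up whether corruptReplicasMap already has an entry for block blk.
2. If it does not, create a HashMap<DatanodeDescriptor, Reason> named nodes and store blk → nodes in corruptReplicasMap.
3. Check whether nodes already contains DataNode dn, and log a different message in each case.
4. Put dn with reason code reasonCode into nodes, either adding it or overwriting the previously recorded reason.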
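The four steps can be sketched outside HDFS as follows. This is a simplified model, not the HDFS implementation: Block becomes a long ID, DatanodeDescriptor a String node name, Reason a hypothetical AddReason enum, and instead of writing to NameNode.blockStateChangeLog the method returns the message it would log (the remote-IP part is omitted, and the blk_ prefix imitates Block.getBlockName()).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for HDFS's Reason enum
enum AddReason { NONE, GENSTAMP_MISMATCH, SIZE_MISMATCH, CORRUPTION_REPORTED }

class AddSketch {
  final SortedMap<Long, Map<String, AddReason>> corruptReplicasMap = new TreeMap<>();

  // Follows steps 1-4; returns the would-be log line instead of logging it
  String add(long blockId, String dn, String reason, AddReason reasonCode) {
    // Steps 1-2: look up the block's node map, creating and storing it if absent
    Map<String, AddReason> nodes = corruptReplicasMap.get(blockId);
    if (nodes == null) {
      nodes = new HashMap<>();
      corruptReplicasMap.put(blockId, nodes);
    }
    String reasonText = (reason != null) ? " because " + reason : "";

    // Step 3: pick the message depending on whether dn is already recorded
    String msg = nodes.containsKey(dn)
        ? "duplicate requested for blk_" + blockId + " to add as corrupt on " + dn + reasonText
        : "blk_" + blockId + " added as corrupt on " + dn + reasonText;

    // Step 4: add dn, or overwrite the reason recorded for it earlier
    nodes.put(dn, reasonCode);
    return msg;
  }
}
```

Adding the same (block, node) pair twice produces the "duplicate" message and still leaves a single entry whose reason is the latest one, which is the add-or-update behaviour of nodes.put(dn, reasonCode).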
II. removeFromCorruptReplicasMap()
Removes the entry for the given block on the given DataNode from corruptReplicasMap, provided the given reason matches.
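The subtle part of this method is the reason check. As a hedged standalone sketch (a long block ID, String node names and a hypothetical RemoveReason enum stand in for the HDFS types, with ANY acting as the wildcard), the removal rule reduces to the following:

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for HDFS's Reason enum; ANY acts as a wildcard
enum RemoveReason { ANY, GENSTAMP_MISMATCH, SIZE_MISMATCH, CORRUPTION_REPORTED }

class RemoveSketch {
  final SortedMap<Long, Map<String, RemoveReason>> corruptReplicasMap = new TreeMap<>();

  // Returns true only if dn's entry for blockId was actually removed
  boolean remove(long blockId, String dn, RemoveReason reason) {
    Map<String, RemoveReason> datanodes = corruptReplicasMap.get(blockId);
    if (datanodes == null) {
      return false;                          // block is not tracked at all
    }
    RemoveReason stored = datanodes.get(dn);
    // A specific reason must equal the stored one; ANY (or no stored reason) skips the check
    if (reason != RemoveReason.ANY && stored != null && reason != stored) {
      return false;
    }
    if (datanodes.remove(dn) != null) {
      if (datanodes.isEmpty()) {
        corruptReplicasMap.remove(blockId);  // last corrupt replica gone: drop the block
      }
      return true;
    }
    return false;                            // dn had no entry for this block
  }
}
```

A mismatched specific reason leaves the entry in place, while ANY always removes it; once the last node is removed, the block key disappears as well.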
```java
// Remove the given DataNode's replica of the given block from corruptReplicasMap, subject to the given reason
boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
    Reason reason) {
  // Look up the entry for block blk in corruptReplicasMap
  Map<DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
  // If absent, return false: nothing to remove
  if (datanodes==null)
    return false;

  // if reasons can be compared but don't match, return false.
  // Fetch the reason stored for datanode
  Reason storedReason = datanodes.get(datanode);
  // Reasons are compared only when the requested reason is not ANY and a stored
  // reason exists; if they then differ, return false and leave the entry untouched
  if (reason != Reason.ANY && storedReason != null &&
      reason != storedReason) {
    return false;
  }

  if (datanodes.remove(datanode) != null) { // remove the replicas
    if (datanodes.isEmpty()) {
      // remove the block if there is no more corrupted replicas
      corruptReplicasMap.remove(blk);
    }
    // Removal succeeded
    return true;
  }
  // datanode was not recorded for this block: removal failed
  return false;
}
```

III. getCorruptReplicaBlockIds()
Returns an array of corrupt-replica block IDs: at most the requested number, starting right after the given block ID.
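The contract is a cursor-style pager: pass null for the first page, then pass the last ID received to get the next one. The sketch below models it with a TreeSet&lt;Long&gt; of block IDs standing in for corruptReplicasMap.keySet(). page() follows the same steps as the source; listAll() is a hypothetical caller, not part of CorruptReplicasMap.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

class PagingSketch {
  // Stand-in for corruptReplicasMap.keySet(): corrupt block IDs in ascending order
  final SortedSet<Long> corruptBlockIds = new TreeSet<>();

  // Same contract as getCorruptReplicaBlockIds(): up to n IDs after startingBlockId
  long[] page(int n, Long startingBlockId) {
    if (n < 0 || n > 100) {
      return null;
    }
    Iterator<Long> it = corruptBlockIds.iterator();
    if (startingBlockId != null) {
      boolean found = false;
      while (it.hasNext()) {
        // Compare primitive values; == on two Long objects would compare references
        if (it.next().longValue() == startingBlockId.longValue()) {
          found = true;
          break;
        }
      }
      if (!found) {
        return null;  // cursor block is not (or no longer) in the set
      }
    }
    List<Long> ids = new ArrayList<>();
    for (int i = 0; i < n && it.hasNext(); i++) {
      ids.add(it.next());
    }
    long[] ret = new long[ids.size()];
    for (int i = 0; i < ret.length; i++) {
      ret[i] = ids.get(i);
    }
    return ret;
  }

  // Hypothetical caller: walks all IDs, feeding the last ID of each page back as the cursor
  List<Long> listAll(int pageSize) {
    List<Long> all = new ArrayList<>();
    Long cursor = null;
    while (true) {
      long[] batch = page(pageSize, cursor);
      if (batch == null || batch.length == 0) {
        break;
      }
      for (long id : batch) {
        all.add(id);
      }
      cursor = batch[batch.length - 1];
      if (batch.length < pageSize) {
        break;
      }
    }
    return all;
  }
}
```

In the HDFS source, b.getBlockId() returns a primitive long, so comparing it with the Long startingBlockId unboxes and compares values. Note also that because a missing cursor yields null, a caller like listAll() stops early if its cursor block is removed from the map between two calls.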
```java
/**
 * Return a range of corrupt replica block ids. Up to numExpectedBlocks
 * blocks starting at the next block after startingBlockId are returned
 * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
 * is null, up to numExpectedBlocks blocks are returned from the beginning.
 * If startingBlockId cannot be found, null is returned.
 *
 * @param numExpectedBlocks Number of block ids to return.
 *  0 <= numExpectedBlocks <= 100
 * @param startingBlockId Block id from which to start. If null, start at
 *  beginning.
 * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
 *
 */
long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
                                 Long startingBlockId) {
  // Validate numExpectedBlocks: at most 100 block IDs may be requested
  if (numExpectedBlocks < 0 || numExpectedBlocks > 100) {
    return null;
  }

  // Iterator over the (sorted) blocks in corruptReplicasMap
  Iterator<Block> blockIt = corruptReplicasMap.keySet().iterator();

  // if the starting block id was specified, iterate over keys until
  // we find the matching block. If we find a matching block, break
  // to leave the iterator on the next block after the specified block.
  if (startingBlockId != null) {
    boolean isBlockFound = false;
    while (blockIt.hasNext()) {
      Block b = blockIt.next();
      if (b.getBlockId() == startingBlockId) {
        isBlockFound = true;
        break;
      }
    }

    // startingBlockId is not in the map: return null
    if (!isBlockFound) {
      return null;
    }
  }

  ArrayList<Long> corruptReplicaBlockIds = new ArrayList<Long>();

  // append up to numExpectedBlocks blockIds to our list
  // If startingBlockId was given and found, the iterator resumes at the block right after it
  for(int i=0; i<numExpectedBlocks && blockIt.hasNext(); i++) {
    corruptReplicaBlockIds.add(blockIt.next().getBlockId());
  }

  // Copy the list into a primitive long[] and return it
  long[] ret = new long[corruptReplicaBlockIds.size()];
  for(int i=0; i<ret.length; i++) {
    ret[i] = corruptReplicaBlockIds.get(i);
  }

  return ret;
}
```

IV. getNodes()
Returns the collection of DataNodes that hold corrupt replicas of the given block.
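Two details of getNodes() matter to callers: it returns null rather than an empty collection for an untracked block, and Map.keySet() returns a view backed by the map rather than a snapshot. A small standalone sketch (long block IDs, String node names and String reasons, all stand-ins for the HDFS types) shows both:

```java
import java.util.Collection;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class GetNodesSketch {
  // Inner values simplified to String reasons for brevity
  final SortedMap<Long, Map<String, String>> corruptReplicasMap = new TreeMap<>();

  // Same shape as getNodes(): null for an untracked block, otherwise the inner key set
  Collection<String> getNodes(long blockId) {
    Map<String, String> nodes = corruptReplicasMap.get(blockId);
    if (nodes == null) {
      return null;
    }
    return nodes.keySet();  // a live view backed by the inner map, not a copy
  }
}
```

A collection obtained earlier shrinks when a node is later removed from the inner map, so a caller that needs a stable snapshot would have to copy it (after a null check).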
```java
/**
 * Get Nodes which have corrupt replicas of Block
 *
 * @param blk Block for which nodes are requested
 * @return collection of nodes. Null if does not exists
 */
Collection<DatanodeDescriptor> getNodes(Block blk) {
  Map<DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
  if (nodes == null)
    return null;
  return nodes.keySet();
}
```

V. isReplicaCorrupt()
Checks whether the replica of the given block held by the given DataNode is corrupt.
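isReplicaCorrupt() boils down to two lookups: first the block, then the node. Since keySet().contains(x) on a map is equivalent to containsKey(x), the same check can be written directly against the inner map, as in this stand-in sketch (long block IDs and String node names replace the HDFS types):

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class IsCorruptSketch {
  final SortedMap<Long, Map<String, String>> corruptReplicasMap = new TreeMap<>();

  // Corrupt only if the block is tracked AND this node is recorded for it.
  // Equivalent to: getNodes(blk) != null && getNodes(blk).contains(node)
  boolean isReplicaCorrupt(long blockId, String node) {
    Map<String, String> nodes = corruptReplicasMap.get(blockId);
    return nodes != null && nodes.containsKey(node);
  }
}
```

The null check comes first so that an untracked block yields false instead of a NullPointerException. A false result therefore covers both "block not tracked" and "block tracked, but this node's replica is not recorded as corrupt".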
```java
/**
 * Check if replica belonging to Datanode is corrupt
 *
 * @param blk Block to check
 * @param node DatanodeDescriptor which holds the replica
 * @return true if replica is corrupt, false if does not exists in this map
 */
boolean isReplicaCorrupt(Block blk, DatanodeDescriptor node) {
  Collection<DatanodeDescriptor> nodes = getNodes(blk);
  return ((nodes != null) && (nodes.contains(node)));
}
```

VI. numCorruptReplicas()
Returns the number of DataNodes holding a corrupt replica of the given block, i.e. its number of corrupt replicas.
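The count returned here is what ties the map back to the definition at the top of this article: a block counts as corrupt only when all of its replicas are corrupt. The sketch below uses stand-in types; allReplicasCorrupt() is a hypothetical helper, not part of CorruptReplicasMap, and it assumes the caller already knows how many replicas the block has.

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class CountSketch {
  final SortedMap<Long, Map<String, String>> corruptReplicasMap = new TreeMap<>();

  // 0 for an untracked block, otherwise how many nodes hold a corrupt replica of it
  int numCorruptReplicas(long blockId) {
    Map<String, String> nodes = corruptReplicasMap.get(blockId);
    return (nodes == null) ? 0 : nodes.size();
  }

  // Hypothetical helper (not in CorruptReplicasMap): applies the "all replicas
  // corrupt" definition given a caller-supplied count of the block's replicas
  boolean allReplicasCorrupt(long blockId, int totalReplicas) {
    return totalReplicas > 0 && numCorruptReplicas(blockId) == totalReplicas;
  }
}
```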
```java
// Return the number of DataNodes holding a corrupt replica of the given block
int numCorruptReplicas(Block blk) {
  Collection<DatanodeDescriptor> nodes = getNodes(blk);
  return (nodes == null) ? 0 : nodes.size();
}
```

VII. size()
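Returns the number of blocks in the map (blocks with at least one recorded corrupt replica), not the number of corrupt replicas.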
```java
// Return the number of blocks in the map (not the number of corrupt replicas)
int size() {
  return corruptReplicasMap.size();
}
```
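Because the outer map is keyed by block, size() counts blocks, while the number of corrupt replicas is the sum of the inner map sizes. A stand-in sketch makes the difference explicit; totalCorruptReplicas() is a hypothetical helper, not a CorruptReplicasMap method.

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class SizeSketch {
  final SortedMap<Long, Map<String, String>> corruptReplicasMap = new TreeMap<>();

  // Same as size(): number of distinct blocks that have an entry
  int size() {
    return corruptReplicasMap.size();
  }

  // Hypothetical helper: total corrupt replicas across all blocks
  int totalCorruptReplicas() {
    int total = 0;
    for (Map<String, String> nodes : corruptReplicasMap.values()) {
      total += nodes.size();
    }
    return total;
  }
}
```

With one block corrupt on two nodes and another corrupt on one node, size() is 2 while totalCorruptReplicas() is 3.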