diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2eac881529..38cb3df00a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -971,6 +971,9 @@ Release 2.9.0 - UNRELEASED HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had an error (Rakesh R via cmccabe) + HDFS-9260. Improve the performance and GC friendliness of NameNode startup + and full block reports (Staffan Friberg via cmccabe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 5217740799..76915cb1a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -219,6 +219,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2; public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit"; public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4; + public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY + = "dfs.namenode.storageinfo.defragment.interval.ms"; + public static final int + DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT = 10 * 60 * 1000; + public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY + = "dfs.namenode.storageinfo.defragment.timeout.ms"; + public static final int + DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT = 4; + public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY + = "dfs.namenode.storageinfo.defragment.ratio"; + public static final double + DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT = 0.75; public static final String DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter"; /* Phrased as below to avoid javac inlining as a constant, to match the behavior when this was AuthFilter.class.getName(). 
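The three dfs.namenode.storageinfo.defragment.* keys introduced above feed the new StorageInfoDefragmenter thread that this patch adds to BlockManager further down. As a rough, non-authoritative sketch of how they could be tuned programmatically (the key names and default values come from this patch; the class below and the chosen values are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class StorageInfoDefragSettingsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // How often the StorageInfoDefragmenter scans storages (default 10 minutes).
    conf.setLong(
        DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY,
        10 * 60 * 1000L);
    // How long a single compaction may run before it is aborted (default 4 ms).
    conf.setLong(
        DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY, 4L);
    // Storages whose TreeSet fill ratio drops below this are queued for compaction.
    conf.setDouble(
        DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY, 0.75);
    System.out.println(conf.get(
        DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY));
  }
}

The same settings can of course be supplied through hdfs-site.xml like any other NameNode configuration key.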
Note that if you change the import for AuthFilter, you diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 81c23e199e..79113dd8b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -174,12 +174,13 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, @Override public DatanodeCommand blockReport(DatanodeRegistration registration, - String poolId, StorageBlockReport[] reports, BlockReportContext context) + String poolId, StorageBlockReport[] reports, + BlockReportContext context) throws IOException { BlockReportRequestProto.Builder builder = BlockReportRequestProto .newBuilder().setRegistration(PBHelper.convert(registration)) .setBlockPoolId(poolId); - + boolean useBlocksBuffer = registration.getNamespaceInfo() .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 4b6baf21d1..e70cdf0bdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -824,8 +824,8 @@ public static JournalInfoProto convert(JournalInfo j) { public static BlockReportContext convert(BlockReportContextProto proto) { - return new BlockReportContext(proto.getTotalRpcs(), - proto.getCurRpc(), proto.getId(), proto.getLeaseId()); + return new BlockReportContext(proto.getTotalRpcs(), proto.getCurRpc(), + proto.getId(), proto.getLeaseId(), proto.getSorted()); } public static BlockReportContextProto convert(BlockReportContext context) { @@ -834,6 +834,7 @@ public static BlockReportContextProto convert(BlockReportContext context) { setCurRpc(context.getCurRpc()). setId(context.getReportId()). setLeaseId(context.getLeaseId()). + setSorted(context.isSorted()). build(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index e9fa123413..5da2140a5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -18,8 +18,9 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.io.IOException; -import java.util.LinkedList; +import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; @@ -55,19 +56,9 @@ public abstract class BlockInfo extends Block /** For implementing {@link LightWeightGSet.LinkedElement} interface. */ private LightWeightGSet.LinkedElement nextLinkedElement; - /** - * This array contains triplets of references. 
For each i-th storage, the - * block belongs to triplets[3*i] is the reference to the - * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are - * references to the previous and the next blocks, respectively, in the list - * of blocks belonging to this storage. - * - * Using previous and next in Object triplets is done instead of a - * {@link LinkedList} list to efficiently use memory. With LinkedList the cost - * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16 - * bytes using the triplets. - */ - protected Object[] triplets; + + // Storages this block is replicated on + protected DatanodeStorageInfo[] storages; private BlockUnderConstructionFeature uc; @@ -77,14 +68,14 @@ public abstract class BlockInfo extends Block * in the block group */ public BlockInfo(short size) { - this.triplets = new Object[3 * size]; + this.storages = new DatanodeStorageInfo[size]; this.bcId = INVALID_INODE_ID; this.replication = isStriped() ? 0 : size; } public BlockInfo(Block blk, short size) { super(blk); - this.triplets = new Object[3*size]; + this.storages = new DatanodeStorageInfo[size]; this.bcId = INVALID_INODE_ID; this.replication = isStriped() ? 0 : size; } @@ -109,79 +100,52 @@ public boolean isDeleted() { return bcId == INVALID_INODE_ID; } + public Iterator getStorageInfos() { + return new Iterator() { + + private int index = 0; + + @Override + public boolean hasNext() { + while (index < storages.length && storages[index] == null) { + index++; + } + return index < storages.length; + } + + @Override + public DatanodeStorageInfo next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return storages[index++]; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Sorry. can't remove."); + } + }; + } + public DatanodeDescriptor getDatanode(int index) { DatanodeStorageInfo storage = getStorageInfo(index); return storage == null ? null : storage.getDatanodeDescriptor(); } DatanodeStorageInfo getStorageInfo(int index) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; - return (DatanodeStorageInfo)triplets[index*3]; - } - - BlockInfo getPrevious(int index) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound"; - BlockInfo info = (BlockInfo)triplets[index*3+1]; - assert info == null || - info.getClass().getName().startsWith(BlockInfo.class.getName()) : - "BlockInfo is expected at " + index*3; - return info; - } - - BlockInfo getNext(int index) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound"; - BlockInfo info = (BlockInfo)triplets[index*3+2]; - assert info == null || info.getClass().getName().startsWith( - BlockInfo.class.getName()) : - "BlockInfo is expected at " + index*3; - return info; + assert this.storages != null : "BlockInfo is not initialized"; + return storages[index]; } void setStorageInfo(int index, DatanodeStorageInfo storage) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; - triplets[index*3] = storage; - } - - /** - * Return the previous block on the block list for the datanode at - * position index. Set the previous block on the list to "to". 
- * - * @param index - the datanode index - * @param to - block to be set to previous on the list of blocks - * @return current previous block on the list of blocks - */ - BlockInfo setPrevious(int index, BlockInfo to) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound"; - BlockInfo info = (BlockInfo) triplets[index*3+1]; - triplets[index*3+1] = to; - return info; - } - - /** - * Return the next block on the block list for the datanode at - * position index. Set the next block on the list to "to". - * - * @param index - the datanode index - * @param to - block to be set to next on the list of blocks - * @return current next block on the list of blocks - */ - BlockInfo setNext(int index, BlockInfo to) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound"; - BlockInfo info = (BlockInfo) triplets[index*3+2]; - triplets[index*3+2] = to; - return info; + assert this.storages != null : "BlockInfo is not initialized"; + this.storages[index] = storage; } public int getCapacity() { - assert this.triplets != null : "BlockInfo is not initialized"; - assert triplets.length % 3 == 0 : "Malformed BlockInfo"; - return triplets.length / 3; + assert this.storages != null : "BlockInfo is not initialized"; + return storages.length; } /** @@ -240,80 +204,6 @@ int findStorageInfo(DatanodeStorageInfo storageInfo) { return -1; } - /** - * Insert this block into the head of the list of blocks - * related to the specified DatanodeStorageInfo. - * If the head is null then form a new list. - * @return current block as the new head of the list. - */ - BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) { - int dnIndex = this.findStorageInfo(storage); - assert dnIndex >= 0 : "Data node is not found: current"; - assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : - "Block is already in the list and cannot be inserted."; - this.setPrevious(dnIndex, null); - this.setNext(dnIndex, head); - if (head != null) { - head.setPrevious(head.findStorageInfo(storage), this); - } - return this; - } - - /** - * Remove this block from the list of blocks - * related to the specified DatanodeStorageInfo. - * If this block is the head of the list then return the next block as - * the new head. - * @return the new head of the list or null if the list becomes - * empy after deletion. - */ - BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) { - if (head == null) { - return null; - } - int dnIndex = this.findStorageInfo(storage); - if (dnIndex < 0) { // this block is not on the data-node list - return head; - } - - BlockInfo next = this.getNext(dnIndex); - BlockInfo prev = this.getPrevious(dnIndex); - this.setNext(dnIndex, null); - this.setPrevious(dnIndex, null); - if (prev != null) { - prev.setNext(prev.findStorageInfo(storage), next); - } - if (next != null) { - next.setPrevious(next.findStorageInfo(storage), prev); - } - if (this == head) { // removing the head - head = next; - } - return head; - } - - /** - * Remove this block from the list of blocks related to the specified - * DatanodeDescriptor. Insert it into the head of the list of blocks. - * - * @return the new head of the list. 
- */ - public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage, - int curIndex, int headIndex) { - if (head == this) { - return this; - } - BlockInfo next = this.setNext(curIndex, head); - BlockInfo prev = this.setPrevious(curIndex, null); - - head.setPrevious(headIndex, this); - prev.setNext(prev.findStorageInfo(storage), next); - if (next != null) { - next.setPrevious(next.findStorageInfo(storage), prev); - } - return this; - } - @Override public int hashCode() { // Super implementation is sufficient diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index 746e29895e..f729c4f3bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@ -35,20 +35,20 @@ public BlockInfoContiguous(Block blk, short size) { } /** - * Ensure that there is enough space to include num more triplets. - * @return first free triplet index. + * Ensure that there is enough space to include num more storages. + * @return first free storage index. */ private int ensureCapacity(int num) { - assert this.triplets != null : "BlockInfo is not initialized"; + assert this.storages != null : "BlockInfo is not initialized"; int last = numNodes(); - if (triplets.length >= (last+num)*3) { + if (storages.length >= (last+num)) { return last; } /* Not enough space left. Create a new array. Should normally * happen only when replication is manually increased by the user. */ - Object[] old = triplets; - triplets = new Object[(last+num)*3]; - System.arraycopy(old, 0, triplets, 0, last * 3); + DatanodeStorageInfo[] old = storages; + storages = new DatanodeStorageInfo[(last+num)]; + System.arraycopy(old, 0, storages, 0, last); return last; } @@ -57,8 +57,6 @@ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { // find the last null node int lastNode = ensureCapacity(1); setStorageInfo(lastNode, storage); - setNext(lastNode, null); - setPrevious(lastNode, null); return true; } @@ -68,25 +66,18 @@ boolean removeStorage(DatanodeStorageInfo storage) { if (dnIndex < 0) { // the node is not found return false; } - assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : - "Block is still in the list and must be removed first."; // find the last not null node int lastNode = numNodes()-1; - // replace current node triplet by the lastNode one + // replace current node entry by the lastNode one setStorageInfo(dnIndex, getStorageInfo(lastNode)); - setNext(dnIndex, getNext(lastNode)); - setPrevious(dnIndex, getPrevious(lastNode)); - // set the last triplet to null + // set the last entry to null setStorageInfo(lastNode, null); - setNext(lastNode, null); - setPrevious(lastNode, null); return true; } @Override public int numNodes() { - assert this.triplets != null : "BlockInfo is not initialized"; - assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + assert this.storages != null : "BlockInfo is not initialized"; for (int idx = getCapacity()-1; idx >= 0; idx--) { if (getDatanode(idx) != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java index 20d5858b11..c6e26ecc84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@ -26,21 +26,20 @@ /** * Subclass of {@link BlockInfo}, presenting a block group in erasure coding. * - * We still use triplets to store DatanodeStorageInfo for each block in the - * block group, as well as the previous/next block in the corresponding - * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units + * We still use a storage array to store DatanodeStorageInfo for each block in + * the block group. For a (m+k) block group, the first (m+k) storage units * are sorted and strictly mapped to the corresponding block. * * Normally each block belonging to group is stored in only one DataNode. - * However, it is possible that some block is over-replicated. Thus the triplet + * However, it is possible that some block is over-replicated. Thus the storage * array's size can be larger than (m+k). Thus currently we use an extra byte - * array to record the block index for each triplet. + * array to record the block index for each entry. */ @InterfaceAudience.Private public class BlockInfoStriped extends BlockInfo { private final ErasureCodingPolicy ecPolicy; /** - * Always the same size with triplets. Record the block index for each triplet + * Always the same size with storage. Record the block index for each entry * TODO: actually this is only necessary for over-replicated block. Thus can * be further optimized to save memory usage. */ @@ -104,7 +103,7 @@ private int findSlot() { return i; } } - // need to expand the triplet size + // need to expand the storage size ensureCapacity(i + 1, true); return i; } @@ -130,8 +129,6 @@ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { private void addStorage(DatanodeStorageInfo storage, int index, int blockIndex) { setStorageInfo(index, storage); - setNext(index, null); - setPrevious(index, null); indices[index] = (byte) blockIndex; } @@ -173,26 +170,22 @@ boolean removeStorage(DatanodeStorageInfo storage) { if (dnIndex < 0) { // the node is not found return false; } - assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : - "Block is still in the list and must be removed first."; - // set the triplet to null + // set the entry to null setStorageInfo(dnIndex, null); - setNext(dnIndex, null); - setPrevious(dnIndex, null); indices[dnIndex] = -1; return true; } private void ensureCapacity(int totalSize, boolean keepOld) { if (getCapacity() < totalSize) { - Object[] old = triplets; + DatanodeStorageInfo[] old = storages; byte[] oldIndices = indices; - triplets = new Object[totalSize * 3]; + storages = new DatanodeStorageInfo[totalSize]; indices = new byte[totalSize]; initIndices(); if (keepOld) { - System.arraycopy(old, 0, triplets, 0, old.length); + System.arraycopy(old, 0, storages, 0, old.length); System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length); } } @@ -214,8 +207,7 @@ public final boolean isStriped() { @Override public int numNodes() { - assert this.triplets != null : "BlockInfo is not initialized"; - assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + assert this.storages != null : "BlockInfo is not initialized"; int num = 0; for (int idx = getCapacity()-1; idx >= 0; idx--) { if 
(getStorageInfo(idx) != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 587e6b640e..25cec8a7c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; @@ -93,6 +94,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.hdfs.util.FoldedTreeSet; import org.apache.hadoop.hdfs.util.LightWeightHashSet; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; @@ -106,6 +108,7 @@ import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.VersionInfo; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -195,7 +198,12 @@ public int getPendingDataNodeMessageCount() { /**replicationRecheckInterval is how often namenode checks for new replication work*/ private final long replicationRecheckInterval; - + + /** How often to check and the limit for the storageinfo efficiency. */ + private final long storageInfoDefragmentInterval; + private final long storageInfoDefragmentTimeout; + private final double storageInfoDefragmentRatio; + /** * Mapping: Block -> { BlockCollection, datanodes, self ref } * Updated only in response to client-sent information. @@ -204,6 +212,10 @@ public int getPendingDataNodeMessageCount() { /** Replication thread. */ final Daemon replicationThread = new Daemon(new ReplicationMonitor()); + + /** StorageInfoDefragmenter thread. */ + private final Daemon storageInfoDefragmenterThread = + new Daemon(new StorageInfoDefragmenter()); /** Block report thread for handling async reports. 
*/ private final BlockReportProcessingThread blockReportThread = @@ -376,7 +388,20 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled, this.replicationRecheckInterval = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L; - + + this.storageInfoDefragmentInterval = + conf.getLong( + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY, + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT); + this.storageInfoDefragmentTimeout = + conf.getLong( + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY, + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT); + this.storageInfoDefragmentRatio = + conf.getDouble( + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY, + DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT); + this.encryptDataTransfer = conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); @@ -508,6 +533,8 @@ public void activate(Configuration conf, long blockTotal) { datanodeManager.activate(conf); this.replicationThread.setName("ReplicationMonitor"); this.replicationThread.start(); + storageInfoDefragmenterThread.setName("StorageInfoMonitor"); + storageInfoDefragmenterThread.start(); this.blockReportThread.start(); mxBeanName = MBeans.register("NameNode", "BlockStats", this); bmSafeMode.activate(blockTotal); @@ -517,8 +544,10 @@ public void close() { bmSafeMode.close(); try { replicationThread.interrupt(); + storageInfoDefragmenterThread.interrupt(); blockReportThread.interrupt(); replicationThread.join(3000); + storageInfoDefragmenterThread.join(3000); blockReportThread.join(3000); } catch (InterruptedException ie) { } @@ -1165,9 +1194,15 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode, /** Remove the blocks associated to the given datanode. */ void removeBlocksAssociatedTo(final DatanodeDescriptor node) { - final Iterator it = node.getBlockIterator(); - while(it.hasNext()) { - removeStoredBlock(it.next(), node); + for (DatanodeStorageInfo storage : node.getStorageInfos()) { + final Iterator it = storage.getBlockIterator(); + while (it.hasNext()) { + BlockInfo block = it.next(); + // DatanodeStorageInfo must be removed using the iterator to avoid + // ConcurrentModificationException in the underlying storage + it.remove(); + removeStoredBlock(block, node); + } } // Remove all pending DN messages referencing this DN. 
pendingDNMessages.removeAllMessagesForDatanode(node); @@ -1183,6 +1218,9 @@ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) { DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); while(it.hasNext()) { BlockInfo block = it.next(); + // DatanodeStorageInfo must be removed using the iterator to avoid + // ConcurrentModificationException in the underlying storage + it.remove(); removeStoredBlock(block, node); final Block b = getBlockOnStorage(block, storageInfo); if (b != null) { @@ -2033,8 +2071,8 @@ private static class BlockInfoToAdd { */ public boolean processReport(final DatanodeID nodeID, final DatanodeStorage storage, - final BlockListAsLongs newReport, BlockReportContext context, - boolean lastStorageInRpc) throws IOException { + final BlockListAsLongs newReport, + BlockReportContext context, boolean lastStorageInRpc) throws IOException { namesystem.writeLock(); final long startTime = Time.monotonicNow(); //after acquiring write lock final long endTime; @@ -2079,7 +2117,8 @@ public boolean processReport(final DatanodeID nodeID, nodeID.getDatanodeUuid()); processFirstBlockReport(storageInfo, newReport); } else { - invalidatedBlocks = processReport(storageInfo, newReport); + invalidatedBlocks = processReport(storageInfo, newReport, + context != null ? context.isSorted() : false); } storageInfo.receivedBlockReport(); @@ -2149,6 +2188,9 @@ private void removeZombieReplicas(BlockReportContext context, // TODO: remove this assumption in case we want to put a block on // more than one storage on a datanode (and because it's a difficult // assumption to really enforce) + // DatanodeStorageInfo must be removed using the iterator to avoid + // ConcurrentModificationException in the underlying storage + iter.remove(); removeStoredBlock(block, zombie.getDatanodeDescriptor()); Block b = getBlockOnStorage(block, zombie); if (b != null) { @@ -2238,7 +2280,7 @@ void rescanPostponedMisreplicatedBlocks() { private Collection processReport( final DatanodeStorageInfo storageInfo, - final BlockListAsLongs report) throws IOException { + final BlockListAsLongs report, final boolean sorted) throws IOException { // Normal case: // Modify the (block-->datanode) map, according to the difference // between the old and new block report. @@ -2248,9 +2290,29 @@ private Collection processReport( Collection toInvalidate = new LinkedList<>(); Collection toCorrupt = new LinkedList<>(); Collection toUC = new LinkedList<>(); - reportDiff(storageInfo, report, - toAdd, toRemove, toInvalidate, toCorrupt, toUC); - + + Iterable sortedReport; + if (!sorted) { + blockLog.warn("BLOCK* processReport: Report from the DataNode ({}) is " + + "unsorted. This will cause overhead on the NameNode " + + "which needs to sort the Full BR. 
Please update the " + + "DataNode to the same version of Hadoop HDFS as the " + + "NameNode ({}).", + storageInfo.getDatanodeDescriptor().getDatanodeUuid(), + VersionInfo.getVersion()); + Set set = new FoldedTreeSet<>(); + for (BlockReportReplica iblk : report) { + set.add(new BlockReportReplica(iblk)); + } + sortedReport = set; + } else { + sortedReport = report; + } + + reportDiffSorted(storageInfo, sortedReport, + toAdd, toRemove, toInvalidate, toCorrupt, toUC); + + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Process the blocks on each queue for (StatefulBlockInfo b : toUC) { @@ -2399,126 +2461,111 @@ private void processFirstBlockReport( } } - private void reportDiff(DatanodeStorageInfo storageInfo, - BlockListAsLongs newReport, + private void reportDiffSorted(DatanodeStorageInfo storageInfo, + Iterable newReport, Collection toAdd, // add to DatanodeDescriptor Collection toRemove, // remove from DatanodeDescriptor Collection toInvalidate, // should be removed from DN Collection toCorrupt, // add to corrupt replicas list Collection toUC) { // add to under-construction list - // place a delimiter in the list which separates blocks - // that have been reported from those that have not - Block delimiterBlock = new Block(); - BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock, - (short) 1); - AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock); - assert result == AddBlockResult.ADDED - : "Delimiting block cannot be present in the node"; - int headIndex = 0; //currently the delimiter is in the head of the list - int curIndex; + // The blocks must be sorted and the storagenodes blocks must be sorted + Iterator storageBlocksIterator = storageInfo.getBlockIterator(); + DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); + BlockInfo storageBlock = null; - if (newReport == null) { - newReport = BlockListAsLongs.EMPTY; - } - // scan the report and process newly reported blocks - for (BlockReportReplica iblk : newReport) { - ReplicaState iState = iblk.getState(); - BlockInfo storedBlock = processReportedBlock(storageInfo, - iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); + for (BlockReportReplica replica : newReport) { - // move block to the head of the list - if (storedBlock != null && - (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) { - headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex); + long replicaID = replica.getBlockId(); + if (BlockIdManager.isStripedBlockID(replicaID) + && (!hasNonEcBlockUsingStripedID || + !blocksMap.containsBlock(replica))) { + replicaID = BlockIdManager.convertToStripedID(replicaID); } + + ReplicaState reportedState = replica.getState(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Reported block " + replica + + " on " + dn + " size " + replica.getNumBytes() + + " replicaState = " + reportedState); + } + + if (shouldPostponeBlocksFromFuture + && isGenStampInFuture(replica)) { + queueReportedBlock(storageInfo, replica, reportedState, + QUEUE_REASON_FUTURE_GENSTAMP); + continue; + } + + if (storageBlock == null && storageBlocksIterator.hasNext()) { + storageBlock = storageBlocksIterator.next(); + } + + do { + int cmp; + if (storageBlock == null || + (cmp = Long.compare(replicaID, storageBlock.getBlockId())) < 0) { + // Check if block is available in NN but not yet on this storage + BlockInfo nnBlock = blocksMap.getStoredBlock(new Block(replicaID)); + if (nnBlock != null) { + reportDiffSortedInner(storageInfo, replica, reportedState, + nnBlock, toAdd, toCorrupt, toUC); + 
} else { + // Replica not found anywhere so it should be invalidated + toInvalidate.add(new Block(replica)); + } + break; + } else if (cmp == 0) { + // Replica matched current storageblock + reportDiffSortedInner(storageInfo, replica, reportedState, + storageBlock, toAdd, toCorrupt, toUC); + storageBlock = null; + } else { + // replica has higher ID than storedBlock + // Remove all stored blocks with IDs lower than replica + do { + toRemove.add(storageBlock); + storageBlock = storageBlocksIterator.hasNext() + ? storageBlocksIterator.next() : null; + } while (storageBlock != null && + Long.compare(replicaID, storageBlock.getBlockId()) > 0); + } + } while (storageBlock != null); } - // collect blocks that have not been reported - // all of them are next to the delimiter - Iterator it = - storageInfo.new BlockIterator(delimiter.getNext(0)); - while (it.hasNext()) { - toRemove.add(it.next()); + // Iterate any remaing blocks that have not been reported and remove them + while (storageBlocksIterator.hasNext()) { + toRemove.add(storageBlocksIterator.next()); } - storageInfo.removeBlock(delimiter); } - /** - * Process a block replica reported by the data-node. - * No side effects except adding to the passed-in Collections. - * - *
    - *
- * <ol>
- * <li>If the block is not known to the system (not in blocksMap) then the
- * data-node should be notified to invalidate this block.</li>
- * <li>If the reported replica is valid that is has the same generation stamp
- * and length as recorded on the name-node, then the replica location should
- * be added to the name-node.</li>
- * <li>If the reported replica is not valid, then it is marked as corrupt,
- * which triggers replication of the existing valid replicas.
- * Corrupt replicas are removed from the system when the block
- * is fully replicated.</li>
- * <li>If the reported replica is for a block currently marked "under
- * construction" in the NN, then it should be added to the
- * BlockUnderConstructionFeature's list of replicas.</li>
- * </ol>
- * - * @param storageInfo DatanodeStorageInfo that sent the report. - * @param block reported block replica - * @param reportedState reported replica state - * @param toAdd add to DatanodeDescriptor - * @param toInvalidate missing blocks (not in the blocks map) - * should be removed from the data-node - * @param toCorrupt replicas with unexpected length or generation stamp; - * add to corrupt replicas - * @param toUC replicas of blocks currently under construction - * @return the up-to-date stored block, if it should be kept. - * Otherwise, null. - */ - private BlockInfo processReportedBlock( + private void reportDiffSortedInner( final DatanodeStorageInfo storageInfo, - final Block block, final ReplicaState reportedState, + final BlockReportReplica replica, final ReplicaState reportedState, + final BlockInfo storedBlock, final Collection toAdd, - final Collection toInvalidate, final Collection toCorrupt, final Collection toUC) { - - DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); - if(LOG.isDebugEnabled()) { - LOG.debug("Reported block " + block - + " on " + dn + " size " + block.getNumBytes() - + " replicaState = " + reportedState); - } - - if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) { - queueReportedBlock(storageInfo, block, reportedState, - QUEUE_REASON_FUTURE_GENSTAMP); - return null; - } - - // find block by blockId - BlockInfo storedBlock = getStoredBlock(block); - if(storedBlock == null) { - // If blocksMap does not contain reported block id, - // the replica should be removed from the data-node. - toInvalidate.add(new Block(block)); - return null; - } + assert replica != null; + assert storedBlock != null; + + DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); BlockUCState ucState = storedBlock.getBlockUCState(); - + // Block is on the NN - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("In memory blockUCState = " + ucState); } // Ignore replicas already scheduled to be removed from the DN - if(invalidateBlocks.contains(dn, block)) { - return storedBlock; + if (invalidateBlocks.contains(dn, replica)) { + return; } - BlockToMarkCorrupt c = checkReplicaCorrupt( - block, reportedState, storedBlock, ucState, dn); + BlockToMarkCorrupt c = checkReplicaCorrupt(replica, reportedState, + storedBlock, ucState, dn); if (c != null) { if (shouldPostponeBlocksFromFuture) { // If the block is an out-of-date generation stamp or state, @@ -2532,23 +2579,16 @@ private BlockInfo processReportedBlock( } else { toCorrupt.add(c); } - return storedBlock; + } else if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { + toUC.add(new StatefulBlockInfo(storedBlock, new Block(replica), + reportedState)); + } else if (reportedState == ReplicaState.FINALIZED && + (storedBlock.findStorageInfo(storageInfo) == -1 || + corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { + // Add replica if appropriate. If the replica was previously corrupt + // but now okay, it might need to be updated. + toAdd.add(new BlockInfoToAdd(storedBlock, replica)); } - - if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { - toUC.add(new StatefulBlockInfo(storedBlock, - new Block(block), reportedState)); - return storedBlock; - } - - // Add replica if appropriate. If the replica was previously corrupt - // but now okay, it might need to be updated. 
- if (reportedState == ReplicaState.FINALIZED - && (storedBlock.findStorageInfo(storageInfo) == -1 || - corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { - toAdd.add(new BlockInfoToAdd(storedBlock, block)); - } - return storedBlock; } /** @@ -2774,7 +2814,7 @@ private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported, } // just add it - AddBlockResult result = storageInfo.addBlock(storedBlock, reported); + AddBlockResult result = storageInfo.addBlockInitial(storedBlock, reported); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); @@ -3497,40 +3537,75 @@ private void processAndHandleReportedBlock( DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException { - // blockReceived reports a finalized block - Collection toAdd = new LinkedList<>(); - Collection toInvalidate = new LinkedList(); - Collection toCorrupt = new LinkedList(); - Collection toUC = new LinkedList(); + final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); - processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate, - toCorrupt, toUC); - // the block is only in one of the to-do lists - // if it is in none then data-node already has it - assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1 - : "The block should be only in one of the lists."; + if(LOG.isDebugEnabled()) { + LOG.debug("Reported block " + block + + " on " + node + " size " + block.getNumBytes() + + " replicaState = " + reportedState); + } - for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b, storageInfo); + if (shouldPostponeBlocksFromFuture && + isGenStampInFuture(block)) { + queueReportedBlock(storageInfo, block, reportedState, + QUEUE_REASON_FUTURE_GENSTAMP); + return; } - long numBlocksLogged = 0; - for (BlockInfoToAdd b : toAdd) { - addStoredBlock(b.stored, b.reported, storageInfo, delHintNode, - numBlocksLogged < maxNumBlocksToLog); - numBlocksLogged++; - } - if (numBlocksLogged > maxNumBlocksToLog) { - blockLog.debug("BLOCK* addBlock: logged info for {} of {} reported.", - maxNumBlocksToLog, numBlocksLogged); - } - for (Block b : toInvalidate) { + + // find block by blockId + BlockInfo storedBlock = getStoredBlock(block); + if(storedBlock == null) { + // If blocksMap does not contain reported block id, + // the replica should be removed from the data-node. blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " + - "belong to any file", b, node, b.getNumBytes()); - addToInvalidates(b, node); + "belong to any file", block, node, block.getNumBytes()); + addToInvalidates(new Block(block), node); + return; } - for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, storageInfo, node); + + BlockUCState ucState = storedBlock.getBlockUCState(); + // Block is on the NN + if(LOG.isDebugEnabled()) { + LOG.debug("In memory blockUCState = " + ucState); + } + + // Ignore replicas already scheduled to be removed from the DN + if(invalidateBlocks.contains(node, block)) { + return; + } + + BlockToMarkCorrupt c = checkReplicaCorrupt( + block, reportedState, storedBlock, ucState, node); + if (c != null) { + if (shouldPostponeBlocksFromFuture) { + // If the block is an out-of-date generation stamp or state, + // but we're the standby, we shouldn't treat it as corrupt, + // but instead just queue it for later processing. 
+ // TODO: Pretty confident this should be s/storedBlock/block below, + // since we should be postponing the info of the reported block, not + // the stored block. See HDFS-6289 for more context. + queueReportedBlock(storageInfo, storedBlock, reportedState, + QUEUE_REASON_CORRUPT_STATE); + } else { + markBlockAsCorrupt(c, storageInfo, node); + } + return; + } + + if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { + addStoredBlockUnderConstruction( + new StatefulBlockInfo(storedBlock, new Block(block), reportedState), + storageInfo); + return; + } + + // Add replica if appropriate. If the replica was previously corrupt + // but now okay, it might need to be updated. + if (reportedState == ReplicaState.FINALIZED + && (storedBlock.findStorageInfo(storageInfo) == -1 || + corruptReplicas.isReplicaCorrupt(storedBlock, node))) { + addStoredBlock(storedBlock, block, storageInfo, delHintNode, true); } } @@ -4060,6 +4135,87 @@ public void run() { } } + /** + * Runnable that monitors the fragmentation of the StorageInfo TreeSet and + * compacts it when it falls under a certain threshold. + */ + private class StorageInfoDefragmenter implements Runnable { + + @Override + public void run() { + while (namesystem.isRunning()) { + try { + // Check storage efficiency only when active NN is out of safe mode. + if (isPopulatingReplQueues()) { + scanAndCompactStorages(); + } + Thread.sleep(storageInfoDefragmentInterval); + } catch (Throwable t) { + if (!namesystem.isRunning()) { + LOG.info("Stopping thread."); + if (!(t instanceof InterruptedException)) { + LOG.info("Received an exception while shutting down.", t); + } + break; + } else if (!checkNSRunning && t instanceof InterruptedException) { + LOG.info("Stopping for testing."); + break; + } + LOG.error("Thread received Runtime exception.", t); + terminate(1, t); + } + } + } + + private void scanAndCompactStorages() throws InterruptedException { + ArrayList datanodesAndStorages = new ArrayList<>(); + for (DatanodeDescriptor node + : datanodeManager.getDatanodeListForReport(DatanodeReportType.ALL)) { + for (DatanodeStorageInfo storage : node.getStorageInfos()) { + try { + namesystem.readLock(); + double ratio = storage.treeSetFillRatio(); + if (ratio < storageInfoDefragmentRatio) { + datanodesAndStorages.add(node.getDatanodeUuid()); + datanodesAndStorages.add(storage.getStorageID()); + } + LOG.info("StorageInfo TreeSet fill ratio {} : {}{}", + storage.getStorageID(), ratio, + (ratio < storageInfoDefragmentRatio) + ? " (queued for defragmentation)" : ""); + } finally { + namesystem.readUnlock(); + } + } + } + if (!datanodesAndStorages.isEmpty()) { + for (int i = 0; i < datanodesAndStorages.size(); i += 2) { + namesystem.writeLock(); + try { + DatanodeStorageInfo storage = datanodeManager. + getDatanode(datanodesAndStorages.get(i)). + getStorageInfo(datanodesAndStorages.get(i + 1)); + if (storage != null) { + boolean aborted = + !storage.treeSetCompact(storageInfoDefragmentTimeout); + if (aborted) { + // Compaction timed out, reset iterator to continue with + // the same storage next iteration. + i -= 2; + } + LOG.info("StorageInfo TreeSet defragmented {} : {}{}", + storage.getStorageID(), storage.treeSetFillRatio(), + aborted ? 
" (aborted)" : ""); + } + } finally { + namesystem.writeUnlock(); + } + // Wait between each iteration + Thread.sleep(1000); + } + } + } + } /** * Compute block replication and block invalidation work that can be scheduled diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index 47a21fec53..71d0598d9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import java.util.Collections; import java.util.Iterator; import org.apache.hadoop.hdfs.protocol.Block; @@ -30,37 +31,6 @@ * the datanodes that store the block. */ class BlocksMap { - private static class StorageIterator implements Iterator { - private final BlockInfo blockInfo; - private int nextIdx = 0; - - StorageIterator(BlockInfo blkInfo) { - this.blockInfo = blkInfo; - } - - @Override - public boolean hasNext() { - if (blockInfo == null) { - return false; - } - while (nextIdx < blockInfo.getCapacity() && - blockInfo.getDatanode(nextIdx) == null) { - // note that for striped blocks there may be null in the triplets - nextIdx++; - } - return nextIdx < blockInfo.getCapacity(); - } - - @Override - public DatanodeStorageInfo next() { - return blockInfo.getStorageInfo(nextIdx++); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Sorry. can't remove."); - } - } /** Constant {@link LightWeightGSet} capacity. */ private final int capacity; @@ -132,6 +102,16 @@ void removeBlock(Block block) { } } + /** + * Check if BlocksMap contains the block. + * + * @param b Block to check + * @return true if block is in the map, otherwise false + */ + boolean containsBlock(Block b) { + return blocks.contains(b); + } + /** Returns the block object if it exists in the map. */ BlockInfo getStoredBlock(Block b) { return blocks.get(b); @@ -142,7 +122,9 @@ BlockInfo getStoredBlock(Block b) { * returns {@link Iterable} of the storages the block belongs to. */ Iterable getStorages(Block b) { - return getStorages(blocks.get(b)); + BlockInfo block = blocks.get(b); + return block != null ? getStorages(block) + : Collections.emptyList(); } /** @@ -150,12 +132,16 @@ Iterable getStorages(Block b) { * returns {@link Iterable} of the storages the block belongs to. */ Iterable getStorages(final BlockInfo storedBlock) { - return new Iterable() { - @Override - public Iterator iterator() { - return new StorageIterator(storedBlock); - } - }; + if (storedBlock == null) { + return Collections.emptyList(); + } else { + return new Iterable() { + @Override + public Iterator iterator() { + return storedBlock.getStorageInfos(); + } + }; + } } /** counts number of containing nodes. Better than using iterator. */ @@ -174,7 +160,7 @@ boolean removeNode(Block b, DatanodeDescriptor node) { if (info == null) return false; - // remove block from the data-node list and the node from the block info + // remove block from the data-node set and the node from the block info boolean removed = removeBlock(node, info); if (info.hasNoStorage() // no datanodes left @@ -185,7 +171,7 @@ boolean removeNode(Block b, DatanodeDescriptor node) { } /** - * Remove block from the list of blocks belonging to the data-node. 
Remove + * Remove block from the set of blocks belonging to the data-node. Remove * data-node from the block. */ static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index 1f1b24b12c..c4729eafec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.util.FoldedTreeSet; import com.google.common.annotations.VisibleForTesting; @@ -85,31 +86,6 @@ public void updateFromStorage(DatanodeStorage storage) { storageType = storage.getStorageType(); } - /** - * Iterates over the list of blocks belonging to the data-node. - */ - class BlockIterator implements Iterator { - private BlockInfo current; - - BlockIterator(BlockInfo head) { - this.current = head; - } - - public boolean hasNext() { - return current != null; - } - - public BlockInfo next() { - BlockInfo res = current; - current = current.getNext(current.findStorageInfo(DatanodeStorageInfo.this)); - return res; - } - - public void remove() { - throw new UnsupportedOperationException("Sorry. can't remove."); - } - } - private final DatanodeDescriptor dn; private final String storageID; private StorageType storageType; @@ -120,8 +96,7 @@ public void remove() { private volatile long remaining; private long blockPoolUsed; - private volatile BlockInfo blockList = null; - private int numBlocks = 0; + private final FoldedTreeSet blocks = new FoldedTreeSet<>(); // The ID of the last full block report which updated this storage. private long lastBlockReportId = 0; @@ -207,7 +182,7 @@ void setState(State state) { } boolean areBlocksOnFailedStorage() { - return getState() == State.FAILED && numBlocks != 0; + return getState() == State.FAILED && !blocks.isEmpty(); } @VisibleForTesting @@ -234,6 +209,36 @@ long getRemaining() { long getBlockPoolUsed() { return blockPoolUsed; } + /** + * For use during startup. Expects block to be added in sorted order + * to enable fast insert in to the DatanodeStorageInfo + * + * @param b Block to add to DatanodeStorageInfo + * @param reportedBlock The reported replica + * @return Enum describing if block was added, replaced or already existed + */ + public AddBlockResult addBlockInitial(BlockInfo b, Block reportedBlock) { + // First check whether the block belongs to a different storage + // on the same DN. + AddBlockResult result = AddBlockResult.ADDED; + DatanodeStorageInfo otherStorage = + b.findStorageInfo(getDatanodeDescriptor()); + + if (otherStorage != null) { + if (otherStorage != this) { + // The block belongs to a different storage. Remove it first. + otherStorage.removeBlock(b); + result = AddBlockResult.REPLACED; + } else { + // The block is already associated with this storage. 
+ return AddBlockResult.ALREADY_EXIST; + } + } + + b.addStorage(this, reportedBlock); + blocks.addSortedLast(b); + return result; + } public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) { // First check whether the block belongs to a different storage @@ -253,9 +258,8 @@ public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) { } } - // add to the head of the data-node list b.addStorage(this, reportedBlock); - insertToList(b); + blocks.add(b); return result; } @@ -263,45 +267,17 @@ AddBlockResult addBlock(BlockInfo b) { return addBlock(b, b); } - public void insertToList(BlockInfo b) { - blockList = b.listInsert(blockList, this); - numBlocks++; - } - - public boolean removeBlock(BlockInfo b) { - blockList = b.listRemove(blockList, this); - if (b.removeStorage(this)) { - numBlocks--; - return true; - } else { - return false; - } + boolean removeBlock(BlockInfo b) { + blocks.remove(b); + return b.removeStorage(this); } int numBlocks() { - return numBlocks; + return blocks.size(); } Iterator getBlockIterator() { - return new BlockIterator(blockList); - } - - /** - * Move block to the head of the list of blocks belonging to the data-node. - * @return the index of the head of the blockList - */ - int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) { - blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex); - return curIndex; - } - - /** - * Used for testing only - * @return the head of the blockList - */ - @VisibleForTesting - BlockInfo getBlockListHeadForTesting(){ - return blockList; + return blocks.iterator(); } void updateState(StorageReport r) { @@ -349,6 +325,27 @@ StorageReport toStorageReport() { false, capacity, dfsUsed, remaining, blockPoolUsed); } + /** + * The fill ratio of the underlying TreeSet holding blocks. + * + * @return the fill ratio of the tree + */ + public double treeSetFillRatio() { + return blocks.fillRatio(); + } + + /** + * Compact the underlying TreeSet holding blocks. + * + * @param timeout Maximum time to spend compacting the tree set in + * milliseconds. + * + * @return true if compaction completed, false if aborted + */ + public boolean treeSetCompact(long timeout) { + return blocks.compact(timeout); + } + static Iterable toStorageTypes( final Iterable infos) { return new Iterable() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 1b72961d83..bc4f2d8973 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -461,7 +461,7 @@ List blockReport(long fullBrLeaseId) throws IOException { // Below split threshold, send all reports in a single message. 
DatanodeCommand cmd = bpNamenode.blockReport( bpRegistration, bpos.getBlockPoolId(), reports, - new BlockReportContext(1, 0, reportId, fullBrLeaseId)); + new BlockReportContext(1, 0, reportId, fullBrLeaseId, true)); numRPCs = 1; numReportsSent = reports.length; if (cmd != null) { @@ -474,7 +474,7 @@ List blockReport(long fullBrLeaseId) throws IOException { DatanodeCommand cmd = bpNamenode.blockReport( bpRegistration, bpos.getBlockPoolId(), singleReport, new BlockReportContext(reports.length, r, reportId, - fullBrLeaseId)); + fullBrLeaseId, true)); numReportsSent++; numRPCs++; if (cmd != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java index 6f0b8a7f4c..34c9f2e53b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java @@ -18,13 +18,14 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; -import org.apache.hadoop.util.LightWeightResizableGSet; +import org.apache.hadoop.hdfs.util.FoldedTreeSet; /** * Maintains the replica map. @@ -33,9 +34,20 @@ class ReplicaMap { // Object using which this class is synchronized private final Object mutex; - // Map of block pool Id to another map of block Id to ReplicaInfo. - private final Map> map = - new HashMap>(); + // Map of block pool Id to a set of ReplicaInfo. + private final Map> map = new HashMap<>(); + + // Special comparator used to compare Long to Block ID in the TreeSet. + private static final Comparator LONG_AND_BLOCK_COMPARATOR + = new Comparator() { + + @Override + public int compare(Object o1, Object o2) { + long lookup = (long) o1; + long stored = ((Block) o2).getBlockId(); + return lookup > stored ? 1 : lookup < stored ? -1 : 0; + } + }; ReplicaMap(Object mutex) { if (mutex == null) { @@ -92,11 +104,14 @@ ReplicaInfo get(String bpid, Block block) { ReplicaInfo get(String bpid, long blockId) { checkBlockPool(bpid); synchronized(mutex) { - LightWeightResizableGSet m = map.get(bpid); - return m != null ? 
m.get(new Block(blockId)) : null; + FoldedTreeSet set = map.get(bpid); + if (set == null) { + return null; + } + return set.get(blockId, LONG_AND_BLOCK_COMPARATOR); } } - + /** * Add a replica's meta information into the map * @@ -109,13 +124,13 @@ ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) { checkBlockPool(bpid); checkBlock(replicaInfo); synchronized(mutex) { - LightWeightResizableGSet m = map.get(bpid); - if (m == null) { + FoldedTreeSet set = map.get(bpid); + if (set == null) { // Add an entry for block pool if it does not exist already - m = new LightWeightResizableGSet(); - map.put(bpid, m); + set = new FoldedTreeSet<>(); + map.put(bpid, set); } - return m.put(replicaInfo); + return set.addOrReplace(replicaInfo); } } @@ -138,12 +153,13 @@ ReplicaInfo remove(String bpid, Block block) { checkBlockPool(bpid); checkBlock(block); synchronized(mutex) { - LightWeightResizableGSet m = map.get(bpid); - if (m != null) { - ReplicaInfo replicaInfo = m.get(block); + FoldedTreeSet set = map.get(bpid); + if (set != null) { + ReplicaInfo replicaInfo = + set.get(block.getBlockId(), LONG_AND_BLOCK_COMPARATOR); if (replicaInfo != null && block.getGenerationStamp() == replicaInfo.getGenerationStamp()) { - return m.remove(block); + return set.removeAndGet(replicaInfo); } } } @@ -160,9 +176,9 @@ ReplicaInfo remove(String bpid, Block block) { ReplicaInfo remove(String bpid, long blockId) { checkBlockPool(bpid); synchronized(mutex) { - LightWeightResizableGSet m = map.get(bpid); - if (m != null) { - return m.remove(new Block(blockId)); + FoldedTreeSet set = map.get(bpid); + if (set != null) { + return set.removeAndGet(blockId, LONG_AND_BLOCK_COMPARATOR); } } return null; @@ -174,10 +190,9 @@ ReplicaInfo remove(String bpid, long blockId) { * @return the number of replicas in the map */ int size(String bpid) { - LightWeightResizableGSet m = null; synchronized(mutex) { - m = map.get(bpid); - return m != null ? m.size() : 0; + FoldedTreeSet set = map.get(bpid); + return set != null ? set.size() : 0; } } @@ -192,19 +207,17 @@ int size(String bpid) { * @return a collection of the replicas belonging to the block pool */ Collection replicas(String bpid) { - LightWeightResizableGSet m = null; - m = map.get(bpid); - return m != null ? 
m.values() : null; + return map.get(bpid); } void initBlockPool(String bpid) { checkBlockPool(bpid); synchronized(mutex) { - LightWeightResizableGSet m = map.get(bpid); - if (m == null) { + FoldedTreeSet set = map.get(bpid); + if (set == null) { // Add an entry for block pool if it does not exist already - m = new LightWeightResizableGSet(); - map.put(bpid, m); + set = new FoldedTreeSet<>(); + map.put(bpid, set); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java index 5bcd719b70..94749e2d5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java @@ -52,12 +52,16 @@ public class BlockReportContext { */ private final long leaseId; + private final boolean sorted; + public BlockReportContext(int totalRpcs, int curRpc, - long reportId, long leaseId) { + long reportId, long leaseId, + boolean sorted) { this.totalRpcs = totalRpcs; this.curRpc = curRpc; this.reportId = reportId; this.leaseId = leaseId; + this.sorted = sorted; } public int getTotalRpcs() { @@ -75,4 +79,8 @@ public long getReportId() { public long getLeaseId() { return leaseId; } + + public boolean isSorted() { + return sorted; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index add4e736ed..b962855795 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -131,7 +131,6 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, * Each finalized block is represented as 3 longs. Each under- * construction replica is represented as 4 longs. * This is done instead of Block[] to reduce memory used by block reports. - * @param reports report of blocks per storage * @param context Context information for this block report. * * @return - the next command for DN to process. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java new file mode 100644 index 0000000000..1c6be1d629 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java @@ -0,0 +1,1285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import org.apache.hadoop.util.Time; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.SortedSet; + +/** + * A memory efficient implementation of RBTree. Instead of having a Node for + * each entry each node contains an array holding 64 entries. + * + * Based on the Apache Harmony folded TreeMap. + * + * @param Entry type + */ +public class FoldedTreeSet implements SortedSet { + + private static final boolean RED = true; + private static final boolean BLACK = false; + + private final Comparator comparator; + private Node root; + private int size; + private int nodeCount; + private int modCount; + private Node cachedNode; + + /** + * Internal tree node that holds a sorted array of entries. + * + * @param type of the elements + */ + private static class Node { + + private static final int NODE_SIZE = 64; + + // Tree structure + private Node parent, left, right; + private boolean color; + private final E[] entries; + private int leftIndex = 0, rightIndex = -1; + private int size = 0; + // List for fast ordered iteration + private Node prev, next; + + @SuppressWarnings("unchecked") + public Node() { + entries = (E[]) new Object[NODE_SIZE]; + } + + public boolean isRed() { + return color == RED; + } + + public boolean isBlack() { + return color == BLACK; + } + + public Node getLeftMostNode() { + Node node = this; + while (node.left != null) { + node = node.left; + } + return node; + } + + public Node getRightMostNode() { + Node node = this; + while (node.right != null) { + node = node.right; + } + return node; + } + + public void addEntryLeft(E entry) { + assert rightIndex < entries.length; + assert !isFull(); + + if (leftIndex == 0) { + rightIndex++; + // Shift entries right/up + System.arraycopy(entries, 0, entries, 1, size); + } else { + leftIndex--; + } + size++; + entries[leftIndex] = entry; + } + + public void addEntryRight(E entry) { + assert !isFull(); + + if (rightIndex == NODE_SIZE - 1) { + assert leftIndex > 0; + // Shift entries left/down + System.arraycopy(entries, leftIndex, entries, --leftIndex, size); + } else { + rightIndex++; + } + size++; + entries[rightIndex] = entry; + } + + public void addEntryAt(E entry, int index) { + assert !isFull(); + + if (leftIndex == 0 || ((rightIndex != Node.NODE_SIZE - 1) + && (rightIndex - index <= index - leftIndex))) { + rightIndex++; + System.arraycopy(entries, index, + entries, index + 1, rightIndex - index); + entries[index] = entry; + } else { + int newLeftIndex = leftIndex - 1; + System.arraycopy(entries, leftIndex, + entries, newLeftIndex, index - leftIndex); + leftIndex = newLeftIndex; + entries[index - 1] = entry; + } + size++; + } + + public void addEntriesLeft(Node from) { + leftIndex -= from.size; + size += from.size; + System.arraycopy(from.entries, from.leftIndex, + entries, leftIndex, from.size); + } + + public void addEntriesRight(Node from) { + System.arraycopy(from.entries, from.leftIndex, + entries, rightIndex + 1, from.size); + size += from.size; + rightIndex += from.size; + } + + public E insertEntrySlideLeft(E entry, int index) { + E pushedEntry = entries[0]; + System.arraycopy(entries, 1, entries, 0, index - 1); + entries[index - 1] = entry; + return pushedEntry; + } + + 
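+  /**
+   * Insert entry at index and slide the occupied window one step to the
+   * right: entries[index..rightIndex-1] move up, and the entry previously
+   * stored at rightIndex is pushed out and returned so the caller can place
+   * it in an adjacent node (see addElementInNode). size and rightIndex are
+   * intentionally left unchanged here.
+   *
+   * Example: with the window holding [10, 20, 30, 40], inserting 15 at the
+   * slot of 20 leaves [10, 15, 20, 30] and returns 40.
+   */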
public E insertEntrySlideRight(E entry, int index) { + E movedEntry = entries[rightIndex]; + System.arraycopy(entries, index, entries, index + 1, rightIndex - index); + entries[index] = entry; + return movedEntry; + } + + public E removeEntryLeft() { + assert !isEmpty(); + E entry = entries[leftIndex]; + entries[leftIndex] = null; + leftIndex++; + size--; + return entry; + } + + public E removeEntryRight() { + assert !isEmpty(); + E entry = entries[rightIndex]; + entries[rightIndex] = null; + rightIndex--; + size--; + return entry; + } + + public E removeEntryAt(int index) { + assert !isEmpty(); + + E entry = entries[index]; + int rightSize = rightIndex - index; + int leftSize = index - leftIndex; + if (rightSize <= leftSize) { + System.arraycopy(entries, index + 1, entries, index, rightSize); + entries[rightIndex] = null; + rightIndex--; + } else { + System.arraycopy(entries, leftIndex, entries, leftIndex + 1, leftSize); + entries[leftIndex] = null; + leftIndex++; + } + size--; + return entry; + } + + public boolean isFull() { + return size == NODE_SIZE; + } + + public boolean isEmpty() { + return size == 0; + } + + public void clear() { + if (leftIndex < rightIndex) { + Arrays.fill(entries, leftIndex, rightIndex + 1, null); + } + size = 0; + leftIndex = 0; + rightIndex = -1; + prev = null; + next = null; + parent = null; + left = null; + right = null; + color = BLACK; + } + } + + private static final class TreeSetIterator implements Iterator { + + private final FoldedTreeSet tree; + private int iteratorModCount; + private Node node; + private int index; + private E lastEntry; + private int lastIndex; + private Node lastNode; + + private TreeSetIterator(FoldedTreeSet tree) { + this.tree = tree; + this.iteratorModCount = tree.modCount; + if (!tree.isEmpty()) { + this.node = tree.root.getLeftMostNode(); + this.index = this.node.leftIndex; + } + } + + @Override + public boolean hasNext() { + checkForModification(); + return node != null; + } + + @Override + public E next() { + if (hasNext()) { + lastEntry = node.entries[index]; + lastIndex = index; + lastNode = node; + if (++index > node.rightIndex) { + node = node.next; + if (node != null) { + index = node.leftIndex; + } + } + return lastEntry; + } else { + throw new NoSuchElementException("Iterator exhausted"); + } + } + + @Override + public void remove() { + if (lastEntry == null) { + throw new IllegalStateException("No current element"); + } + checkForModification(); + if (lastNode.size == 1) { + // Safe to remove lastNode, the iterator is on the next node + tree.deleteNode(lastNode); + } else if (lastNode.leftIndex == lastIndex) { + // Safe to remove leftmost entry, the iterator is on the next index + lastNode.removeEntryLeft(); + } else if (lastNode.rightIndex == lastIndex) { + // Safe to remove the rightmost entry, the iterator is on the next node + lastNode.removeEntryRight(); + } else { + // Remove entry in the middle of the array + assert node == lastNode; + int oldRIndex = lastNode.rightIndex; + lastNode.removeEntryAt(lastIndex); + if (oldRIndex > lastNode.rightIndex) { + // Entries moved to the left in the array so index must be reset + index = lastIndex; + } + } + lastEntry = null; + iteratorModCount++; + tree.modCount++; + tree.size--; + } + + private void checkForModification() { + if (iteratorModCount != tree.modCount) { + throw new ConcurrentModificationException("Tree has been modified " + + "outside of iterator"); + } + } + } + + /** + * Create a new TreeSet that uses the natural ordering of objects. 
The element + * type must implement Comparable. + */ + public FoldedTreeSet() { + this(null); + } + + /** + * Create a new TreeSet that orders the elements using the supplied + * Comparator. + * + * @param comparator Comparator able to compare elements of type E + */ + public FoldedTreeSet(Comparator comparator) { + this.comparator = comparator; + } + + private Node cachedOrNewNode(E entry) { + Node node = (cachedNode != null) ? cachedNode : new Node(); + cachedNode = null; + nodeCount++; + // Since BlockIDs are always increasing for new blocks it is best to + // add values on the left side to enable quicker inserts on the right + node.addEntryLeft(entry); + return node; + } + + private void cacheAndClear(Node node) { + if (cachedNode == null) { + node.clear(); + cachedNode = node; + } + } + + @Override + public Comparator comparator() { + return comparator; + } + + @Override + public SortedSet subSet(E fromElement, E toElement) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public SortedSet headSet(E toElement) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public SortedSet tailSet(E fromElement) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public E first() { + if (!isEmpty()) { + Node node = root.getLeftMostNode(); + return node.entries[node.leftIndex]; + } + return null; + } + + @Override + public E last() { + if (!isEmpty()) { + Node node = root.getRightMostNode(); + return node.entries[node.rightIndex]; + } + return null; + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return root == null; + } + + /** + * Lookup and return a stored object using a user provided comparator. + * + * @param obj Lookup key + * @param cmp User provided Comparator. The comparator should expect that the + * proved obj will always be the first method parameter and any + * stored object will be the second parameter. + * + * @return A matching stored object or null if non is found + */ + public E get(Object obj, Comparator cmp) { + Objects.requireNonNull(obj); + + Node node = root; + while (node != null) { + E[] entries = node.entries; + + int leftIndex = node.leftIndex; + int result = compare(obj, entries[leftIndex], cmp); + if (result < 0) { + node = node.left; + } else if (result == 0) { + return entries[leftIndex]; + } else { + int rightIndex = node.rightIndex; + if (leftIndex != rightIndex) { + result = compare(obj, entries[rightIndex], cmp); + } + if (result == 0) { + return entries[rightIndex]; + } else if (result > 0) { + node = node.right; + } else { + int low = leftIndex + 1; + int high = rightIndex - 1; + while (low <= high) { + int mid = (low + high) >>> 1; + result = compare(obj, entries[mid], cmp); + if (result > 0) { + low = mid + 1; + } else if (result < 0) { + high = mid - 1; + } else { + return entries[mid]; + } + } + return null; + } + } + } + return null; + } + + /** + * Lookup and return a stored object. + * + * @param entry Lookup entry + * + * @return A matching stored object or null if non is found + */ + public E get(E entry) { + return get(entry, comparator); + } + + @Override + @SuppressWarnings("unchecked") + public boolean contains(Object obj) { + return get((E) obj) != null; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static int compare(Object lookup, Object stored, Comparator cmp) { + return cmp != null + ? 
cmp.compare(lookup, stored) + : ((Comparable) lookup).compareTo(stored); + } + + @Override + public Iterator iterator() { + return new TreeSetIterator<>(this); + } + + @Override + public Object[] toArray() { + Object[] objects = new Object[size]; + if (!isEmpty()) { + int pos = 0; + for (Node node = root.getLeftMostNode(); node != null; + pos += node.size, node = node.next) { + System.arraycopy(node.entries, node.leftIndex, objects, pos, node.size); + } + } + return objects; + } + + @Override + @SuppressWarnings("unchecked") + public T[] toArray(T[] a) { + T[] r = a.length >= size ? a + : (T[]) java.lang.reflect.Array + .newInstance(a.getClass().getComponentType(), size); + if (!isEmpty()) { + Node node = root.getLeftMostNode(); + int pos = 0; + while (node != null) { + System.arraycopy(node.entries, node.leftIndex, r, pos, node.size); + pos += node.size; + node = node.next; + } + if (r.length > pos) { + r[pos] = null; + } + } else if (a.length > 0) { + a[0] = null; + } + return r; + } + + /** + * Add or replace an entry in the TreeSet. + * + * @param entry Entry to add or replace/update. + * + * @return the previous entry, or null if this set did not already contain the + * specified entry + */ + public E addOrReplace(E entry) { + return add(entry, true); + } + + @Override + public boolean add(E entry) { + return add(entry, false) == null; + } + + /** + * Internal add method to add a entry to the set. + * + * @param entry Entry to add + * @param replace Should the entry replace an old entry which is equal to the + * new entry + * + * @return null if entry added and didn't exist or the previous value (which + * might not have been overwritten depending on the replace parameter) + */ + private E add(E entry, boolean replace) { + Objects.requireNonNull(entry); + + // Empty tree + if (isEmpty()) { + root = cachedOrNewNode(entry); + size = 1; + modCount++; + return null; + } + + // Compare right entry first since inserts of comperatively larger entries + // is more likely to be inserted. BlockID is always increasing in HDFS. 
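+    // Probe order inside each node: the rightmost entry first, then the
+    // leftmost entry, then a binary search over the interior of the array.
+    // prevNode tracks the last node visited so that, when the search walks
+    // off the tree, the entry can be added to that node, to a neighbouring
+    // node with free space, or to a newly attached node.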
+ Node node = root; + Node prevNode = null; + int result = 0; + while (node != null) { + prevNode = node; + E[] entries = node.entries; + int rightIndex = node.rightIndex; + result = compare(entry, entries[rightIndex], comparator); + if (result > 0) { + node = node.right; + } else if (result == 0) { + E prevEntry = entries[rightIndex]; + if (replace) { + entries[rightIndex] = entry; + } + return prevEntry; + } else { + int leftIndex = node.leftIndex; + if (leftIndex != rightIndex) { + result = compare(entry, entries[leftIndex], comparator); + } + if (result < 0) { + node = node.left; + } else if (result == 0) { + E prevEntry = entries[leftIndex]; + if (replace) { + entries[leftIndex] = entry; + } + return prevEntry; + } else { + // Insert in this node + int low = leftIndex + 1, high = rightIndex - 1; + while (low <= high) { + int mid = (low + high) >>> 1; + result = compare(entry, entries[mid], comparator); + if (result > 0) { + low = mid + 1; + } else if (result == 0) { + E prevEntry = entries[mid]; + if (replace) { + entries[mid] = entry; + } + return prevEntry; + } else { + high = mid - 1; + } + } + addElementInNode(node, entry, low); + return null; + } + } + } + + assert prevNode != null; + size++; + modCount++; + if (!prevNode.isFull()) { + // The previous node still has space + if (result < 0) { + prevNode.addEntryLeft(entry); + } else { + prevNode.addEntryRight(entry); + } + } else if (result < 0) { + // The previous node is full, add to adjencent node or a new node + if (prevNode.prev != null && !prevNode.prev.isFull()) { + prevNode.prev.addEntryRight(entry); + } else { + attachNodeLeft(prevNode, cachedOrNewNode(entry)); + } + } else if (prevNode.next != null && !prevNode.next.isFull()) { + prevNode.next.addEntryLeft(entry); + } else { + attachNodeRight(prevNode, cachedOrNewNode(entry)); + } + return null; + } + + /** + * Insert an entry last in the sorted tree. The entry must be the considered + * larger than the currently largest entry in the set when doing + * current.compareTo(entry), if entry is not the largest entry the method will + * fall back on the regular add method. 
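+   * When the entry really is the largest, it is appended to the rightmost
+   * node (or to a new node attached to its right), so entries arriving in
+   * ascending order avoid the tree search entirely.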
+ * + * @param entry entry to add + * + * @return True if added, false if already existed in the set + */ + public boolean addSortedLast(E entry) { + + if (isEmpty()) { + root = cachedOrNewNode(entry); + size = 1; + modCount++; + return true; + } else { + Node node = root.getRightMostNode(); + if (compare(node.entries[node.rightIndex], entry, comparator) < 0) { + size++; + modCount++; + if (!node.isFull()) { + node.addEntryRight(entry); + } else { + attachNodeRight(node, cachedOrNewNode(entry)); + } + return true; + } + } + + // Fallback on normal add if entry is unsorted + return add(entry); + } + + private void addElementInNode(Node node, E entry, int index) { + size++; + modCount++; + + if (!node.isFull()) { + node.addEntryAt(entry, index); + } else { + // Node is full, insert and push old entry + Node prev = node.prev; + Node next = node.next; + if (prev == null) { + // First check if we have space in the the next node + if (next != null && !next.isFull()) { + E movedEntry = node.insertEntrySlideRight(entry, index); + next.addEntryLeft(movedEntry); + } else { + // Since prev is null the left child must be null + assert node.left == null; + E movedEntry = node.insertEntrySlideLeft(entry, index); + Node newNode = cachedOrNewNode(movedEntry); + attachNodeLeft(node, newNode); + } + } else if (!prev.isFull()) { + // Prev has space + E movedEntry = node.insertEntrySlideLeft(entry, index); + prev.addEntryRight(movedEntry); + } else if (next == null) { + // Since next is null the right child must be null + assert node.right == null; + E movedEntry = node.insertEntrySlideRight(entry, index); + Node newNode = cachedOrNewNode(movedEntry); + attachNodeRight(node, newNode); + } else if (!next.isFull()) { + // Next has space + E movedEntry = node.insertEntrySlideRight(entry, index); + next.addEntryLeft(movedEntry); + } else { + // Both prev and next nodes exist and are full + E movedEntry = node.insertEntrySlideRight(entry, index); + Node newNode = cachedOrNewNode(movedEntry); + if (node.right == null) { + attachNodeRight(node, newNode); + } else { + // Since our right node exist, + // the left node of our next node must be empty + assert next.left == null; + attachNodeLeft(next, newNode); + } + } + } + } + + private void attachNodeLeft(Node node, Node newNode) { + newNode.parent = node; + node.left = newNode; + + newNode.next = node; + newNode.prev = node.prev; + if (newNode.prev != null) { + newNode.prev.next = newNode; + } + node.prev = newNode; + balanceInsert(newNode); + } + + private void attachNodeRight(Node node, Node newNode) { + newNode.parent = node; + node.right = newNode; + + newNode.prev = node; + newNode.next = node.next; + if (newNode.next != null) { + newNode.next.prev = newNode; + } + node.next = newNode; + balanceInsert(newNode); + } + + /** + * Balance the RB Tree after insert. 
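+   * While the parent and uncle of the inserted node are both red, the
+   * colours are flipped and the violation moves up to the grandparent;
+   * otherwise one or two rotations around the grandparent restore the
+   * red-black invariants.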
+ * + * @param node Added node + */ + private void balanceInsert(Node node) { + node.color = RED; + + while (node != root && node.parent.isRed()) { + if (node.parent == node.parent.parent.left) { + Node uncle = node.parent.parent.right; + if (uncle != null && uncle.isRed()) { + node.parent.color = BLACK; + uncle.color = BLACK; + node.parent.parent.color = RED; + node = node.parent.parent; + } else { + if (node == node.parent.right) { + node = node.parent; + rotateLeft(node); + } + node.parent.color = BLACK; + node.parent.parent.color = RED; + rotateRight(node.parent.parent); + } + } else { + Node uncle = node.parent.parent.left; + if (uncle != null && uncle.isRed()) { + node.parent.color = BLACK; + uncle.color = BLACK; + node.parent.parent.color = RED; + node = node.parent.parent; + } else { + if (node == node.parent.left) { + node = node.parent; + rotateRight(node); + } + node.parent.color = BLACK; + node.parent.parent.color = RED; + rotateLeft(node.parent.parent); + } + } + } + root.color = BLACK; + } + + private void rotateRight(Node node) { + Node pivot = node.left; + node.left = pivot.right; + if (pivot.right != null) { + pivot.right.parent = node; + } + pivot.parent = node.parent; + if (node.parent == null) { + root = pivot; + } else if (node == node.parent.right) { + node.parent.right = pivot; + } else { + node.parent.left = pivot; + } + pivot.right = node; + node.parent = pivot; + } + + private void rotateLeft(Node node) { + Node pivot = node.right; + node.right = pivot.left; + if (pivot.left != null) { + pivot.left.parent = node; + } + pivot.parent = node.parent; + if (node.parent == null) { + root = pivot; + } else if (node == node.parent.left) { + node.parent.left = pivot; + } else { + node.parent.right = pivot; + } + pivot.left = node; + node.parent = pivot; + } + + /** + * Remove object using a provided comparator, and return the removed entry. + * + * @param obj Lookup entry + * @param cmp User provided Comparator. The comparator should expect that the + * proved obj will always be the first method parameter and any + * stored object will be the second parameter. + * + * @return The removed entry or null if not found + */ + public E removeAndGet(Object obj, Comparator cmp) { + Objects.requireNonNull(obj); + + if (!isEmpty()) { + Node node = root; + while (node != null) { + E[] entries = node.entries; + int leftIndex = node.leftIndex; + int result = compare(obj, entries[leftIndex], cmp); + if (result < 0) { + node = node.left; + } else if (result == 0) { + return removeElementLeft(node); + } else { + int rightIndex = node.rightIndex; + if (leftIndex != rightIndex) { + result = compare(obj, entries[rightIndex], cmp); + } + if (result == 0) { + return removeElementRight(node); + } else if (result > 0) { + node = node.right; + } else { + int low = leftIndex + 1, high = rightIndex - 1; + while (low <= high) { + int mid = (low + high) >>> 1; + result = compare(obj, entries[mid], cmp); + if (result > 0) { + low = mid + 1; + } else if (result == 0) { + return removeElementAt(node, mid); + } else { + high = mid - 1; + } + } + return null; + } + } + } + } + return null; + } + + /** + * Remove object and return the removed entry. + * + * @param obj Lookup entry + * + * @return The removed entry or null if not found + */ + public E removeAndGet(Object obj) { + return removeAndGet(obj, comparator); + } + + /** + * Remove object using a provided comparator. + * + * @param obj Lookup entry + * @param cmp User provided Comparator. 
The comparator should expect that the + * proved obj will always be the first method parameter and any + * stored object will be the second parameter. + * + * @return True if found and removed, else false + */ + public boolean remove(Object obj, Comparator cmp) { + return removeAndGet(obj, cmp) != null; + } + + @Override + public boolean remove(Object obj) { + return removeAndGet(obj, comparator) != null; + } + + private E removeElementLeft(Node node) { + modCount++; + size--; + E entry = node.removeEntryLeft(); + + if (node.isEmpty()) { + deleteNode(node); + } else if (node.prev != null + && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) { + // Remaining entries fit in the prev node, move them and delete this node + node.prev.addEntriesRight(node); + deleteNode(node); + } else if (node.next != null && node.next.leftIndex >= node.size) { + // Remaining entries fit in the next node, move them and delete this node + node.next.addEntriesLeft(node); + deleteNode(node); + } else if (node.prev != null && node.prev.size < node.leftIndex) { + // Entries in prev node will fit in this node, move them and delete prev + node.addEntriesLeft(node.prev); + deleteNode(node.prev); + } + + return entry; + } + + private E removeElementRight(Node node) { + modCount++; + size--; + E entry = node.removeEntryRight(); + + if (node.isEmpty()) { + deleteNode(node); + } else if (node.prev != null + && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) { + // Remaining entries fit in the prev node, move them and delete this node + node.prev.addEntriesRight(node); + deleteNode(node); + } else if (node.next != null && node.next.leftIndex >= node.size) { + // Remaining entries fit in the next node, move them and delete this node + node.next.addEntriesLeft(node); + deleteNode(node); + } else if (node.next != null + && node.next.size < (Node.NODE_SIZE - 1 - node.rightIndex)) { + // Entries in next node will fit in this node, move them and delete next + node.addEntriesRight(node.next); + deleteNode(node.next); + } + + return entry; + } + + private E removeElementAt(Node node, int index) { + modCount++; + size--; + E entry = node.removeEntryAt(index); + + if (node.prev != null + && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) { + // Remaining entries fit in the prev node, move them and delete this node + node.prev.addEntriesRight(node); + deleteNode(node); + } else if (node.next != null && (node.next.leftIndex) >= node.size) { + // Remaining entries fit in the next node, move them and delete this node + node.next.addEntriesLeft(node); + deleteNode(node); + } else if (node.prev != null && node.prev.size < node.leftIndex) { + // Entries in prev node will fit in this node, move them and delete prev + node.addEntriesLeft(node.prev); + deleteNode(node.prev); + } else if (node.next != null + && node.next.size < (Node.NODE_SIZE - 1 - node.rightIndex)) { + // Entries in next node will fit in this node, move them and delete next + node.addEntriesRight(node.next); + deleteNode(node.next); + } + + return entry; + } + + /** + * Delete the node and ensure the tree is balanced. 
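+   * When the node has two children, its successor in the node list,
+   * node.next (which has no left child), is spliced out and moved up in its
+   * place; the deleted node is then unlinked from the prev/next chain.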
+ * + * @param node node to delete + */ + private void deleteNode(final Node node) { + if (node.right == null) { + if (node.left != null) { + attachToParent(node, node.left); + } else { + attachNullToParent(node); + } + } else if (node.left == null) { + attachToParent(node, node.right); + } else { + // node.left != null && node.right != null + // node.next should replace node in tree + // node.next != null guaranteed since node.left != null + // node.next.left == null since node.next.prev is node + // node.next.right may be null or non-null + Node toMoveUp = node.next; + if (toMoveUp.right == null) { + attachNullToParent(toMoveUp); + } else { + attachToParent(toMoveUp, toMoveUp.right); + } + toMoveUp.left = node.left; + if (toMoveUp.left != null) { + toMoveUp.left.parent = toMoveUp; + } + toMoveUp.right = node.right; + if (toMoveUp.right != null) { + toMoveUp.right.parent = toMoveUp; + } + attachToParentNoBalance(node, toMoveUp); + toMoveUp.color = node.color; + } + + // Remove node from ordered list of nodes + if (node.prev != null) { + node.prev.next = node.next; + } + if (node.next != null) { + node.next.prev = node.prev; + } + + nodeCount--; + cacheAndClear(node); + } + + private void attachToParentNoBalance(Node toDelete, Node toConnect) { + Node parent = toDelete.parent; + toConnect.parent = parent; + if (parent == null) { + root = toConnect; + } else if (toDelete == parent.left) { + parent.left = toConnect; + } else { + parent.right = toConnect; + } + } + + private void attachToParent(Node toDelete, Node toConnect) { + attachToParentNoBalance(toDelete, toConnect); + if (toDelete.isBlack()) { + balanceDelete(toConnect); + } + } + + private void attachNullToParent(Node toDelete) { + Node parent = toDelete.parent; + if (parent == null) { + root = null; + } else { + if (toDelete == parent.left) { + parent.left = null; + } else { + parent.right = null; + } + if (toDelete.isBlack()) { + balanceDelete(parent); + } + } + } + + /** + * Balance tree after removing a node. 
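+   * Standard red-black delete fixup; sibling references are checked for
+   * null before use.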
+ * + * @param node Node to balance after deleting another node + */ + private void balanceDelete(Node node) { + while (node != root && node.isBlack()) { + if (node == node.parent.left) { + Node sibling = node.parent.right; + if (sibling == null) { + node = node.parent; + continue; + } + if (sibling.isRed()) { + sibling.color = BLACK; + node.parent.color = RED; + rotateLeft(node.parent); + sibling = node.parent.right; + if (sibling == null) { + node = node.parent; + continue; + } + } + if ((sibling.left == null || !sibling.left.isRed()) + && (sibling.right == null || !sibling.right.isRed())) { + sibling.color = RED; + node = node.parent; + } else { + if (sibling.right == null || !sibling.right.isRed()) { + sibling.left.color = BLACK; + sibling.color = RED; + rotateRight(sibling); + sibling = node.parent.right; + } + sibling.color = node.parent.color; + node.parent.color = BLACK; + sibling.right.color = BLACK; + rotateLeft(node.parent); + node = root; + } + } else { + Node sibling = node.parent.left; + if (sibling == null) { + node = node.parent; + continue; + } + if (sibling.isRed()) { + sibling.color = BLACK; + node.parent.color = RED; + rotateRight(node.parent); + sibling = node.parent.left; + if (sibling == null) { + node = node.parent; + continue; + } + } + if ((sibling.left == null || sibling.left.isBlack()) + && (sibling.right == null || sibling.right.isBlack())) { + sibling.color = RED; + node = node.parent; + } else { + if (sibling.left == null || sibling.left.isBlack()) { + sibling.right.color = BLACK; + sibling.color = RED; + rotateLeft(sibling); + sibling = node.parent.left; + } + sibling.color = node.parent.color; + node.parent.color = BLACK; + sibling.left.color = BLACK; + rotateRight(node.parent); + node = root; + } + } + } + node.color = BLACK; + } + + @Override + public boolean containsAll(Collection c) { + for (Object entry : c) { + if (!contains(entry)) { + return false; + } + } + return true; + } + + @Override + public boolean addAll(Collection c) { + boolean modified = false; + for (E entry : c) { + if (add(entry)) { + modified = true; + } + } + return modified; + } + + @Override + public boolean retainAll(Collection c) { + boolean modified = false; + Iterator it = iterator(); + while (it.hasNext()) { + if (!c.contains(it.next())) { + it.remove(); + modified = true; + } + } + return modified; + } + + @Override + public boolean removeAll(Collection c) { + boolean modified = false; + for (Object entry : c) { + if (remove(entry)) { + modified = true; + } + } + return modified; + } + + @Override + public void clear() { + modCount++; + if (!isEmpty()) { + size = 0; + nodeCount = 0; + cacheAndClear(root); + root = null; + } + } + + /** + * Returns the current size divided by the capacity of the tree. A value + * between 0.0 and 1.0, where 1.0 means that every allocated node in the tree + * is completely full. + * + * An empty set will return 1.0 + * + * @return the fill ratio of the tree + */ + public double fillRatio() { + if (nodeCount > 1) { + // Count the last node as completely full since it can't be compacted + return (size + (Node.NODE_SIZE - root.getRightMostNode().size)) + / (double) (nodeCount * Node.NODE_SIZE); + } + return 1.0; + } + + /** + * Compact all the entries to use the fewest number of nodes in the tree. + * + * Having a compact tree minimize memory usage, but can cause inserts to get + * slower due to new nodes needs to be allocated as there is no space in any + * of the existing nodes anymore for entries added in the middle of the set. 
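+   * Compaction walks the node list from the left, moving entries into the
+   * preceding node whenever it has room, left-aligning the entries that stay
+   * behind, and deleting nodes that become empty, until either the whole
+   * list has been visited or the timeout expires.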
+ * + * Useful to do to reduce memory consumption and if the tree is know to not + * change after compaction or mainly added to at either extreme. + * + * @param timeout Maximum time to spend compacting the tree set in + * milliseconds. + * + * @return true if compaction completed, false if aborted + */ + public boolean compact(long timeout) { + + if (!isEmpty()) { + long start = Time.monotonicNow(); + Node node = root.getLeftMostNode(); + while (node != null) { + if (node.prev != null && !node.prev.isFull()) { + Node prev = node.prev; + int count = Math.min(Node.NODE_SIZE - prev.size, node.size); + System.arraycopy(node.entries, node.leftIndex, + prev.entries, prev.rightIndex + 1, count); + node.leftIndex += count; + node.size -= count; + prev.rightIndex += count; + prev.size += count; + } + if (node.isEmpty()) { + Node temp = node.next; + deleteNode(node); + node = temp; + continue; + } else if (!node.isFull()) { + if (node.leftIndex != 0) { + System.arraycopy(node.entries, node.leftIndex, + node.entries, 0, node.size); + Arrays.fill(node.entries, node.size, node.rightIndex + 1, null); + node.leftIndex = 0; + node.rightIndex = node.size - 1; + } + } + node = node.next; + + if (Time.monotonicNow() - start > timeout) { + return false; + } + } + } + + return true; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 05a68306f1..02d5b81d69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -261,6 +261,9 @@ message BlockReportContextProto { // The block report lease ID, or 0 if we are sending without a lease to // bypass rate-limiting. 
optional uint64 leaseId = 4 [ default = 0 ]; + + // True if the reported blocks are sorted by increasing block IDs + optional bool sorted = 5 [default = false]; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java index bf293732ef..2cc1f7d80b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java @@ -228,7 +228,7 @@ public BlockReportResponseProto answer(InvocationOnMock invocation) { request.set(null); nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask()); nn.blockReport(reg, "pool", sbr, - new BlockReportContext(1, 0, System.nanoTime(), 0L)); + new BlockReportContext(1, 0, System.nanoTime(), 0L, true)); BlockReportRequestProto proto = request.get(); assertNotNull(proto); assertTrue(proto.getReports(0).getBlocksList().isEmpty()); @@ -238,7 +238,7 @@ public BlockReportResponseProto answer(InvocationOnMock invocation) { request.set(null); nsInfo.setCapabilities(Capability.UNKNOWN.getMask()); nn.blockReport(reg, "pool", sbr, - new BlockReportContext(1, 0, System.nanoTime(), 0L)); + new BlockReportContext(1, 0, System.nanoTime(), 0L, true)); proto = request.get(); assertNotNull(proto); assertFalse(proto.getReports(0).getBlocksList().isEmpty()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java index d6213ff239..4e7cf3d6da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java @@ -19,19 +19,11 @@ import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID; import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult; -import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.junit.Assert; import org.junit.Test; @@ -91,84 +83,4 @@ public void testReplaceStorage() throws Exception { Assert.assertThat(added, is(false)); Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2)); } - - @Test - public void testBlockListMoveToHead() throws Exception { - LOG.info("BlockInfo moveToHead tests..."); - - final int MAX_BLOCKS = 10; - - DatanodeStorageInfo dd = DFSTestUtil.createDatanodeStorageInfo("s1", "1.1.1.1"); - ArrayList blockList = new ArrayList(MAX_BLOCKS); - ArrayList blockInfoList = new ArrayList(); - int headIndex; - int curIndex; - - LOG.info("Building block list..."); - for (int i = 0; i < MAX_BLOCKS; i++) { - blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP)); - blockInfoList.add(new BlockInfoContiguous(blockList.get(i), 
(short) 3)); - dd.addBlock(blockInfoList.get(i)); - - // index of the datanode should be 0 - assertEquals("Find datanode should be 0", 0, blockInfoList.get(i) - .findStorageInfo(dd)); - } - - // list length should be equal to the number of blocks we inserted - LOG.info("Checking list length..."); - assertEquals("Length should be MAX_BLOCK", MAX_BLOCKS, dd.numBlocks()); - Iterator it = dd.getBlockIterator(); - int len = 0; - while (it.hasNext()) { - it.next(); - len++; - } - assertEquals("There should be MAX_BLOCK blockInfo's", MAX_BLOCKS, len); - - headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd); - - LOG.info("Moving each block to the head of the list..."); - for (int i = 0; i < MAX_BLOCKS; i++) { - curIndex = blockInfoList.get(i).findStorageInfo(dd); - headIndex = dd.moveBlockToHead(blockInfoList.get(i), curIndex, headIndex); - // the moved element must be at the head of the list - assertEquals("Block should be at the head of the list now.", - blockInfoList.get(i), dd.getBlockListHeadForTesting()); - } - - // move head of the list to the head - this should not change the list - LOG.info("Moving head to the head..."); - - BlockInfo temp = dd.getBlockListHeadForTesting(); - curIndex = 0; - headIndex = 0; - dd.moveBlockToHead(temp, curIndex, headIndex); - assertEquals( - "Moving head to the head of the list shopuld not change the list", - temp, dd.getBlockListHeadForTesting()); - - // check all elements of the list against the original blockInfoList - LOG.info("Checking elements of the list..."); - temp = dd.getBlockListHeadForTesting(); - assertNotNull("Head should not be null", temp); - int c = MAX_BLOCKS - 1; - while (temp != null) { - assertEquals("Expected element is not on the list", - blockInfoList.get(c--), temp); - temp = temp.getNext(0); - } - - LOG.info("Moving random blocks to the head of the list..."); - headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd); - Random rand = new Random(); - for (int i = 0; i < MAX_BLOCKS; i++) { - int j = rand.nextInt(MAX_BLOCKS); - curIndex = blockInfoList.get(j).findStorageInfo(dd); - headIndex = dd.moveBlockToHead(blockInfoList.get(j), curIndex, headIndex); - // the moved element must be at the head of the list - assertEquals("Block should be at the head of the list now.", - blockInfoList.get(j), dd.getBlockListHeadForTesting()); - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 0e4e167da2..a970d77c25 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; import java.util.LinkedList; import java.util.List; @@ -75,6 +76,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeId; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.TestINodeFile; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; @@ -806,7 
+808,8 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception { // Make sure it's the first full report assertEquals(0, ds.getBlockReportCount()); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - builder.build(), null, false); + builder.build(), + new BlockReportContext(1, 0, System.nanoTime(), 0, true), false); assertEquals(1, ds.getBlockReportCount()); // verify the storage info is correct @@ -821,6 +824,70 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception { (ds) >= 0); } + @Test + public void testFullBR() throws Exception { + doReturn(true).when(fsn).isRunning(); + + DatanodeDescriptor node = nodes.get(0); + DatanodeStorageInfo ds = node.getStorageInfos()[0]; + node.setAlive(true); + DatanodeRegistration nodeReg = new DatanodeRegistration(node, null, null, ""); + + // register new node + bm.getDatanodeManager().registerDatanode(nodeReg); + bm.getDatanodeManager().addDatanode(node); + assertEquals(node, bm.getDatanodeManager().getDatanode(node)); + assertEquals(0, ds.getBlockReportCount()); + + ArrayList blocks = new ArrayList<>(); + for (int id = 24; id > 0; id--) { + blocks.add(addBlockToBM(id)); + } + + // Make sure it's the first full report + assertEquals(0, ds.getBlockReportCount()); + bm.processReport(node, new DatanodeStorage(ds.getStorageID()), + generateReport(blocks), + new BlockReportContext(1, 0, System.nanoTime(), 0, false), + false); + assertEquals(1, ds.getBlockReportCount()); + // verify the storage info is correct + for (BlockInfo block : blocks) { + assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0); + } + + // Send unsorted report + bm.processReport(node, new DatanodeStorage(ds.getStorageID()), + generateReport(blocks), + new BlockReportContext(1, 0, System.nanoTime(), 0, false), + false); + assertEquals(2, ds.getBlockReportCount()); + // verify the storage info is correct + for (BlockInfo block : blocks) { + assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0); + } + + // Sort list and send a sorted report + Collections.sort(blocks); + bm.processReport(node, new DatanodeStorage(ds.getStorageID()), + generateReport(blocks), + new BlockReportContext(1, 0, System.nanoTime(), 0, true), + false); + assertEquals(3, ds.getBlockReportCount()); + // verify the storage info is correct + for (BlockInfo block : blocks) { + assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0); + } + } + + private BlockListAsLongs generateReport(List blocks) { + BlockListAsLongs.Builder builder = BlockListAsLongs.builder(); + for (BlockInfo block : blocks) { + builder.add(new FinalizedReplica(block, null, null)); + } + return builder.build(); + } + private BlockInfo addBlockToBM(long blkId) { Block block = new Block(blkId); BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3); @@ -1061,4 +1128,4 @@ public void run() { cluster.shutdown(); } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index c843938e2b..f4e88b7d50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import 
java.util.concurrent.ConcurrentHashMap; import javax.management.NotCompliantMBeanException; @@ -566,7 +567,7 @@ public synchronized void injectBlocks(String bpid, } Map map = blockMap.get(bpid); if (map == null) { - map = new HashMap(); + map = new TreeMap<>(); blockMap.put(bpid, map); } @@ -1206,7 +1207,7 @@ public long getReplicaVisibleLength(ExtendedBlock block) { @Override // FsDatasetSpi public void addBlockPool(String bpid, Configuration conf) { - Map map = new HashMap(); + Map map = new TreeMap<>(); blockMap.put(bpid, map); storage.addBlockPool(bpid); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java index 27d1ceac9d..61321e420e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java @@ -19,14 +19,23 @@ import java.io.IOException; import java.util.ArrayList; - +import java.util.Collections; +import java.util.Comparator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -108,12 +117,13 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException { StorageBlockReport reports[] = new StorageBlockReport[cluster.getStoragesPerDatanode()]; - ArrayList blocks = new ArrayList(); + ArrayList blocks = new ArrayList<>(); for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { Block localBlock = locatedBlock.getBlock().getLocalBlock(); blocks.add(new FinalizedReplica(localBlock, null, null)); } + Collections.sort(blocks); try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset().getFsVolumeReferences()) { @@ -126,7 +136,7 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException { // Should not assert! cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports, - new BlockReportContext(1, 0, System.nanoTime(), 0L)); + new BlockReportContext(1, 0, System.nanoTime(), 0L, true)); // Get the block locations once again. 
locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index 212d2e6ec6..27029a2e40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -82,6 +82,7 @@ import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.timeout; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 90e000bb3b..66f804c09f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; @@ -190,7 +191,8 @@ public void testVolumeFailure() throws Exception { new StorageBlockReport(dnStorage, blockList); } - cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null); + cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, + new BlockReportContext(1, 0, System.nanoTime(), 0, true)); // verify number of blocks and files... 
verify(filename, filesize); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java index aadd9b2020..badd59353c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java @@ -134,7 +134,7 @@ public void testAlwaysSplit() throws IOException, InterruptedException { Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport( any(DatanodeRegistration.class), anyString(), - captor.capture(), Mockito.anyObject()); + captor.capture(), Mockito.anyObject()); verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); } @@ -166,7 +166,7 @@ public void testCornerCaseUnderThreshold() throws IOException, InterruptedExcept Mockito.verify(nnSpy, times(1)).blockReport( any(DatanodeRegistration.class), anyString(), - captor.capture(), Mockito.anyObject()); + captor.capture(), Mockito.anyObject()); verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java index 67bbefe342..791ee20190 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; -import org.apache.hadoop.util.Time; /** @@ -40,7 +39,7 @@ protected void sendBlockReports(DatanodeRegistration dnR, String poolId, LOG.info("Sending block report for storage " + report.getStorage().getStorageID()); StorageBlockReport[] singletonReport = { report }; cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport, - new BlockReportContext(reports.length, i, System.nanoTime(), 0L)); + new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true)); i++; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java index fd19ba6f0b..a35fa48932 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java @@ -36,6 +36,6 @@ protected void sendBlockReports(DatanodeRegistration dnR, String poolId, StorageBlockReport[] reports) throws IOException { LOG.info("Sending combined block reports for " + dnR); cluster.getNameNodeRpc().blockReport(dnR, poolId, reports, - new BlockReportContext(1, 0, System.nanoTime(), 0L)); + new BlockReportContext(1, 0, System.nanoTime(), 0L, true)); } } diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java index 00c0f22d63..f12bc181c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.times; import static org.mockito.Mockito.timeout; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index 42cb72f8a3..7fa3803fd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -973,7 +973,7 @@ void register() throws IOException { new StorageBlockReport(storage, BlockListAsLongs.EMPTY) }; dataNodeProto.blockReport(dnRegistration, bpid, reports, - new BlockReportContext(1, 0, System.nanoTime(), 0L)); + new BlockReportContext(1, 0, System.nanoTime(), 0L, true)); } /** @@ -1247,7 +1247,7 @@ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { StorageBlockReport[] report = { new StorageBlockReport( dn.storage, dn.getBlockReportList()) }; dataNodeProto.blockReport(dn.dnRegistration, bpid, report, - new BlockReportContext(1, 0, System.nanoTime(), 0L)); + new BlockReportContext(1, 0, System.nanoTime(), 0L, true)); long end = Time.now(); return end-start; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java index 542c616d2f..dfe9cbe9a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; @@ -303,7 +304,8 @@ public void testAddUCReplica() throws Exception { StorageBlockReport[] reports = {new StorageBlockReport(storage, bll)}; cluster.getNameNodeRpc().blockReport(dn.getDNRegistrationForBP(bpId), - bpId, reports, null); + bpId, reports, + new BlockReportContext(1, 0, System.nanoTime(), 0, true)); } DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index 7fd0c30046..ff8f81b27c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -118,7 +118,7 @@ public void testDeadDatanode() throws Exception { BlockListAsLongs.EMPTY) }; try { dnp.blockReport(reg, poolId, report, - new BlockReportContext(1, 0, System.nanoTime(), 0L)); + new BlockReportContext(1, 0, System.nanoTime(), 0L, true)); fail("Expected IOException is not thrown"); } catch (IOException ex) { // Expected diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java new file mode 100644 index 0000000000..d554b1b1b7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java @@ -0,0 +1,644 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Random; + +/** + * Test of TreeSet + */ +public class FoldedTreeSetTest { + + private static Random srand; + + public FoldedTreeSetTest() { + } + + @BeforeClass + public static void setUpClass() { + long seed = System.nanoTime(); + System.out.println("This run uses the random seed " + seed); + srand = new Random(seed); + } + + @AfterClass + public static void tearDownClass() { + } + + @Before + public void setUp() { + } + + @After + public void tearDown() { + } + + /** + * Test of comparator method, of class TreeSet. + */ + @Test + public void testComparator() { + Comparator comparator = new Comparator() { + + @Override + public int compare(String o1, String o2) { + return o1.compareTo(o2); + } + }; + assertEquals(null, new FoldedTreeSet<>().comparator()); + assertEquals(comparator, new FoldedTreeSet<>(comparator).comparator()); + + FoldedTreeSet set = new FoldedTreeSet<>(comparator); + set.add("apa3"); + set.add("apa2"); + set.add("apa"); + set.add("apa5"); + set.add("apa4"); + assertEquals(5, set.size()); + assertEquals("apa", set.get("apa")); + } + + /** + * Test of first method, of class TreeSet. 
+   */
+  @Test
+  public void testFirst() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    for (int i = 0; i < 256; i++) {
+      tree.add(1024 + i);
+      assertEquals(1024, tree.first().intValue());
+    }
+    for (int i = 1; i < 256; i++) {
+      tree.remove(1024 + i);
+      assertEquals(1024, tree.first().intValue());
+    }
+  }
+
+  /**
+   * Test of last method, of class TreeSet.
+   */
+  @Test
+  public void testLast() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    for (int i = 0; i < 256; i++) {
+      tree.add(1024 + i);
+      assertEquals(1024 + i, tree.last().intValue());
+    }
+    for (int i = 0; i < 255; i++) {
+      tree.remove(1024 + i);
+      assertEquals(1279, tree.last().intValue());
+    }
+  }
+
+  /**
+   * Test of size method, of class TreeSet.
+   */
+  @Test
+  public void testSize() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    String entry = "apa";
+    assertEquals(0, instance.size());
+    instance.add(entry);
+    assertEquals(1, instance.size());
+    instance.remove(entry);
+    assertEquals(0, instance.size());
+  }
+
+  /**
+   * Test of isEmpty method, of class TreeSet.
+   */
+  @Test
+  public void testIsEmpty() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    boolean expResult = true;
+    boolean result = instance.isEmpty();
+    assertEquals(expResult, result);
+    instance.add("apa");
+    instance.remove("apa");
+    assertEquals(expResult, instance.isEmpty());
+  }
+
+  /**
+   * Test of contains method, of class TreeSet.
+   */
+  @Test
+  public void testContains() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    String entry = "apa";
+    assertEquals(false, instance.contains(entry));
+    instance.add(entry);
+    assertEquals(true, instance.contains(entry));
+    assertEquals(false, instance.contains(entry + entry));
+  }
+
+  /**
+   * Test of iterator method, of class TreeSet.
+   */
+  @Test
+  public void testIterator() {
+
+    for (int iter = 0; iter < 10; iter++) {
+      FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+      long[] longs = new long[64723];
+      for (int i = 0; i < longs.length; i++) {
+        Holder val = new Holder(srand.nextLong());
+        while (set.contains(val)) {
+          val = new Holder(srand.nextLong());
+        }
+        longs[i] = val.getId();
+        set.add(val);
+      }
+      assertEquals(longs.length, set.size());
+      Arrays.sort(longs);
+
+      Iterator<Holder> it = set.iterator();
+      for (int i = 0; i < longs.length; i++) {
+        assertTrue(it.hasNext());
+        Holder val = it.next();
+        assertEquals(longs[i], val.getId());
+        // remove randomly to force non linear removes
+        if (srand.nextBoolean()) {
+          it.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * Test of toArray method, of class TreeSet.
+   */
+  @Test
+  public void testToArray() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    ArrayList<Integer> list = new ArrayList<>(256);
+    for (int i = 0; i < 256; i++) {
+      list.add(1024 + i);
+    }
+    tree.addAll(list);
+    assertArrayEquals(list.toArray(), tree.toArray());
+  }
+
+  /**
+   * Test of toArray method, of class TreeSet.
+   */
+  @Test
+  public void testToArray_GenericType() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    ArrayList<Integer> list = new ArrayList<>(256);
+    for (int i = 0; i < 256; i++) {
+      list.add(1024 + i);
+    }
+    tree.addAll(list);
+    assertArrayEquals(list.toArray(new Integer[tree.size()]),
+        tree.toArray(new Integer[tree.size()]));
+    assertArrayEquals(list.toArray(new Integer[tree.size() + 100]),
+        tree.toArray(new Integer[tree.size() + 100]));
+  }
+
+  /**
+   * Test of add method, of class TreeSet.
+ */ + @Test + public void testAdd() { + FoldedTreeSet simpleSet = new FoldedTreeSet<>(); + String entry = "apa"; + assertTrue(simpleSet.add(entry)); + assertFalse(simpleSet.add(entry)); + + FoldedTreeSet intSet = new FoldedTreeSet<>(); + for (int i = 512; i < 1024; i++) { + assertTrue(intSet.add(i)); + } + for (int i = -1024; i < -512; i++) { + assertTrue(intSet.add(i)); + } + for (int i = 0; i < 512; i++) { + assertTrue(intSet.add(i)); + } + for (int i = -512; i < 0; i++) { + assertTrue(intSet.add(i)); + } + assertEquals(2048, intSet.size()); + + FoldedTreeSet set = new FoldedTreeSet<>(); + long[] longs = new long[23432]; + for (int i = 0; i < longs.length; i++) { + Holder val = new Holder(srand.nextLong()); + while (set.contains(val)) { + val = new Holder(srand.nextLong()); + } + longs[i] = val.getId(); + assertTrue(set.add(val)); + } + assertEquals(longs.length, set.size()); + Arrays.sort(longs); + + Iterator it = set.iterator(); + for (int i = 0; i < longs.length; i++) { + assertTrue(it.hasNext()); + Holder val = it.next(); + assertEquals(longs[i], val.getId()); + } + + // Specially constructed adds to exercise all code paths + FoldedTreeSet specialAdds = new FoldedTreeSet<>(); + // Fill node with even numbers + for (int i = 0; i < 128; i += 2) { + assertTrue(specialAdds.add(i)); + } + // Remove left and add left + assertTrue(specialAdds.remove(0)); + assertTrue(specialAdds.add(-1)); + assertTrue(specialAdds.remove(-1)); + // Add right and shift everything left + assertTrue(specialAdds.add(127)); + assertTrue(specialAdds.remove(127)); + + // Empty at both ends + assertTrue(specialAdds.add(0)); + assertTrue(specialAdds.remove(0)); + assertTrue(specialAdds.remove(126)); + // Add in the middle left to slide entries left + assertTrue(specialAdds.add(11)); + assertTrue(specialAdds.remove(11)); + // Add in the middle right to slide entries right + assertTrue(specialAdds.add(99)); + assertTrue(specialAdds.remove(99)); + // Add existing entry in the middle of a node + assertFalse(specialAdds.add(64)); + } + + @Test + public void testAddOrReplace() { + FoldedTreeSet simpleSet = new FoldedTreeSet<>(); + String entry = "apa"; + assertNull(simpleSet.addOrReplace(entry)); + assertEquals(entry, simpleSet.addOrReplace(entry)); + + FoldedTreeSet intSet = new FoldedTreeSet<>(); + for (int i = 0; i < 1024; i++) { + assertNull(intSet.addOrReplace(i)); + } + for (int i = 0; i < 1024; i++) { + assertEquals(i, intSet.addOrReplace(i).intValue()); + } + } + + private static class Holder implements Comparable { + + private final long id; + + public Holder(long id) { + this.id = id; + } + + public long getId() { + return id; + } + + @Override + public int compareTo(Holder o) { + return id < o.getId() ? -1 + : id > o.getId() ? 1 : 0; + } + } + + @Test + public void testRemoveWithComparator() { + FoldedTreeSet set = new FoldedTreeSet<>(); + long[] longs = new long[98327]; + for (int i = 0; i < longs.length; i++) { + Holder val = new Holder(srand.nextLong()); + while (set.contains(val)) { + val = new Holder(srand.nextLong()); + } + longs[i] = val.getId(); + set.add(val); + } + assertEquals(longs.length, set.size()); + Comparator cmp = new Comparator() { + @Override + public int compare(Object o1, Object o2) { + long lookup = (long) o1; + long stored = ((Holder) o2).getId(); + return lookup < stored ? -1 + : lookup > stored ? 
1 : 0; + } + }; + + for (long val : longs) { + set.remove(val, cmp); + } + assertEquals(0, set.size()); + assertTrue(set.isEmpty()); + } + + @Test + public void testGetWithComparator() { + FoldedTreeSet set = new FoldedTreeSet<>(); + long[] longs = new long[32147]; + for (int i = 0; i < longs.length; i++) { + Holder val = new Holder(srand.nextLong()); + while (set.contains(val)) { + val = new Holder(srand.nextLong()); + } + longs[i] = val.getId(); + set.add(val); + } + assertEquals(longs.length, set.size()); + Comparator cmp = new Comparator() { + @Override + public int compare(Object o1, Object o2) { + long lookup = (long) o1; + long stored = ((Holder) o2).getId(); + return lookup < stored ? -1 + : lookup > stored ? 1 : 0; + } + }; + + for (long val : longs) { + assertEquals(val, set.get(val, cmp).getId()); + } + } + + @Test + public void testGet() { + FoldedTreeSet set = new FoldedTreeSet<>(); + long[] longs = new long[43277]; + for (int i = 0; i < longs.length; i++) { + Holder val = new Holder(srand.nextLong()); + while (set.contains(val)) { + val = new Holder(srand.nextLong()); + } + longs[i] = val.getId(); + set.add(val); + } + assertEquals(longs.length, set.size()); + + for (long val : longs) { + assertEquals(val, set.get(new Holder(val)).getId()); + } + } + + /** + * Test of remove method, of class TreeSet. + */ + @Test + public void testRemove() { + FoldedTreeSet instance = new FoldedTreeSet<>(); + assertEquals(false, instance.remove("apa")); + instance.add("apa"); + assertEquals(true, instance.remove("apa")); + + removeLeft(); + removeRight(); + removeAt(); + removeRandom(); + } + + public void removeLeft() { + FoldedTreeSet set = new FoldedTreeSet<>(); + for (int i = 1; i <= 320; i++) { + set.add(i); + } + for (int i = 193; i < 225; i++) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + for (int i = 129; i < 161; i++) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + for (int i = 256; i > 224; i--) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + for (int i = 257; i < 289; i++) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + while (!set.isEmpty()) { + assertTrue(set.remove(set.first())); + } + } + + public void removeRight() { + FoldedTreeSet set = new FoldedTreeSet<>(); + for (int i = 1; i <= 320; i++) { + set.add(i); + } + for (int i = 193; i < 225; i++) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + for (int i = 192; i > 160; i--) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + for (int i = 256; i > 224; i--) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + for (int i = 320; i > 288; i--) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + while (!set.isEmpty()) { + assertTrue(set.remove(set.last())); + } + } + + public void removeAt() { + FoldedTreeSet set = new FoldedTreeSet<>(); + for (int i = 1; i <= 320; i++) { + set.add(i); + } + for (int i = 193; i < 225; i++) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + for (int i = 160; i < 192; i++) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + for (int i = 225; i < 257; i++) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + for (int i = 288; i < 320; i++) { + assertEquals(true, set.remove(i)); + assertEquals(false, set.remove(i)); + } + } + + public void 
removeRandom() { + FoldedTreeSet set = new FoldedTreeSet<>(); + int[] integers = new int[2048]; + for (int i = 0; i < 2048; i++) { + int val = srand.nextInt(); + while (set.contains(val)) { + val = srand.nextInt(); + } + integers[i] = val; + set.add(val); + } + assertEquals(2048, set.size()); + + for (int val : integers) { + assertEquals(true, set.remove(val)); + assertEquals(false, set.remove(val)); + } + assertEquals(true, set.isEmpty()); + } + + /** + * Test of containsAll method, of class TreeSet. + */ + @Test + public void testContainsAll() { + Collection list = Arrays.asList(new String[]{"apa", "apa2", "apa"}); + FoldedTreeSet instance = new FoldedTreeSet<>(); + assertEquals(false, instance.containsAll(list)); + instance.addAll(list); + assertEquals(true, instance.containsAll(list)); + } + + /** + * Test of addAll method, of class TreeSet. + */ + @Test + public void testAddAll() { + Collection list = Arrays.asList(new String[]{"apa", "apa2", "apa"}); + FoldedTreeSet instance = new FoldedTreeSet<>(); + assertEquals(true, instance.addAll(list)); + assertEquals(false, instance.addAll(list)); // add same entries again + } + + /** + * Test of retainAll method, of class TreeSet. + */ + @Test + public void testRetainAll() { + Collection list = Arrays.asList(new String[]{"apa", "apa2", "apa"}); + FoldedTreeSet instance = new FoldedTreeSet<>(); + instance.addAll(list); + assertEquals(false, instance.retainAll(list)); + assertEquals(2, instance.size()); + Collection list2 = Arrays.asList(new String[]{"apa"}); + assertEquals(true, instance.retainAll(list2)); + assertEquals(1, instance.size()); + } + + /** + * Test of removeAll method, of class TreeSet. + */ + @Test + public void testRemoveAll() { + Collection list = Arrays.asList(new String[]{"apa", "apa2", "apa"}); + FoldedTreeSet instance = new FoldedTreeSet<>(); + assertEquals(false, instance.removeAll(list)); + instance.addAll(list); + assertEquals(true, instance.removeAll(list)); + assertEquals(true, instance.isEmpty()); + } + + /** + * Test of clear method, of class TreeSet. 
+   */
+  @Test
+  public void testClear() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    instance.clear();
+    assertEquals(true, instance.isEmpty());
+    instance.add("apa");
+    assertEquals(false, instance.isEmpty());
+    instance.clear();
+    assertEquals(true, instance.isEmpty());
+  }
+
+  @Test
+  public void testFillRatio() {
+    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+    final int size = 1024;
+    for (int i = 1; i <= size; i++) {
+      set.add(i);
+      assertEquals("Iteration: " + i, 1.0, set.fillRatio(), 0.0);
+    }
+
+    for (int i = 1; i <= size / 2; i++) {
+      set.remove(i * 2);
+      // Need the max since all the removes from the last node don't
+      // affect the fill ratio
+      assertEquals("Iteration: " + i,
+          Math.max((size - i) / (double) size, 0.53125),
+          set.fillRatio(), 0.0);
+    }
+  }
+
+  @Test
+  public void testCompact() {
+    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+    long[] longs = new long[24553];
+    for (int i = 0; i < longs.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length, set.size());
+
+    long[] longs2 = new long[longs.length];
+    for (int i = 0; i < longs2.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs2[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length + longs2.length, set.size());
+
+    // Create fragmentation
+    for (long val : longs) {
+      assertTrue(set.remove(new Holder(val)));
+    }
+    assertEquals(longs2.length, set.size());
+
+    assertFalse(set.compact(0));
+    assertTrue(set.compact(Long.MAX_VALUE));
+    assertEquals(longs2.length, set.size());
+    for (long val : longs) {
+      assertFalse(set.remove(new Holder(val)));
+    }
+    for (long val : longs2) {
+      assertEquals(val, set.get(new Holder(val)).getId());
+    }
+  }
+}
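For reference, a minimal sketch (not part of the patch) of how the FoldedTreeSet calls exercised by FoldedTreeSetTest above fit together: addOrReplace for insert-or-overwrite, get with a lookup comparator to avoid allocating a probe object, and fillRatio/compact for defragmentation. The Record type, the 0.75 ratio and the time budget passed to compact are assumptions made for the example, not values taken from the patch.

import java.util.Comparator;

import org.apache.hadoop.hdfs.util.FoldedTreeSet;

/** Illustrative usage sketch of the FoldedTreeSet API shown in the test. */
class FoldedTreeSetUsageSketch {

  /** Hypothetical element type keyed by a long id, similar to Holder above. */
  static final class Record implements Comparable<Record> {
    final long id;

    Record(long id) {
      this.id = id;
    }

    @Override
    public int compareTo(Record o) {
      return Long.compare(id, o.id);
    }
  }

  /**
   * Compares a boxed Long lookup key against a stored Record, mirroring the
   * comparator used in testGetWithComparator/testRemoveWithComparator.
   */
  private static final Comparator<Object> LOOKUP_CMP = new Comparator<Object>() {
    @Override
    public int compare(Object key, Object stored) {
      return Long.compare((long) key, ((Record) stored).id);
    }
  };

  private final FoldedTreeSet<Record> set = new FoldedTreeSet<>();

  /** Insert or overwrite; addOrReplace returns the previous entry or null. */
  Record put(long id) {
    return set.addOrReplace(new Record(id));
  }

  /** Keyed lookup without allocating a probe Record. */
  Record lookup(long id) {
    return set.get(id, LOOKUP_CMP);
  }

  /**
   * Assumed maintenance policy: compact when the set has become sparse after
   * many removes. The 0.75 threshold and the unbounded budget are illustrative;
   * the test only shows that compact(0) and compact(Long.MAX_VALUE) are valid.
   */
  void maybeCompact() {
    if (set.fillRatio() < 0.75) {
      set.compact(Long.MAX_VALUE);
    }
  }
}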