HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2016-02-02 11:23:00 -08:00
parent 2da03b48eb
commit dd9ebf6eed
31 changed files with 2564 additions and 588 deletions

View File

@ -971,6 +971,9 @@ Release 2.9.0 - UNRELEASED
HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had
an error (Rakesh R via cmccabe)
HDFS-9260. Improve the performance and GC friendliness of NameNode startup
and full block reports (Staffan Friberg via cmccabe)
OPTIMIZATIONS
BUG FIXES

View File

@ -219,6 +219,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY
= "dfs.namenode.storageinfo.defragment.interval.ms";
public static final int
DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT = 10 * 60 * 1000;
public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY
= "dfs.namenode.storageinfo.defragment.timeout.ms";
public static final int
DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT = 4;
public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY
= "dfs.namenode.storageinfo.defragment.ratio";
public static final double
DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT = 0.75;
public static final String DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter";
/* Phrased as below to avoid javac inlining as a constant, to match the behavior when
this was AuthFilter.class.getName(). Note that if you change the import for AuthFilter, you

View File

@ -174,12 +174,13 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports, BlockReportContext context)
String poolId, StorageBlockReport[] reports,
BlockReportContext context)
throws IOException {
BlockReportRequestProto.Builder builder = BlockReportRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId);
boolean useBlocksBuffer = registration.getNamespaceInfo()
.isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);

View File

@ -824,8 +824,8 @@ public static JournalInfoProto convert(JournalInfo j) {
public static BlockReportContext convert(BlockReportContextProto proto) {
return new BlockReportContext(proto.getTotalRpcs(),
proto.getCurRpc(), proto.getId(), proto.getLeaseId());
return new BlockReportContext(proto.getTotalRpcs(), proto.getCurRpc(),
proto.getId(), proto.getLeaseId(), proto.getSorted());
}
public static BlockReportContextProto convert(BlockReportContext context) {
@ -834,6 +834,7 @@ public static BlockReportContextProto convert(BlockReportContext context) {
setCurRpc(context.getCurRpc()).
setId(context.getReportId()).
setLeaseId(context.getLeaseId()).
setSorted(context.isSorted()).
build();
}

View File

@ -18,8 +18,9 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
@ -55,19 +56,9 @@ public abstract class BlockInfo extends Block
/** For implementing {@link LightWeightGSet.LinkedElement} interface. */
private LightWeightGSet.LinkedElement nextLinkedElement;
/**
* This array contains triplets of references. For each i-th storage, the
* block belongs to triplets[3*i] is the reference to the
* {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
* references to the previous and the next blocks, respectively, in the list
* of blocks belonging to this storage.
*
* Using previous and next in Object triplets is done instead of a
* {@link LinkedList} list to efficiently use memory. With LinkedList the cost
* per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
* bytes using the triplets.
*/
protected Object[] triplets;
// Storages this block is replicated on
protected DatanodeStorageInfo[] storages;
private BlockUnderConstructionFeature uc;
@ -77,14 +68,14 @@ public abstract class BlockInfo extends Block
* in the block group
*/
public BlockInfo(short size) {
this.triplets = new Object[3 * size];
this.storages = new DatanodeStorageInfo[size];
this.bcId = INVALID_INODE_ID;
this.replication = isStriped() ? 0 : size;
}
public BlockInfo(Block blk, short size) {
super(blk);
this.triplets = new Object[3*size];
this.storages = new DatanodeStorageInfo[size];
this.bcId = INVALID_INODE_ID;
this.replication = isStriped() ? 0 : size;
}
@ -109,79 +100,52 @@ public boolean isDeleted() {
return bcId == INVALID_INODE_ID;
}
public Iterator<DatanodeStorageInfo> getStorageInfos() {
return new Iterator<DatanodeStorageInfo>() {
private int index = 0;
@Override
public boolean hasNext() {
while (index < storages.length && storages[index] == null) {
index++;
}
return index < storages.length;
}
@Override
public DatanodeStorageInfo next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return storages[index++];
}
@Override
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
}
};
}
public DatanodeDescriptor getDatanode(int index) {
DatanodeStorageInfo storage = getStorageInfo(index);
return storage == null ? null : storage.getDatanodeDescriptor();
}
DatanodeStorageInfo getStorageInfo(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
return (DatanodeStorageInfo)triplets[index*3];
}
BlockInfo getPrevious(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo)triplets[index*3+1];
assert info == null ||
info.getClass().getName().startsWith(BlockInfo.class.getName()) :
"BlockInfo is expected at " + index*3;
return info;
}
BlockInfo getNext(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo)triplets[index*3+2];
assert info == null || info.getClass().getName().startsWith(
BlockInfo.class.getName()) :
"BlockInfo is expected at " + index*3;
return info;
assert this.storages != null : "BlockInfo is not initialized";
return storages[index];
}
void setStorageInfo(int index, DatanodeStorageInfo storage) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
triplets[index*3] = storage;
}
/**
* Return the previous block on the block list for the datanode at
* position index. Set the previous block on the list to "to".
*
* @param index - the datanode index
* @param to - block to be set to previous on the list of blocks
* @return current previous block on the list of blocks
*/
BlockInfo setPrevious(int index, BlockInfo to) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo) triplets[index*3+1];
triplets[index*3+1] = to;
return info;
}
/**
* Return the next block on the block list for the datanode at
* position index. Set the next block on the list to "to".
*
* @param index - the datanode index
* @param to - block to be set to next on the list of blocks
* @return current next block on the list of blocks
*/
BlockInfo setNext(int index, BlockInfo to) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo) triplets[index*3+2];
triplets[index*3+2] = to;
return info;
assert this.storages != null : "BlockInfo is not initialized";
this.storages[index] = storage;
}
public int getCapacity() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
return triplets.length / 3;
assert this.storages != null : "BlockInfo is not initialized";
return storages.length;
}
/**
@ -240,80 +204,6 @@ int findStorageInfo(DatanodeStorageInfo storageInfo) {
return -1;
}
/**
* Insert this block into the head of the list of blocks
* related to the specified DatanodeStorageInfo.
* If the head is null then form a new list.
* @return current block as the new head of the list.
*/
BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
int dnIndex = this.findStorageInfo(storage);
assert dnIndex >= 0 : "Data node is not found: current";
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is already in the list and cannot be inserted.";
this.setPrevious(dnIndex, null);
this.setNext(dnIndex, head);
if (head != null) {
head.setPrevious(head.findStorageInfo(storage), this);
}
return this;
}
/**
* Remove this block from the list of blocks
* related to the specified DatanodeStorageInfo.
* If this block is the head of the list then return the next block as
* the new head.
* @return the new head of the list or null if the list becomes
* empy after deletion.
*/
BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
if (head == null) {
return null;
}
int dnIndex = this.findStorageInfo(storage);
if (dnIndex < 0) { // this block is not on the data-node list
return head;
}
BlockInfo next = this.getNext(dnIndex);
BlockInfo prev = this.getPrevious(dnIndex);
this.setNext(dnIndex, null);
this.setPrevious(dnIndex, null);
if (prev != null) {
prev.setNext(prev.findStorageInfo(storage), next);
}
if (next != null) {
next.setPrevious(next.findStorageInfo(storage), prev);
}
if (this == head) { // removing the head
head = next;
}
return head;
}
/**
* Remove this block from the list of blocks related to the specified
* DatanodeDescriptor. Insert it into the head of the list of blocks.
*
* @return the new head of the list.
*/
public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
int curIndex, int headIndex) {
if (head == this) {
return this;
}
BlockInfo next = this.setNext(curIndex, head);
BlockInfo prev = this.setPrevious(curIndex, null);
head.setPrevious(headIndex, this);
prev.setNext(prev.findStorageInfo(storage), next);
if (next != null) {
next.setPrevious(next.findStorageInfo(storage), prev);
}
return this;
}
@Override
public int hashCode() {
// Super implementation is sufficient

View File

@ -35,20 +35,20 @@ public BlockInfoContiguous(Block blk, short size) {
}
/**
* Ensure that there is enough space to include num more triplets.
* @return first free triplet index.
* Ensure that there is enough space to include num more storages.
* @return first free storage index.
*/
private int ensureCapacity(int num) {
assert this.triplets != null : "BlockInfo is not initialized";
assert this.storages != null : "BlockInfo is not initialized";
int last = numNodes();
if (triplets.length >= (last+num)*3) {
if (storages.length >= (last+num)) {
return last;
}
/* Not enough space left. Create a new array. Should normally
* happen only when replication is manually increased by the user. */
Object[] old = triplets;
triplets = new Object[(last+num)*3];
System.arraycopy(old, 0, triplets, 0, last * 3);
DatanodeStorageInfo[] old = storages;
storages = new DatanodeStorageInfo[(last+num)];
System.arraycopy(old, 0, storages, 0, last);
return last;
}
@ -57,8 +57,6 @@ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
// find the last null node
int lastNode = ensureCapacity(1);
setStorageInfo(lastNode, storage);
setNext(lastNode, null);
setPrevious(lastNode, null);
return true;
}
@ -68,25 +66,18 @@ boolean removeStorage(DatanodeStorageInfo storage) {
if (dnIndex < 0) { // the node is not found
return false;
}
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is still in the list and must be removed first.";
// find the last not null node
int lastNode = numNodes()-1;
// replace current node triplet by the lastNode one
// replace current node entry by the lastNode one
setStorageInfo(dnIndex, getStorageInfo(lastNode));
setNext(dnIndex, getNext(lastNode));
setPrevious(dnIndex, getPrevious(lastNode));
// set the last triplet to null
// set the last entry to null
setStorageInfo(lastNode, null);
setNext(lastNode, null);
setPrevious(lastNode, null);
return true;
}
@Override
public int numNodes() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
assert this.storages != null : "BlockInfo is not initialized";
for (int idx = getCapacity()-1; idx >= 0; idx--) {
if (getDatanode(idx) != null) {

View File

@ -26,21 +26,20 @@
/**
* Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
*
* We still use triplets to store DatanodeStorageInfo for each block in the
* block group, as well as the previous/next block in the corresponding
* DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
* We still use a storage array to store DatanodeStorageInfo for each block in
* the block group. For a (m+k) block group, the first (m+k) storage units
* are sorted and strictly mapped to the corresponding block.
*
* Normally each block belonging to group is stored in only one DataNode.
* However, it is possible that some block is over-replicated. Thus the triplet
* However, it is possible that some block is over-replicated. Thus the storage
* array's size can be larger than (m+k). Thus currently we use an extra byte
* array to record the block index for each triplet.
* array to record the block index for each entry.
*/
@InterfaceAudience.Private
public class BlockInfoStriped extends BlockInfo {
private final ErasureCodingPolicy ecPolicy;
/**
* Always the same size with triplets. Record the block index for each triplet
* Always the same size with storage. Record the block index for each entry
* TODO: actually this is only necessary for over-replicated block. Thus can
* be further optimized to save memory usage.
*/
@ -104,7 +103,7 @@ private int findSlot() {
return i;
}
}
// need to expand the triplet size
// need to expand the storage size
ensureCapacity(i + 1, true);
return i;
}
@ -130,8 +129,6 @@ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
private void addStorage(DatanodeStorageInfo storage, int index,
int blockIndex) {
setStorageInfo(index, storage);
setNext(index, null);
setPrevious(index, null);
indices[index] = (byte) blockIndex;
}
@ -173,26 +170,22 @@ boolean removeStorage(DatanodeStorageInfo storage) {
if (dnIndex < 0) { // the node is not found
return false;
}
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is still in the list and must be removed first.";
// set the triplet to null
// set the entry to null
setStorageInfo(dnIndex, null);
setNext(dnIndex, null);
setPrevious(dnIndex, null);
indices[dnIndex] = -1;
return true;
}
private void ensureCapacity(int totalSize, boolean keepOld) {
if (getCapacity() < totalSize) {
Object[] old = triplets;
DatanodeStorageInfo[] old = storages;
byte[] oldIndices = indices;
triplets = new Object[totalSize * 3];
storages = new DatanodeStorageInfo[totalSize];
indices = new byte[totalSize];
initIndices();
if (keepOld) {
System.arraycopy(old, 0, triplets, 0, old.length);
System.arraycopy(old, 0, storages, 0, old.length);
System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
}
}
@ -214,8 +207,7 @@ public final boolean isStriped() {
@Override
public int numNodes() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
assert this.storages != null : "BlockInfo is not initialized";
int num = 0;
for (int idx = getCapacity()-1; idx >= 0; idx--) {
if (getStorageInfo(idx) != null) {

View File

@ -61,6 +61,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@ -93,6 +94,7 @@
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@ -106,6 +108,7 @@
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@ -195,7 +198,12 @@ public int getPendingDataNodeMessageCount() {
/**replicationRecheckInterval is how often namenode checks for new replication work*/
private final long replicationRecheckInterval;
/** How often to check and the limit for the storageinfo efficiency. */
private final long storageInfoDefragmentInterval;
private final long storageInfoDefragmentTimeout;
private final double storageInfoDefragmentRatio;
/**
* Mapping: Block -> { BlockCollection, datanodes, self ref }
* Updated only in response to client-sent information.
@ -204,6 +212,10 @@ public int getPendingDataNodeMessageCount() {
/** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
/** StorageInfoDefragmenter thread. */
private final Daemon storageInfoDefragmenterThread =
new Daemon(new StorageInfoDefragmenter());
/** Block report thread for handling async reports. */
private final BlockReportProcessingThread blockReportThread =
@ -376,7 +388,20 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
this.replicationRecheckInterval =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
this.storageInfoDefragmentInterval =
conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY,
DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT);
this.storageInfoDefragmentTimeout =
conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY,
DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT);
this.storageInfoDefragmentRatio =
conf.getDouble(
DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY,
DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT);
this.encryptDataTransfer =
conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
@ -508,6 +533,8 @@ public void activate(Configuration conf, long blockTotal) {
datanodeManager.activate(conf);
this.replicationThread.setName("ReplicationMonitor");
this.replicationThread.start();
storageInfoDefragmenterThread.setName("StorageInfoMonitor");
storageInfoDefragmenterThread.start();
this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal);
@ -517,8 +544,10 @@ public void close() {
bmSafeMode.close();
try {
replicationThread.interrupt();
storageInfoDefragmenterThread.interrupt();
blockReportThread.interrupt();
replicationThread.join(3000);
storageInfoDefragmenterThread.join(3000);
blockReportThread.join(3000);
} catch (InterruptedException ie) {
}
@ -1165,9 +1194,15 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
/** Remove the blocks associated to the given datanode. */
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
final Iterator<BlockInfo> it = node.getBlockIterator();
while(it.hasNext()) {
removeStoredBlock(it.next(), node);
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
final Iterator<BlockInfo> it = storage.getBlockIterator();
while (it.hasNext()) {
BlockInfo block = it.next();
// DatanodeStorageInfo must be removed using the iterator to avoid
// ConcurrentModificationException in the underlying storage
it.remove();
removeStoredBlock(block, node);
}
}
// Remove all pending DN messages referencing this DN.
pendingDNMessages.removeAllMessagesForDatanode(node);
@ -1183,6 +1218,9 @@ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
while(it.hasNext()) {
BlockInfo block = it.next();
// DatanodeStorageInfo must be removed using the iterator to avoid
// ConcurrentModificationException in the underlying storage
it.remove();
removeStoredBlock(block, node);
final Block b = getBlockOnStorage(block, storageInfo);
if (b != null) {
@ -2033,8 +2071,8 @@ private static class BlockInfoToAdd {
*/
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
final BlockListAsLongs newReport, BlockReportContext context,
boolean lastStorageInRpc) throws IOException {
final BlockListAsLongs newReport,
BlockReportContext context, boolean lastStorageInRpc) throws IOException {
namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
@ -2079,7 +2117,8 @@ public boolean processReport(final DatanodeID nodeID,
nodeID.getDatanodeUuid());
processFirstBlockReport(storageInfo, newReport);
} else {
invalidatedBlocks = processReport(storageInfo, newReport);
invalidatedBlocks = processReport(storageInfo, newReport,
context != null ? context.isSorted() : false);
}
storageInfo.receivedBlockReport();
@ -2149,6 +2188,9 @@ private void removeZombieReplicas(BlockReportContext context,
// TODO: remove this assumption in case we want to put a block on
// more than one storage on a datanode (and because it's a difficult
// assumption to really enforce)
// DatanodeStorageInfo must be removed using the iterator to avoid
// ConcurrentModificationException in the underlying storage
iter.remove();
removeStoredBlock(block, zombie.getDatanodeDescriptor());
Block b = getBlockOnStorage(block, zombie);
if (b != null) {
@ -2238,7 +2280,7 @@ void rescanPostponedMisreplicatedBlocks() {
private Collection<Block> processReport(
final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report) throws IOException {
final BlockListAsLongs report, final boolean sorted) throws IOException {
// Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
@ -2248,9 +2290,29 @@ private Collection<Block> processReport(
Collection<Block> toInvalidate = new LinkedList<>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
Collection<StatefulBlockInfo> toUC = new LinkedList<>();
reportDiff(storageInfo, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
Iterable<BlockReportReplica> sortedReport;
if (!sorted) {
blockLog.warn("BLOCK* processReport: Report from the DataNode ({}) is "
+ "unsorted. This will cause overhead on the NameNode "
+ "which needs to sort the Full BR. Please update the "
+ "DataNode to the same version of Hadoop HDFS as the "
+ "NameNode ({}).",
storageInfo.getDatanodeDescriptor().getDatanodeUuid(),
VersionInfo.getVersion());
Set<BlockReportReplica> set = new FoldedTreeSet<>();
for (BlockReportReplica iblk : report) {
set.add(new BlockReportReplica(iblk));
}
sortedReport = set;
} else {
sortedReport = report;
}
reportDiffSorted(storageInfo, sortedReport,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Process the blocks on each queue
for (StatefulBlockInfo b : toUC) {
@ -2399,126 +2461,111 @@ private void processFirstBlockReport(
}
}
private void reportDiff(DatanodeStorageInfo storageInfo,
BlockListAsLongs newReport,
private void reportDiffSorted(DatanodeStorageInfo storageInfo,
Iterable<BlockReportReplica> newReport,
Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
Block delimiterBlock = new Block();
BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
(short) 1);
AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
assert result == AddBlockResult.ADDED
: "Delimiting block cannot be present in the node";
int headIndex = 0; //currently the delimiter is in the head of the list
int curIndex;
// The blocks must be sorted and the storagenodes blocks must be sorted
Iterator<BlockInfo> storageBlocksIterator = storageInfo.getBlockIterator();
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
BlockInfo storageBlock = null;
if (newReport == null) {
newReport = BlockListAsLongs.EMPTY;
}
// scan the report and process newly reported blocks
for (BlockReportReplica iblk : newReport) {
ReplicaState iState = iblk.getState();
BlockInfo storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
for (BlockReportReplica replica : newReport) {
// move block to the head of the list
if (storedBlock != null &&
(curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
long replicaID = replica.getBlockId();
if (BlockIdManager.isStripedBlockID(replicaID)
&& (!hasNonEcBlockUsingStripedID ||
!blocksMap.containsBlock(replica))) {
replicaID = BlockIdManager.convertToStripedID(replicaID);
}
ReplicaState reportedState = replica.getState();
if (LOG.isDebugEnabled()) {
LOG.debug("Reported block " + replica
+ " on " + dn + " size " + replica.getNumBytes()
+ " replicaState = " + reportedState);
}
if (shouldPostponeBlocksFromFuture
&& isGenStampInFuture(replica)) {
queueReportedBlock(storageInfo, replica, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
continue;
}
if (storageBlock == null && storageBlocksIterator.hasNext()) {
storageBlock = storageBlocksIterator.next();
}
do {
int cmp;
if (storageBlock == null ||
(cmp = Long.compare(replicaID, storageBlock.getBlockId())) < 0) {
// Check if block is available in NN but not yet on this storage
BlockInfo nnBlock = blocksMap.getStoredBlock(new Block(replicaID));
if (nnBlock != null) {
reportDiffSortedInner(storageInfo, replica, reportedState,
nnBlock, toAdd, toCorrupt, toUC);
} else {
// Replica not found anywhere so it should be invalidated
toInvalidate.add(new Block(replica));
}
break;
} else if (cmp == 0) {
// Replica matched current storageblock
reportDiffSortedInner(storageInfo, replica, reportedState,
storageBlock, toAdd, toCorrupt, toUC);
storageBlock = null;
} else {
// replica has higher ID than storedBlock
// Remove all stored blocks with IDs lower than replica
do {
toRemove.add(storageBlock);
storageBlock = storageBlocksIterator.hasNext()
? storageBlocksIterator.next() : null;
} while (storageBlock != null &&
Long.compare(replicaID, storageBlock.getBlockId()) > 0);
}
} while (storageBlock != null);
}
// collect blocks that have not been reported
// all of them are next to the delimiter
Iterator<BlockInfo> it =
storageInfo.new BlockIterator(delimiter.getNext(0));
while (it.hasNext()) {
toRemove.add(it.next());
// Iterate any remaing blocks that have not been reported and remove them
while (storageBlocksIterator.hasNext()) {
toRemove.add(storageBlocksIterator.next());
}
storageInfo.removeBlock(delimiter);
}
/**
* Process a block replica reported by the data-node.
* No side effects except adding to the passed-in Collections.
*
* <ol>
* <li>If the block is not known to the system (not in blocksMap) then the
* data-node should be notified to invalidate this block.</li>
* <li>If the reported replica is valid that is has the same generation stamp
* and length as recorded on the name-node, then the replica location should
* be added to the name-node.</li>
* <li>If the reported replica is not valid, then it is marked as corrupt,
* which triggers replication of the existing valid replicas.
* Corrupt replicas are removed from the system when the block
* is fully replicated.</li>
* <li>If the reported replica is for a block currently marked "under
* construction" in the NN, then it should be added to the
* BlockUnderConstructionFeature's list of replicas.</li>
* </ol>
*
* @param storageInfo DatanodeStorageInfo that sent the report.
* @param block reported block replica
* @param reportedState reported replica state
* @param toAdd add to DatanodeDescriptor
* @param toInvalidate missing blocks (not in the blocks map)
* should be removed from the data-node
* @param toCorrupt replicas with unexpected length or generation stamp;
* add to corrupt replicas
* @param toUC replicas of blocks currently under construction
* @return the up-to-date stored block, if it should be kept.
* Otherwise, null.
*/
private BlockInfo processReportedBlock(
private void reportDiffSortedInner(
final DatanodeStorageInfo storageInfo,
final Block block, final ReplicaState reportedState,
final BlockReportReplica replica, final ReplicaState reportedState,
final BlockInfo storedBlock,
final Collection<BlockInfoToAdd> toAdd,
final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
if(LOG.isDebugEnabled()) {
LOG.debug("Reported block " + block
+ " on " + dn + " size " + block.getNumBytes()
+ " replicaState = " + reportedState);
}
if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
queueReportedBlock(storageInfo, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
return null;
}
// find block by blockId
BlockInfo storedBlock = getStoredBlock(block);
if(storedBlock == null) {
// If blocksMap does not contain reported block id,
// the replica should be removed from the data-node.
toInvalidate.add(new Block(block));
return null;
}
assert replica != null;
assert storedBlock != null;
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
BlockUCState ucState = storedBlock.getBlockUCState();
// Block is on the NN
if(LOG.isDebugEnabled()) {
if (LOG.isDebugEnabled()) {
LOG.debug("In memory blockUCState = " + ucState);
}
// Ignore replicas already scheduled to be removed from the DN
if(invalidateBlocks.contains(dn, block)) {
return storedBlock;
if (invalidateBlocks.contains(dn, replica)) {
return;
}
BlockToMarkCorrupt c = checkReplicaCorrupt(
block, reportedState, storedBlock, ucState, dn);
BlockToMarkCorrupt c = checkReplicaCorrupt(replica, reportedState,
storedBlock, ucState, dn);
if (c != null) {
if (shouldPostponeBlocksFromFuture) {
// If the block is an out-of-date generation stamp or state,
@ -2532,23 +2579,16 @@ private BlockInfo processReportedBlock(
} else {
toCorrupt.add(c);
}
return storedBlock;
} else if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
toUC.add(new StatefulBlockInfo(storedBlock, new Block(replica),
reportedState));
} else if (reportedState == ReplicaState.FINALIZED &&
(storedBlock.findStorageInfo(storageInfo) == -1 ||
corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
toAdd.add(new BlockInfoToAdd(storedBlock, replica));
}
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
toUC.add(new StatefulBlockInfo(storedBlock,
new Block(block), reportedState));
return storedBlock;
}
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
&& (storedBlock.findStorageInfo(storageInfo) == -1 ||
corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
toAdd.add(new BlockInfoToAdd(storedBlock, block));
}
return storedBlock;
}
/**
@ -2774,7 +2814,7 @@ private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
}
// just add it
AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
AddBlockResult result = storageInfo.addBlockInitial(storedBlock, reported);
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
@ -3497,40 +3537,75 @@ private void processAndHandleReportedBlock(
DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
// blockReceived reports a finalized block
Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
toCorrupt, toUC);
// the block is only in one of the to-do lists
// if it is in none then data-node already has it
assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
: "The block should be only in one of the lists.";
if(LOG.isDebugEnabled()) {
LOG.debug("Reported block " + block
+ " on " + node + " size " + block.getNumBytes()
+ " replicaState = " + reportedState);
}
for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b, storageInfo);
if (shouldPostponeBlocksFromFuture &&
isGenStampInFuture(block)) {
queueReportedBlock(storageInfo, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
return;
}
long numBlocksLogged = 0;
for (BlockInfoToAdd b : toAdd) {
addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
blockLog.debug("BLOCK* addBlock: logged info for {} of {} reported.",
maxNumBlocksToLog, numBlocksLogged);
}
for (Block b : toInvalidate) {
// find block by blockId
BlockInfo storedBlock = getStoredBlock(block);
if(storedBlock == null) {
// If blocksMap does not contain reported block id,
// the replica should be removed from the data-node.
blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
"belong to any file", b, node, b.getNumBytes());
addToInvalidates(b, node);
"belong to any file", block, node, block.getNumBytes());
addToInvalidates(new Block(block), node);
return;
}
for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b, storageInfo, node);
BlockUCState ucState = storedBlock.getBlockUCState();
// Block is on the NN
if(LOG.isDebugEnabled()) {
LOG.debug("In memory blockUCState = " + ucState);
}
// Ignore replicas already scheduled to be removed from the DN
if(invalidateBlocks.contains(node, block)) {
return;
}
BlockToMarkCorrupt c = checkReplicaCorrupt(
block, reportedState, storedBlock, ucState, node);
if (c != null) {
if (shouldPostponeBlocksFromFuture) {
// If the block is an out-of-date generation stamp or state,
// but we're the standby, we shouldn't treat it as corrupt,
// but instead just queue it for later processing.
// TODO: Pretty confident this should be s/storedBlock/block below,
// since we should be postponing the info of the reported block, not
// the stored block. See HDFS-6289 for more context.
queueReportedBlock(storageInfo, storedBlock, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {
markBlockAsCorrupt(c, storageInfo, node);
}
return;
}
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
addStoredBlockUnderConstruction(
new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
storageInfo);
return;
}
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
&& (storedBlock.findStorageInfo(storageInfo) == -1 ||
corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
}
}
@ -4060,6 +4135,87 @@ public void run() {
}
}
/**
* Runnable that monitors the fragmentation of the StorageInfo TreeSet and
* compacts it when it falls under a certain threshold.
*/
private class StorageInfoDefragmenter implements Runnable {
@Override
public void run() {
while (namesystem.isRunning()) {
try {
// Check storage efficiency only when active NN is out of safe mode.
if (isPopulatingReplQueues()) {
scanAndCompactStorages();
}
Thread.sleep(storageInfoDefragmentInterval);
} catch (Throwable t) {
if (!namesystem.isRunning()) {
LOG.info("Stopping thread.");
if (!(t instanceof InterruptedException)) {
LOG.info("Received an exception while shutting down.", t);
}
break;
} else if (!checkNSRunning && t instanceof InterruptedException) {
LOG.info("Stopping for testing.");
break;
}
LOG.error("Thread received Runtime exception.", t);
terminate(1, t);
}
}
}
private void scanAndCompactStorages() throws InterruptedException {
ArrayList<String> datanodesAndStorages = new ArrayList<>();
for (DatanodeDescriptor node
: datanodeManager.getDatanodeListForReport(DatanodeReportType.ALL)) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
try {
namesystem.readLock();
double ratio = storage.treeSetFillRatio();
if (ratio < storageInfoDefragmentRatio) {
datanodesAndStorages.add(node.getDatanodeUuid());
datanodesAndStorages.add(storage.getStorageID());
}
LOG.info("StorageInfo TreeSet fill ratio {} : {}{}",
storage.getStorageID(), ratio,
(ratio < storageInfoDefragmentRatio)
? " (queued for defragmentation)" : "");
} finally {
namesystem.readUnlock();
}
}
}
if (!datanodesAndStorages.isEmpty()) {
for (int i = 0; i < datanodesAndStorages.size(); i += 2) {
namesystem.writeLock();
try {
DatanodeStorageInfo storage = datanodeManager.
getDatanode(datanodesAndStorages.get(i)).
getStorageInfo(datanodesAndStorages.get(i + 1));
if (storage != null) {
boolean aborted =
!storage.treeSetCompact(storageInfoDefragmentTimeout);
if (aborted) {
// Compaction timed out, reset iterator to continue with
// the same storage next iteration.
i -= 2;
}
LOG.info("StorageInfo TreeSet defragmented {} : {}{}",
storage.getStorageID(), storage.treeSetFillRatio(),
aborted ? " (aborted)" : "");
}
} finally {
namesystem.writeUnlock();
}
// Wait between each iteration
Thread.sleep(1000);
}
}
}
}
/**
* Compute block replication and block invalidation work that can be scheduled

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Collections;
import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block;
@ -30,37 +31,6 @@
* the datanodes that store the block.
*/
class BlocksMap {
private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
private final BlockInfo blockInfo;
private int nextIdx = 0;
StorageIterator(BlockInfo blkInfo) {
this.blockInfo = blkInfo;
}
@Override
public boolean hasNext() {
if (blockInfo == null) {
return false;
}
while (nextIdx < blockInfo.getCapacity() &&
blockInfo.getDatanode(nextIdx) == null) {
// note that for striped blocks there may be null in the triplets
nextIdx++;
}
return nextIdx < blockInfo.getCapacity();
}
@Override
public DatanodeStorageInfo next() {
return blockInfo.getStorageInfo(nextIdx++);
}
@Override
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
}
}
/** Constant {@link LightWeightGSet} capacity. */
private final int capacity;
@ -132,6 +102,16 @@ void removeBlock(Block block) {
}
}
/**
* Check if BlocksMap contains the block.
*
* @param b Block to check
* @return true if block is in the map, otherwise false
*/
boolean containsBlock(Block b) {
return blocks.contains(b);
}
/** Returns the block object if it exists in the map. */
BlockInfo getStoredBlock(Block b) {
return blocks.get(b);
@ -142,7 +122,9 @@ BlockInfo getStoredBlock(Block b) {
* returns {@link Iterable} of the storages the block belongs to.
*/
Iterable<DatanodeStorageInfo> getStorages(Block b) {
return getStorages(blocks.get(b));
BlockInfo block = blocks.get(b);
return block != null ? getStorages(block)
: Collections.<DatanodeStorageInfo>emptyList();
}
/**
@ -150,12 +132,16 @@ Iterable<DatanodeStorageInfo> getStorages(Block b) {
* returns {@link Iterable} of the storages the block belongs to.
*/
Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
return new Iterable<DatanodeStorageInfo>() {
@Override
public Iterator<DatanodeStorageInfo> iterator() {
return new StorageIterator(storedBlock);
}
};
if (storedBlock == null) {
return Collections.emptyList();
} else {
return new Iterable<DatanodeStorageInfo>() {
@Override
public Iterator<DatanodeStorageInfo> iterator() {
return storedBlock.getStorageInfos();
}
};
}
}
/** counts number of containing nodes. Better than using iterator. */
@ -174,7 +160,7 @@ boolean removeNode(Block b, DatanodeDescriptor node) {
if (info == null)
return false;
// remove block from the data-node list and the node from the block info
// remove block from the data-node set and the node from the block info
boolean removed = removeBlock(node, info);
if (info.hasNoStorage() // no datanodes left
@ -185,7 +171,7 @@ boolean removeNode(Block b, DatanodeDescriptor node) {
}
/**
* Remove block from the list of blocks belonging to the data-node. Remove
* Remove block from the set of blocks belonging to the data-node. Remove
* data-node from the block.
*/
static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) {

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import com.google.common.annotations.VisibleForTesting;
@ -85,31 +86,6 @@ public void updateFromStorage(DatanodeStorage storage) {
storageType = storage.getStorageType();
}
/**
* Iterates over the list of blocks belonging to the data-node.
*/
class BlockIterator implements Iterator<BlockInfo> {
private BlockInfo current;
BlockIterator(BlockInfo head) {
this.current = head;
}
public boolean hasNext() {
return current != null;
}
public BlockInfo next() {
BlockInfo res = current;
current = current.getNext(current.findStorageInfo(DatanodeStorageInfo.this));
return res;
}
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
}
}
private final DatanodeDescriptor dn;
private final String storageID;
private StorageType storageType;
@ -120,8 +96,7 @@ public void remove() {
private volatile long remaining;
private long blockPoolUsed;
private volatile BlockInfo blockList = null;
private int numBlocks = 0;
private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
// The ID of the last full block report which updated this storage.
private long lastBlockReportId = 0;
@ -207,7 +182,7 @@ void setState(State state) {
}
boolean areBlocksOnFailedStorage() {
return getState() == State.FAILED && numBlocks != 0;
return getState() == State.FAILED && !blocks.isEmpty();
}
@VisibleForTesting
@ -234,6 +209,36 @@ long getRemaining() {
long getBlockPoolUsed() {
return blockPoolUsed;
}
/**
* For use during startup. Expects block to be added in sorted order
* to enable fast insert in to the DatanodeStorageInfo
*
* @param b Block to add to DatanodeStorageInfo
* @param reportedBlock The reported replica
* @return Enum describing if block was added, replaced or already existed
*/
public AddBlockResult addBlockInitial(BlockInfo b, Block reportedBlock) {
// First check whether the block belongs to a different storage
// on the same DN.
AddBlockResult result = AddBlockResult.ADDED;
DatanodeStorageInfo otherStorage =
b.findStorageInfo(getDatanodeDescriptor());
if (otherStorage != null) {
if (otherStorage != this) {
// The block belongs to a different storage. Remove it first.
otherStorage.removeBlock(b);
result = AddBlockResult.REPLACED;
} else {
// The block is already associated with this storage.
return AddBlockResult.ALREADY_EXIST;
}
}
b.addStorage(this, reportedBlock);
blocks.addSortedLast(b);
return result;
}
public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
// First check whether the block belongs to a different storage
@ -253,9 +258,8 @@ public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
}
}
// add to the head of the data-node list
b.addStorage(this, reportedBlock);
insertToList(b);
blocks.add(b);
return result;
}
@ -263,45 +267,17 @@ AddBlockResult addBlock(BlockInfo b) {
return addBlock(b, b);
}
public void insertToList(BlockInfo b) {
blockList = b.listInsert(blockList, this);
numBlocks++;
}
public boolean removeBlock(BlockInfo b) {
blockList = b.listRemove(blockList, this);
if (b.removeStorage(this)) {
numBlocks--;
return true;
} else {
return false;
}
boolean removeBlock(BlockInfo b) {
blocks.remove(b);
return b.removeStorage(this);
}
int numBlocks() {
return numBlocks;
return blocks.size();
}
Iterator<BlockInfo> getBlockIterator() {
return new BlockIterator(blockList);
}
/**
* Move block to the head of the list of blocks belonging to the data-node.
* @return the index of the head of the blockList
*/
int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
return curIndex;
}
/**
* Used for testing only
* @return the head of the blockList
*/
@VisibleForTesting
BlockInfo getBlockListHeadForTesting(){
return blockList;
return blocks.iterator();
}
void updateState(StorageReport r) {
@ -349,6 +325,27 @@ StorageReport toStorageReport() {
false, capacity, dfsUsed, remaining, blockPoolUsed);
}
/**
* The fill ratio of the underlying TreeSet holding blocks.
*
* @return the fill ratio of the tree
*/
public double treeSetFillRatio() {
return blocks.fillRatio();
}
/**
* Compact the underlying TreeSet holding blocks.
*
* @param timeout Maximum time to spend compacting the tree set in
* milliseconds.
*
* @return true if compaction completed, false if aborted
*/
public boolean treeSetCompact(long timeout) {
return blocks.compact(timeout);
}
static Iterable<StorageType> toStorageTypes(
final Iterable<DatanodeStorageInfo> infos) {
return new Iterable<StorageType>() {

View File

@ -461,7 +461,7 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
// Below split threshold, send all reports in a single message.
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), reports,
new BlockReportContext(1, 0, reportId, fullBrLeaseId));
new BlockReportContext(1, 0, reportId, fullBrLeaseId, true));
numRPCs = 1;
numReportsSent = reports.length;
if (cmd != null) {
@ -474,7 +474,7 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport,
new BlockReportContext(reports.length, r, reportId,
fullBrLeaseId));
fullBrLeaseId, true));
numReportsSent++;
numRPCs++;
if (cmd != null) {

View File

@ -18,13 +18,14 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.util.LightWeightResizableGSet;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;
/**
* Maintains the replica map.
@ -33,9 +34,20 @@ class ReplicaMap {
// Object using which this class is synchronized
private final Object mutex;
// Map of block pool Id to another map of block Id to ReplicaInfo.
private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
// Map of block pool Id to a set of ReplicaInfo.
private final Map<String, FoldedTreeSet<ReplicaInfo>> map = new HashMap<>();
// Special comparator used to compare Long to Block ID in the TreeSet.
private static final Comparator<Object> LONG_AND_BLOCK_COMPARATOR
= new Comparator<Object>() {
@Override
public int compare(Object o1, Object o2) {
long lookup = (long) o1;
long stored = ((Block) o2).getBlockId();
return lookup > stored ? 1 : lookup < stored ? -1 : 0;
}
};
ReplicaMap(Object mutex) {
if (mutex == null) {
@ -92,11 +104,14 @@ ReplicaInfo get(String bpid, Block block) {
ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid);
synchronized(mutex) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.get(new Block(blockId)) : null;
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
return null;
}
return set.get(blockId, LONG_AND_BLOCK_COMPARATOR);
}
}
/**
* Add a replica's meta information into the map
*
@ -109,13 +124,13 @@ ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
synchronized(mutex) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
// Add an entry for block pool if it does not exist already
m = new LightWeightResizableGSet<Block, ReplicaInfo>();
map.put(bpid, m);
set = new FoldedTreeSet<>();
map.put(bpid, set);
}
return m.put(replicaInfo);
return set.addOrReplace(replicaInfo);
}
}
@ -138,12 +153,13 @@ ReplicaInfo remove(String bpid, Block block) {
checkBlockPool(bpid);
checkBlock(block);
synchronized(mutex) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
ReplicaInfo replicaInfo = m.get(block);
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set != null) {
ReplicaInfo replicaInfo =
set.get(block.getBlockId(), LONG_AND_BLOCK_COMPARATOR);
if (replicaInfo != null &&
block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
return m.remove(block);
return set.removeAndGet(replicaInfo);
}
}
}
@ -160,9 +176,9 @@ ReplicaInfo remove(String bpid, Block block) {
ReplicaInfo remove(String bpid, long blockId) {
checkBlockPool(bpid);
synchronized(mutex) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
return m.remove(new Block(blockId));
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set != null) {
return set.removeAndGet(blockId, LONG_AND_BLOCK_COMPARATOR);
}
}
return null;
@ -174,10 +190,9 @@ ReplicaInfo remove(String bpid, long blockId) {
* @return the number of replicas in the map
*/
int size(String bpid) {
LightWeightResizableGSet<Block, ReplicaInfo> m = null;
synchronized(mutex) {
m = map.get(bpid);
return m != null ? m.size() : 0;
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
return set != null ? set.size() : 0;
}
}
@ -192,19 +207,17 @@ int size(String bpid) {
* @return a collection of the replicas belonging to the block pool
*/
Collection<ReplicaInfo> replicas(String bpid) {
LightWeightResizableGSet<Block, ReplicaInfo> m = null;
m = map.get(bpid);
return m != null ? m.values() : null;
return map.get(bpid);
}
void initBlockPool(String bpid) {
checkBlockPool(bpid);
synchronized(mutex) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
// Add an entry for block pool if it does not exist already
m = new LightWeightResizableGSet<Block, ReplicaInfo>();
map.put(bpid, m);
set = new FoldedTreeSet<>();
map.put(bpid, set);
}
}
}

View File

@ -52,12 +52,16 @@ public class BlockReportContext {
*/
private final long leaseId;
private final boolean sorted;
public BlockReportContext(int totalRpcs, int curRpc,
long reportId, long leaseId) {
long reportId, long leaseId,
boolean sorted) {
this.totalRpcs = totalRpcs;
this.curRpc = curRpc;
this.reportId = reportId;
this.leaseId = leaseId;
this.sorted = sorted;
}
public int getTotalRpcs() {
@ -75,4 +79,8 @@ public long getReportId() {
public long getLeaseId() {
return leaseId;
}
public boolean isSorted() {
return sorted;
}
}

View File

@ -131,7 +131,6 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
* Each finalized block is represented as 3 longs. Each under-
* construction replica is represented as 4 longs.
* This is done instead of Block[] to reduce memory used by block reports.
* @param reports report of blocks per storage
* @param context Context information for this block report.
*
* @return - the next command for DN to process.

View File

@ -261,6 +261,9 @@ message BlockReportContextProto {
// The block report lease ID, or 0 if we are sending without a lease to
// bypass rate-limiting.
optional uint64 leaseId = 4 [ default = 0 ];
// True if the reported blocks are sorted by increasing block IDs
optional bool sorted = 5 [default = false];
}
/**

View File

@ -228,7 +228,7 @@ public BlockReportResponseProto answer(InvocationOnMock invocation) {
request.set(null);
nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
nn.blockReport(reg, "pool", sbr,
new BlockReportContext(1, 0, System.nanoTime(), 0L));
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
BlockReportRequestProto proto = request.get();
assertNotNull(proto);
assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@ -238,7 +238,7 @@ public BlockReportResponseProto answer(InvocationOnMock invocation) {
request.set(null);
nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
nn.blockReport(reg, "pool", sbr,
new BlockReportContext(1, 0, System.nanoTime(), 0L));
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
proto = request.get();
assertNotNull(proto);
assertFalse(proto.getReports(0).getBlocksList().isEmpty());

View File

@ -19,19 +19,11 @@
import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.junit.Assert;
import org.junit.Test;
@ -91,84 +83,4 @@ public void testReplaceStorage() throws Exception {
Assert.assertThat(added, is(false));
Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2));
}
@Test
public void testBlockListMoveToHead() throws Exception {
LOG.info("BlockInfo moveToHead tests...");
final int MAX_BLOCKS = 10;
DatanodeStorageInfo dd = DFSTestUtil.createDatanodeStorageInfo("s1", "1.1.1.1");
ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
int headIndex;
int curIndex;
LOG.info("Building block list...");
for (int i = 0; i < MAX_BLOCKS; i++) {
blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
blockInfoList.add(new BlockInfoContiguous(blockList.get(i), (short) 3));
dd.addBlock(blockInfoList.get(i));
// index of the datanode should be 0
assertEquals("Find datanode should be 0", 0, blockInfoList.get(i)
.findStorageInfo(dd));
}
// list length should be equal to the number of blocks we inserted
LOG.info("Checking list length...");
assertEquals("Length should be MAX_BLOCK", MAX_BLOCKS, dd.numBlocks());
Iterator<BlockInfo> it = dd.getBlockIterator();
int len = 0;
while (it.hasNext()) {
it.next();
len++;
}
assertEquals("There should be MAX_BLOCK blockInfo's", MAX_BLOCKS, len);
headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
LOG.info("Moving each block to the head of the list...");
for (int i = 0; i < MAX_BLOCKS; i++) {
curIndex = blockInfoList.get(i).findStorageInfo(dd);
headIndex = dd.moveBlockToHead(blockInfoList.get(i), curIndex, headIndex);
// the moved element must be at the head of the list
assertEquals("Block should be at the head of the list now.",
blockInfoList.get(i), dd.getBlockListHeadForTesting());
}
// move head of the list to the head - this should not change the list
LOG.info("Moving head to the head...");
BlockInfo temp = dd.getBlockListHeadForTesting();
curIndex = 0;
headIndex = 0;
dd.moveBlockToHead(temp, curIndex, headIndex);
assertEquals(
"Moving head to the head of the list shopuld not change the list",
temp, dd.getBlockListHeadForTesting());
// check all elements of the list against the original blockInfoList
LOG.info("Checking elements of the list...");
temp = dd.getBlockListHeadForTesting();
assertNotNull("Head should not be null", temp);
int c = MAX_BLOCKS - 1;
while (temp != null) {
assertEquals("Expected element is not on the list",
blockInfoList.get(c--), temp);
temp = temp.getNext(0);
}
LOG.info("Moving random blocks to the head of the list...");
headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
Random rand = new Random();
for (int i = 0; i < MAX_BLOCKS; i++) {
int j = rand.nextInt(MAX_BLOCKS);
curIndex = blockInfoList.get(j).findStorageInfo(dd);
headIndex = dd.moveBlockToHead(blockInfoList.get(j), curIndex, headIndex);
// the moved element must be at the head of the list
assertEquals("Block should be at the head of the list now.",
blockInfoList.get(j), dd.getBlockListHeadForTesting());
}
}
}

View File

@ -32,6 +32,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
@ -75,6 +76,7 @@
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -806,7 +808,8 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception {
// Make sure it's the first full report
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
builder.build(), null, false);
builder.build(),
new BlockReportContext(1, 0, System.nanoTime(), 0, true), false);
assertEquals(1, ds.getBlockReportCount());
// verify the storage info is correct
@ -821,6 +824,70 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception {
(ds) >= 0);
}
@Test
public void testFullBR() throws Exception {
doReturn(true).when(fsn).isRunning();
DatanodeDescriptor node = nodes.get(0);
DatanodeStorageInfo ds = node.getStorageInfos()[0];
node.setAlive(true);
DatanodeRegistration nodeReg = new DatanodeRegistration(node, null, null, "");
// register new node
bm.getDatanodeManager().registerDatanode(nodeReg);
bm.getDatanodeManager().addDatanode(node);
assertEquals(node, bm.getDatanodeManager().getDatanode(node));
assertEquals(0, ds.getBlockReportCount());
ArrayList<BlockInfo> blocks = new ArrayList<>();
for (int id = 24; id > 0; id--) {
blocks.add(addBlockToBM(id));
}
// Make sure it's the first full report
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
generateReport(blocks),
new BlockReportContext(1, 0, System.nanoTime(), 0, false),
false);
assertEquals(1, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
}
// Send unsorted report
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
generateReport(blocks),
new BlockReportContext(1, 0, System.nanoTime(), 0, false),
false);
assertEquals(2, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
}
// Sort list and send a sorted report
Collections.sort(blocks);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
generateReport(blocks),
new BlockReportContext(1, 0, System.nanoTime(), 0, true),
false);
assertEquals(3, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
}
}
private BlockListAsLongs generateReport(List<BlockInfo> blocks) {
BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
for (BlockInfo block : blocks) {
builder.add(new FinalizedReplica(block, null, null));
}
return builder.build();
}
private BlockInfo addBlockToBM(long blkId) {
Block block = new Block(blkId);
BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
@ -1061,4 +1128,4 @@ public void run() {
cluster.shutdown();
}
}
}
}

View File

@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.NotCompliantMBeanException;
@ -566,7 +567,7 @@ public synchronized void injectBlocks(String bpid,
}
Map<Block, BInfo> map = blockMap.get(bpid);
if (map == null) {
map = new HashMap<Block, BInfo>();
map = new TreeMap<>();
blockMap.put(bpid, map);
}
@ -1206,7 +1207,7 @@ public long getReplicaVisibleLength(ExtendedBlock block) {
@Override // FsDatasetSpi
public void addBlockPool(String bpid, Configuration conf) {
Map<Block, BInfo> map = new HashMap<Block, BInfo>();
Map<Block, BInfo> map = new TreeMap<>();
blockMap.put(bpid, map);
storage.addBlockPool(bpid);
}

View File

@ -19,14 +19,23 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -108,12 +117,13 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException {
StorageBlockReport reports[] =
new StorageBlockReport[cluster.getStoragesPerDatanode()];
ArrayList<Replica> blocks = new ArrayList<Replica>();
ArrayList<ReplicaInfo> blocks = new ArrayList<>();
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
Block localBlock = locatedBlock.getBlock().getLocalBlock();
blocks.add(new FinalizedReplica(localBlock, null, null));
}
Collections.sort(blocks);
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
@ -126,7 +136,7 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException {
// Should not assert!
cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
new BlockReportContext(1, 0, System.nanoTime(), 0L));
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
// Get the block locations once again.
locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);

View File

@ -82,6 +82,7 @@
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.timeout;

View File

@ -65,6 +65,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -190,7 +191,8 @@ public void testVolumeFailure() throws Exception {
new StorageBlockReport(dnStorage, blockList);
}
cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);
cluster.getNameNodeRpc().blockReport(dnR, bpid, reports,
new BlockReportContext(1, 0, System.nanoTime(), 0, true));
// verify number of blocks and files...
verify(filename, filesize);

View File

@ -134,7 +134,7 @@ public void testAlwaysSplit() throws IOException, InterruptedException {
Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
any(DatanodeRegistration.class),
anyString(),
captor.capture(), Mockito.<BlockReportContext>anyObject());
captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}
@ -166,7 +166,7 @@ public void testCornerCaseUnderThreshold() throws IOException, InterruptedExcept
Mockito.verify(nnSpy, times(1)).blockReport(
any(DatanodeRegistration.class),
anyString(),
captor.capture(), Mockito.<BlockReportContext>anyObject());
captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
}

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.util.Time;
/**
@ -40,7 +39,7 @@ protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report };
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
new BlockReportContext(reports.length, i, System.nanoTime(), 0L));
new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true));
i++;
}
}

View File

@ -36,6 +36,6 @@ protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException {
LOG.info("Sending combined block reports for " + dnR);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
new BlockReportContext(1, 0, System.nanoTime(), 0L));
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.timeout;

View File

@ -973,7 +973,7 @@ void register() throws IOException {
new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
};
dataNodeProto.blockReport(dnRegistration, bpid, reports,
new BlockReportContext(1, 0, System.nanoTime(), 0L));
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
}
/**
@ -1247,7 +1247,7 @@ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
StorageBlockReport[] report = { new StorageBlockReport(
dn.storage, dn.getBlockReportList()) };
dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
new BlockReportContext(1, 0, System.nanoTime(), 0L));
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
long end = Time.now();
return end-start;
}

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@ -303,7 +304,8 @@ public void testAddUCReplica() throws Exception {
StorageBlockReport[] reports = {new StorageBlockReport(storage,
bll)};
cluster.getNameNodeRpc().blockReport(dn.getDNRegistrationForBP(bpId),
bpId, reports, null);
bpId, reports,
new BlockReportContext(1, 0, System.nanoTime(), 0, true));
}
DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature()

View File

@ -118,7 +118,7 @@ public void testDeadDatanode() throws Exception {
BlockListAsLongs.EMPTY) };
try {
dnp.blockReport(reg, poolId, report,
new BlockReportContext(1, 0, System.nanoTime(), 0L));
new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
fail("Expected IOException is not thrown");
} catch (IOException ex) {
// Expected

View File

@ -0,0 +1,644 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Random;
/**
* Test of TreeSet
*/
public class FoldedTreeSetTest {
private static Random srand;
public FoldedTreeSetTest() {
}
@BeforeClass
public static void setUpClass() {
long seed = System.nanoTime();
System.out.println("This run uses the random seed " + seed);
srand = new Random(seed);
}
@AfterClass
public static void tearDownClass() {
}
@Before
public void setUp() {
}
@After
public void tearDown() {
}
/**
* Test of comparator method, of class TreeSet.
*/
@Test
public void testComparator() {
Comparator<String> comparator = new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return o1.compareTo(o2);
}
};
assertEquals(null, new FoldedTreeSet<>().comparator());
assertEquals(comparator, new FoldedTreeSet<>(comparator).comparator());
FoldedTreeSet<String> set = new FoldedTreeSet<>(comparator);
set.add("apa3");
set.add("apa2");
set.add("apa");
set.add("apa5");
set.add("apa4");
assertEquals(5, set.size());
assertEquals("apa", set.get("apa"));
}
/**
* Test of first method, of class TreeSet.
*/
@Test
public void testFirst() {
FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
for (int i = 0; i < 256; i++) {
tree.add(1024 + i);
assertEquals(1024, tree.first().intValue());
}
for (int i = 1; i < 256; i++) {
tree.remove(1024 + i);
assertEquals(1024, tree.first().intValue());
}
}
/**
* Test of last method, of class TreeSet.
*/
@Test
public void testLast() {
FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
for (int i = 0; i < 256; i++) {
tree.add(1024 + i);
assertEquals(1024 + i, tree.last().intValue());
}
for (int i = 0; i < 255; i++) {
tree.remove(1024 + i);
assertEquals(1279, tree.last().intValue());
}
}
/**
* Test of size method, of class TreeSet.
*/
@Test
public void testSize() {
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
String entry = "apa";
assertEquals(0, instance.size());
instance.add(entry);
assertEquals(1, instance.size());
instance.remove(entry);
assertEquals(0, instance.size());
}
/**
* Test of isEmpty method, of class TreeSet.
*/
@Test
public void testIsEmpty() {
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
boolean expResult = true;
boolean result = instance.isEmpty();
assertEquals(expResult, result);
instance.add("apa");
instance.remove("apa");
assertEquals(expResult, result);
}
/**
* Test of contains method, of class TreeSet.
*/
@Test
public void testContains() {
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
String entry = "apa";
assertEquals(false, instance.contains(entry));
instance.add(entry);
assertEquals(true, instance.contains(entry));
assertEquals(false, instance.contains(entry + entry));
}
/**
* Test of iterator method, of class TreeSet.
*/
@Test
public void testIterator() {
for (int iter = 0; iter < 10; iter++) {
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[64723];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
}
longs[i] = val.getId();
set.add(val);
}
assertEquals(longs.length, set.size());
Arrays.sort(longs);
Iterator<Holder> it = set.iterator();
for (int i = 0; i < longs.length; i++) {
assertTrue(it.hasNext());
Holder val = it.next();
assertEquals(longs[i], val.getId());
// remove randomly to force non linear removes
if (srand.nextBoolean()) {
it.remove();
}
}
}
}
/**
* Test of toArray method, of class TreeSet.
*/
@Test
public void testToArray() {
FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
ArrayList<Integer> list = new ArrayList<>(256);
for (int i = 0; i < 256; i++) {
list.add(1024 + i);
}
tree.addAll(list);
assertArrayEquals(list.toArray(), tree.toArray());
}
/**
* Test of toArray method, of class TreeSet.
*/
@Test
public void testToArray_GenericType() {
FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
ArrayList<Integer> list = new ArrayList<>(256);
for (int i = 0; i < 256; i++) {
list.add(1024 + i);
}
tree.addAll(list);
assertArrayEquals(list.toArray(new Integer[tree.size()]), tree.toArray(new Integer[tree.size()]));
assertArrayEquals(list.toArray(new Integer[tree.size() + 100]), tree.toArray(new Integer[tree.size() + 100]));
}
/**
* Test of add method, of class TreeSet.
*/
@Test
public void testAdd() {
FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
String entry = "apa";
assertTrue(simpleSet.add(entry));
assertFalse(simpleSet.add(entry));
FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
for (int i = 512; i < 1024; i++) {
assertTrue(intSet.add(i));
}
for (int i = -1024; i < -512; i++) {
assertTrue(intSet.add(i));
}
for (int i = 0; i < 512; i++) {
assertTrue(intSet.add(i));
}
for (int i = -512; i < 0; i++) {
assertTrue(intSet.add(i));
}
assertEquals(2048, intSet.size());
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[23432];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
}
longs[i] = val.getId();
assertTrue(set.add(val));
}
assertEquals(longs.length, set.size());
Arrays.sort(longs);
Iterator<Holder> it = set.iterator();
for (int i = 0; i < longs.length; i++) {
assertTrue(it.hasNext());
Holder val = it.next();
assertEquals(longs[i], val.getId());
}
// Specially constructed adds to exercise all code paths
FoldedTreeSet<Integer> specialAdds = new FoldedTreeSet<>();
// Fill node with even numbers
for (int i = 0; i < 128; i += 2) {
assertTrue(specialAdds.add(i));
}
// Remove left and add left
assertTrue(specialAdds.remove(0));
assertTrue(specialAdds.add(-1));
assertTrue(specialAdds.remove(-1));
// Add right and shift everything left
assertTrue(specialAdds.add(127));
assertTrue(specialAdds.remove(127));
// Empty at both ends
assertTrue(specialAdds.add(0));
assertTrue(specialAdds.remove(0));
assertTrue(specialAdds.remove(126));
// Add in the middle left to slide entries left
assertTrue(specialAdds.add(11));
assertTrue(specialAdds.remove(11));
// Add in the middle right to slide entries right
assertTrue(specialAdds.add(99));
assertTrue(specialAdds.remove(99));
// Add existing entry in the middle of a node
assertFalse(specialAdds.add(64));
}
@Test
public void testAddOrReplace() {
FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
String entry = "apa";
assertNull(simpleSet.addOrReplace(entry));
assertEquals(entry, simpleSet.addOrReplace(entry));
FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
for (int i = 0; i < 1024; i++) {
assertNull(intSet.addOrReplace(i));
}
for (int i = 0; i < 1024; i++) {
assertEquals(i, intSet.addOrReplace(i).intValue());
}
}
private static class Holder implements Comparable<Holder> {
private final long id;
public Holder(long id) {
this.id = id;
}
public long getId() {
return id;
}
@Override
public int compareTo(Holder o) {
return id < o.getId() ? -1
: id > o.getId() ? 1 : 0;
}
}
@Test
public void testRemoveWithComparator() {
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[98327];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
}
longs[i] = val.getId();
set.add(val);
}
assertEquals(longs.length, set.size());
Comparator<Object> cmp = new Comparator<Object>() {
@Override
public int compare(Object o1, Object o2) {
long lookup = (long) o1;
long stored = ((Holder) o2).getId();
return lookup < stored ? -1
: lookup > stored ? 1 : 0;
}
};
for (long val : longs) {
set.remove(val, cmp);
}
assertEquals(0, set.size());
assertTrue(set.isEmpty());
}
@Test
public void testGetWithComparator() {
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[32147];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
}
longs[i] = val.getId();
set.add(val);
}
assertEquals(longs.length, set.size());
Comparator<Object> cmp = new Comparator<Object>() {
@Override
public int compare(Object o1, Object o2) {
long lookup = (long) o1;
long stored = ((Holder) o2).getId();
return lookup < stored ? -1
: lookup > stored ? 1 : 0;
}
};
for (long val : longs) {
assertEquals(val, set.get(val, cmp).getId());
}
}
@Test
public void testGet() {
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[43277];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
}
longs[i] = val.getId();
set.add(val);
}
assertEquals(longs.length, set.size());
for (long val : longs) {
assertEquals(val, set.get(new Holder(val)).getId());
}
}
/**
* Test of remove method, of class TreeSet.
*/
@Test
public void testRemove() {
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
assertEquals(false, instance.remove("apa"));
instance.add("apa");
assertEquals(true, instance.remove("apa"));
removeLeft();
removeRight();
removeAt();
removeRandom();
}
public void removeLeft() {
FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
for (int i = 1; i <= 320; i++) {
set.add(i);
}
for (int i = 193; i < 225; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
for (int i = 129; i < 161; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
for (int i = 256; i > 224; i--) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
for (int i = 257; i < 289; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
while (!set.isEmpty()) {
assertTrue(set.remove(set.first()));
}
}
public void removeRight() {
FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
for (int i = 1; i <= 320; i++) {
set.add(i);
}
for (int i = 193; i < 225; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
for (int i = 192; i > 160; i--) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
for (int i = 256; i > 224; i--) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
for (int i = 320; i > 288; i--) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
while (!set.isEmpty()) {
assertTrue(set.remove(set.last()));
}
}
public void removeAt() {
FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
for (int i = 1; i <= 320; i++) {
set.add(i);
}
for (int i = 193; i < 225; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
for (int i = 160; i < 192; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
for (int i = 225; i < 257; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
for (int i = 288; i < 320; i++) {
assertEquals(true, set.remove(i));
assertEquals(false, set.remove(i));
}
}
public void removeRandom() {
FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
int[] integers = new int[2048];
for (int i = 0; i < 2048; i++) {
int val = srand.nextInt();
while (set.contains(val)) {
val = srand.nextInt();
}
integers[i] = val;
set.add(val);
}
assertEquals(2048, set.size());
for (int val : integers) {
assertEquals(true, set.remove(val));
assertEquals(false, set.remove(val));
}
assertEquals(true, set.isEmpty());
}
/**
* Test of containsAll method, of class TreeSet.
*/
@Test
public void testContainsAll() {
Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
assertEquals(false, instance.containsAll(list));
instance.addAll(list);
assertEquals(true, instance.containsAll(list));
}
/**
* Test of addAll method, of class TreeSet.
*/
@Test
public void testAddAll() {
Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
assertEquals(true, instance.addAll(list));
assertEquals(false, instance.addAll(list)); // add same entries again
}
/**
* Test of retainAll method, of class TreeSet.
*/
@Test
public void testRetainAll() {
Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
instance.addAll(list);
assertEquals(false, instance.retainAll(list));
assertEquals(2, instance.size());
Collection<String> list2 = Arrays.asList(new String[]{"apa"});
assertEquals(true, instance.retainAll(list2));
assertEquals(1, instance.size());
}
/**
* Test of removeAll method, of class TreeSet.
*/
@Test
public void testRemoveAll() {
Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
assertEquals(false, instance.removeAll(list));
instance.addAll(list);
assertEquals(true, instance.removeAll(list));
assertEquals(true, instance.isEmpty());
}
/**
* Test of clear method, of class TreeSet.
*/
@Test
public void testClear() {
FoldedTreeSet<String> instance = new FoldedTreeSet<>();
instance.clear();
assertEquals(true, instance.isEmpty());
instance.add("apa");
assertEquals(false, instance.isEmpty());
instance.clear();
assertEquals(true, instance.isEmpty());
}
@Test
public void testFillRatio() {
FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
final int size = 1024;
for (int i = 1; i <= size; i++) {
set.add(i);
assertEquals("Iteration: " + i, 1.0, set.fillRatio(), 0.0);
}
for (int i = 1; i <= size / 2; i++) {
set.remove(i * 2);
// Need the max since all the removes from the last node doesn't
// affect the fill ratio
assertEquals("Iteration: " + i,
Math.max((size - i) / (double) size, 0.53125),
set.fillRatio(), 0.0);
}
}
@Test
public void testCompact() {
FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
long[] longs = new long[24553];
for (int i = 0; i < longs.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
}
longs[i] = val.getId();
set.add(val);
}
assertEquals(longs.length, set.size());
long[] longs2 = new long[longs.length];
for (int i = 0; i < longs2.length; i++) {
Holder val = new Holder(srand.nextLong());
while (set.contains(val)) {
val = new Holder(srand.nextLong());
}
longs2[i] = val.getId();
set.add(val);
}
assertEquals(longs.length + longs2.length, set.size());
// Create fragementation
for (long val : longs) {
assertTrue(set.remove(new Holder(val)));
}
assertEquals(longs2.length, set.size());
assertFalse(set.compact(0));
assertTrue(set.compact(Long.MAX_VALUE));
assertEquals(longs2.length, set.size());
for (long val : longs) {
assertFalse(set.remove(new Holder(val)));
}
for (long val : longs2) {
assertEquals(val, set.get(new Holder(val)).getId());
}
}
}