diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java index 250bdcd5ec..928424ba20 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java @@ -86,4 +86,9 @@ public void convertLastBlockToUC(BlockInfo lastBlock, * @return whether the block collection is under construction. */ public boolean isUnderConstruction(); + + /** + * @return whether the block collection is in striping format + */ + public boolean isStriped(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 7d4139c737..bbead63f31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; @@ -534,9 +535,9 @@ private void dumpBlockMeta(Block block, PrintWriter out) { NumberReplicas numReplicas = new NumberReplicas(); // source node returned is not used - chooseSourceDatanode(block, containingNodes, + chooseSourceDatanodes(getStoredBlock(block), containingNodes, containingLiveReplicasNodes, numReplicas, - UnderReplicatedBlocks.LEVEL); + new LinkedList(), 1, UnderReplicatedBlocks.LEVEL); // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are // not included in the numReplicas.liveReplicas() count @@ -1337,15 +1338,15 @@ int computeInvalidateWork(int nodesToProcess) { } /** - * Scan blocks in {@link #neededReplications} and assign replication - * work to data-nodes they belong to. + * Scan blocks in {@link #neededReplications} and assign recovery + * (replication or erasure coding) work to data-nodes they belong to. * * The number of process blocks equals either twice the number of live * data-nodes or the number of under-replicated blocks whichever is less. * * @return number of blocks scheduled for replication during this iteration. 
*/ - int computeReplicationWork(int blocksToProcess) { + int computeBlockRecoveryWork(int blocksToProcess) { List> blocksToReplicate = null; namesystem.writeLock(); try { @@ -1355,30 +1356,32 @@ int computeReplicationWork(int blocksToProcess) { } finally { namesystem.writeUnlock(); } - return computeReplicationWorkForBlocks(blocksToReplicate); + return computeRecoveryWorkForBlocks(blocksToReplicate); } - /** Replicate a set of blocks + /** + * Recover a set of blocks to full strength through replication or + * erasure coding * - * @param blocksToReplicate blocks to be replicated, for each priority + * @param blocksToRecover blocks to be recovered, for each priority * @return the number of blocks scheduled for replication */ @VisibleForTesting - int computeReplicationWorkForBlocks(List> blocksToReplicate) { + int computeRecoveryWorkForBlocks(List> blocksToRecover) { int requiredReplication, numEffectiveReplicas; List containingNodes; - DatanodeDescriptor srcNode; BlockCollection bc = null; int additionalReplRequired; int scheduledWork = 0; - List work = new LinkedList(); + List recovWork = new LinkedList<>(); + // Step 1: categorize at-risk blocks into replication and EC tasks namesystem.writeLock(); try { synchronized (neededReplications) { - for (int priority = 0; priority < blocksToReplicate.size(); priority++) { - for (BlockInfo block : blocksToReplicate.get(priority)) { + for (int priority = 0; priority < blocksToRecover.size(); priority++) { + for (BlockInfo block : blocksToRecover.get(priority)) { // block should belong to a file bc = blocksMap.getBlockCollection(block); // abandoned block or block reopened for append @@ -1392,25 +1395,31 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { requiredReplication = bc.getPreferredBlockReplication(); // get a source data-node - containingNodes = new ArrayList(); - List liveReplicaNodes = new ArrayList(); + containingNodes = new ArrayList<>(); + List liveReplicaNodes = new ArrayList<>(); NumberReplicas numReplicas = new NumberReplicas(); - srcNode = chooseSourceDatanode( + List missingBlockIndices = new LinkedList<>(); + DatanodeDescriptor[] srcNodes; + int numSourceNodes = bc.isStriped() ? 
+ HdfsConstants.NUM_DATA_BLOCKS : 1; + srcNodes = chooseSourceDatanodes( block, containingNodes, liveReplicaNodes, numReplicas, - priority); - if(srcNode == null) { // block can not be replicated from any node - LOG.debug("Block " + block + " cannot be repl from any node"); + missingBlockIndices, numSourceNodes, priority); + if(srcNodes == null || srcNodes.length == 0) { + // block can not be replicated from any node + LOG.debug("Block " + block + " cannot be recovered " + + "from any node"); continue; } - // liveReplicaNodes can include READ_ONLY_SHARED replicas which are + // liveReplicaNodes can include READ_ONLY_SHARED replicas which are // not included in the numReplicas.liveReplicas() count assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); // do not schedule more if enough replicas is already pending numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplications.getNumReplicas(block); - + if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || (blockHasEnoughRacks(block)) ) { @@ -1427,9 +1436,21 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { } else { additionalReplRequired = 1; // Needed on a new rack } - work.add(new ReplicationWork(block, bc, srcNode, - containingNodes, liveReplicaNodes, additionalReplRequired, - priority)); + if (bc.isStriped()) { + ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes, + containingNodes, liveReplicaNodes, additionalReplRequired, + priority); + short[] missingBlockArray = new short[missingBlockIndices.size()]; + for (int i = 0 ; i < missingBlockIndices.size(); i++) { + missingBlockArray[i] = missingBlockIndices.get(i); + } + ecw.setMissingBlockIndices(missingBlockArray); + recovWork.add(ecw); + } else { + recovWork.add(new ReplicationWork(block, bc, srcNodes, + containingNodes, liveReplicaNodes, additionalReplRequired, + priority)); + } } } } @@ -1437,8 +1458,9 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { namesystem.writeUnlock(); } + // Step 2: choose target nodes for each recovery task final Set excludedNodes = new HashSet(); - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ // Exclude all of the containing nodes from being targets. // This list includes decommissioning or corrupt nodes. 
excludedNodes.clear(); @@ -1452,9 +1474,10 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes); } + // Step 3: add tasks to the DN namesystem.writeLock(); try { - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ final DatanodeStorageInfo[] targets = rw.targets; if(targets == null || targets.length == 0){ rw.targets = null; @@ -1493,7 +1516,7 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { if ( (numReplicas.liveReplicas() >= requiredReplication) && (!blockHasEnoughRacks(block)) ) { - if (rw.srcNode.getNetworkLocation().equals( + if (rw.srcNodes[0].getNetworkLocation().equals( targets[0].getDatanodeDescriptor().getNetworkLocation())) { //No use continuing, unless a new rack in this case continue; @@ -1501,7 +1524,17 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { } // Add block to the to be replicated list - rw.srcNode.addBlockToBeReplicated(block, targets); + if (bc.isStriped()) { + assert rw instanceof ErasureCodingWork; + assert rw.targets.length > 0; + rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded( + new ExtendedBlock(namesystem.getBlockPoolId(), block), + rw.srcNodes, rw.targets, + ((ErasureCodingWork)rw).getMissingBlockIndicies()); + } + else { + rw.srcNodes[0].addBlockToBeReplicated(block, targets); + } scheduledWork++; DatanodeStorageInfo.incrementBlocksScheduled(targets); @@ -1525,7 +1558,7 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { if (blockLog.isInfoEnabled()) { // log which blocks have been scheduled for replication - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ DatanodeStorageInfo[] targets = rw.targets; if (targets != null && targets.length != 0) { StringBuilder targetList = new StringBuilder("datanode(s)"); @@ -1533,7 +1566,7 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { targetList.append(' '); targetList.append(targets[k].getDatanodeDescriptor()); } - blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNode, + blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNodes, rw.block, targetList); } } @@ -1619,55 +1652,66 @@ List getDatanodeDescriptors(List nodes) { } /** - * Parse the data-nodes the block belongs to and choose one, - * which will be the replication source. + * Parse the data-nodes the block belongs to and choose a certain number + * from them to be the recovery sources. * * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes * since the former do not have write traffic and hence are less busy. * We do not use already decommissioned nodes as a source. - * Otherwise we choose a random node among those that did not reach their - * replication limits. However, if the replication is of the highest priority - * and all nodes have reached their replication limits, we will choose a - * random node despite the replication limit. + * Otherwise we randomly choose nodes among those that did not reach their + * replication limits. However, if the recovery work is of the highest + * priority and all nodes have reached their replication limits, we will + * randomly choose the desired number of nodes despite the replication limit. * * In addition form a list of all nodes containing the block * and calculate its replication numbers. 
* * @param block Block for which a replication source is needed - * @param containingNodes List to be populated with nodes found to contain the - * given block - * @param nodesContainingLiveReplicas List to be populated with nodes found to - * contain live replicas of the given block - * @param numReplicas NumberReplicas instance to be initialized with the - * counts of live, corrupt, excess, and - * decommissioned replicas of the given - * block. + * @param containingNodes List to be populated with nodes found to contain + * the given block + * @param nodesContainingLiveReplicas List to be populated with nodes found + * to contain live replicas of the given + * block + * @param numReplicas NumberReplicas instance to be initialized with the + * counts of live, corrupt, excess, and decommissioned + * replicas of the given block. + * @param missingBlockIndices List to be populated with indices of missing + * blocks in a striped block group or missing + * replicas of a replicated block + * @param numSourceNodes integer specifying the number of source nodes to + * choose * @param priority integer representing replication priority of the given * block - * @return the DatanodeDescriptor of the chosen node from which to replicate - * the given block + * @return the array of DatanodeDescriptor of the chosen nodes from which to + * recover the given block */ - @VisibleForTesting - DatanodeDescriptor chooseSourceDatanode(Block block, - List containingNodes, - List nodesContainingLiveReplicas, - NumberReplicas numReplicas, - int priority) { + @VisibleForTesting + DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, + List containingNodes, + List nodesContainingLiveReplicas, + NumberReplicas numReplicas, + List missingBlockIndices, int numSourceNodes, int priority) { containingNodes.clear(); nodesContainingLiveReplicas.clear(); - DatanodeDescriptor srcNode = null; + LinkedList srcNodes = new LinkedList<>(); int live = 0; int decommissioned = 0; int decommissioning = 0; int corrupt = 0; int excess = 0; - + missingBlockIndices.clear(); + Set healthyIndices = new HashSet<>(); + Collection nodesCorrupt = corruptReplicas.getNodes(block); for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + if (block.isStriped()) { + healthyIndices.add((short) ((BlockInfoStriped) block). + getStorageBlockIndex(storage)); + } final DatanodeDescriptor node = storage.getDatanodeDescriptor(); LightWeightLinkedSet excessBlocks = excessReplicateMap.get(node.getDatanodeUuid()); - int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; + int countableReplica = storage.getState() == State.NORMAL ? 
1 : 0; if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) corrupt += countableReplica; else if (node.isDecommissionInProgress()) { @@ -1703,20 +1747,32 @@ else if (node.isDecommissionInProgress()) { continue; // We got this far, current node is a reasonable choice - if (srcNode == null) { - srcNode = node; + if(srcNodes.size() < numSourceNodes) { + srcNodes.add(node); continue; } // switch to a different node randomly // this to prevent from deterministically selecting the same node even // if the node failed to replicate the block on previous iterations - if(ThreadLocalRandom.current().nextBoolean()) - srcNode = node; + if(ThreadLocalRandom.current().nextBoolean()) { + int pos = ThreadLocalRandom.current().nextInt(numSourceNodes); + if(!srcNodes.get(pos).isDecommissionInProgress()) { + srcNodes.set(pos, node); + } + } + } + if (block.isStriped()) { + for (short i = 0; i < HdfsConstants.NUM_DATA_BLOCKS + + HdfsConstants.NUM_PARITY_BLOCKS; i++) { + if (!healthyIndices.contains(i)) { + missingBlockIndices.add(i); + } + } } if(numReplicas != null) numReplicas.initialize(live, decommissioned, decommissioning, corrupt, excess, 0); - return srcNode; + return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]); } /** @@ -1751,7 +1807,7 @@ private void processPendingReplications() { */ } } - + /** * StatefulBlockInfo is used to build the "toUC" list, which is a list of * updates to the information about under-construction blocks. @@ -3716,7 +3772,7 @@ public int numOfUnderReplicatedBlocks() { } /** - * Periodically calls computeReplicationWork(). + * Periodically calls computeBlockRecoveryWork(). */ private class ReplicationMonitor implements Runnable { @@ -3774,7 +3830,7 @@ int computeDatanodeWork() { final int nodesToProcess = (int) Math.ceil(numlive * this.blocksInvalidateWorkPct); - int workFound = this.computeReplicationWork(blocksToProcess); + int workFound = this.computeBlockRecoveryWork(blocksToProcess); // Update counters namesystem.writeLock(); @@ -3814,47 +3870,117 @@ public static LocatedBlock newLocatedBlock( null); } - private static class ReplicationWork { + /** + * This class is used internally by {@link this#computeRecoveryWorkForBlocks} + * to represent a task to recover a block through replication or erasure + * coding. Recovery is done by transferring data from {@link srcNodes} to + * {@link targets} + */ + private static class BlockRecoveryWork { + protected final BlockInfo block; + protected final BlockCollection bc; - private final BlockInfo block; - private final BlockCollection bc; + /** + * An erasure coding recovery task has multiple source nodes. 
+ * A replication task only has 1 source node, stored on top of the array + */ + protected final DatanodeDescriptor[] srcNodes; + /** Nodes containing the block; avoid them in choosing new targets */ + protected final List containingNodes; + /** Required by {@link BlockPlacementPolicy#chooseTarget} */ + protected final List liveReplicaStorages; + protected final int additionalReplRequired; - private final DatanodeDescriptor srcNode; - private final List containingNodes; - private final List liveReplicaStorages; - private final int additionalReplRequired; + protected DatanodeStorageInfo[] targets; + protected final int priority; - private DatanodeStorageInfo targets[]; - private final int priority; - - public ReplicationWork(BlockInfo block, + public BlockRecoveryWork(BlockInfo block, BlockCollection bc, - DatanodeDescriptor srcNode, + DatanodeDescriptor[] srcNodes, List containingNodes, List liveReplicaStorages, int additionalReplRequired, int priority) { this.block = block; this.bc = bc; - this.srcNode = srcNode; - this.srcNode.incrementPendingReplicationWithoutTargets(); + this.srcNodes = srcNodes; this.containingNodes = containingNodes; this.liveReplicaStorages = liveReplicaStorages; this.additionalReplRequired = additionalReplRequired; this.priority = priority; this.targets = null; } - - private void chooseTargets(BlockPlacementPolicy blockplacement, + + protected void chooseTargets(BlockPlacementPolicy blockplacement, BlockStoragePolicySuite storagePolicySuite, Set excludedNodes) { + } + } + + private static class ReplicationWork extends BlockRecoveryWork { + + public ReplicationWork(BlockInfo block, + BlockCollection bc, + DatanodeDescriptor[] srcNodes, + List containingNodes, + List liveReplicaStorages, + int additionalReplRequired, + int priority) { + super(block, bc, srcNodes, containingNodes, + liveReplicaStorages, additionalReplRequired, priority); + LOG.debug("Creating a ReplicationWork to recover " + block); + } + + protected void chooseTargets(BlockPlacementPolicy blockplacement, + BlockStoragePolicySuite storagePolicySuite, + Set excludedNodes) { + assert srcNodes.length > 0 + : "At least 1 source node should have been selected"; try { targets = blockplacement.chooseTarget(bc.getName(), - additionalReplRequired, srcNode, liveReplicaStorages, false, + additionalReplRequired, srcNodes[0], liveReplicaStorages, false, + excludedNodes, block.getNumBytes(), + storagePolicySuite.getPolicy(bc.getStoragePolicyID())); + } finally { + srcNodes[0].decrementPendingReplicationWithoutTargets(); + } + } + } + + private static class ErasureCodingWork extends BlockRecoveryWork { + + private short[] missingBlockIndicies = null; + + public ErasureCodingWork(BlockInfo block, + BlockCollection bc, + DatanodeDescriptor[] srcNodes, + List containingNodes, + List liveReplicaStorages, + int additionalReplRequired, + int priority) { + super(block, bc, srcNodes, containingNodes, + liveReplicaStorages, additionalReplRequired, priority); + LOG.debug("Creating an ErasureCodingWork to recover " + block); + } + + public short[] getMissingBlockIndicies() { + return missingBlockIndicies; + } + + public void setMissingBlockIndices(short[] missingBlockIndicies) { + this.missingBlockIndicies = missingBlockIndicies; + } + + protected void chooseTargets(BlockPlacementPolicy blockplacement, + BlockStoragePolicySuite storagePolicySuite, + Set excludedNodes) { + try { + // TODO: new placement policy for EC considering multiple writers + targets = blockplacement.chooseTarget(bc.getName(), + additionalReplRequired, 
srcNodes[0], liveReplicaStorages, false, excludedNodes, block.getNumBytes(), storagePolicySuite.getPolicy(bc.getStoragePolicyID())); } finally { - srcNode.decrementPendingReplicationWithoutTargets(); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index b7a3489b08..7bc5e7e48a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.Arrays; import com.google.common.annotations.VisibleForTesting; @@ -41,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -97,6 +99,33 @@ public static class BlockTargetPair { } } + /** Block and targets pair */ + @InterfaceAudience.Private + @InterfaceStability.Evolving + public static class BlockECRecoveryInfo { + public final ExtendedBlock block; + public final DatanodeDescriptor[] sources; + public final DatanodeStorageInfo[] targets; + public final short[] missingBlockIndices; + + BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources, + DatanodeStorageInfo[] targets, short[] missingBlockIndices) { + this.block = block; + this.sources = sources; + this.targets = targets; + this.missingBlockIndices = missingBlockIndices; + } + + @Override + public String toString() { + return new StringBuilder().append("BlockECRecoveryInfo(\n "). + append("Recovering ").append(block). + append(" From: ").append(Arrays.asList(sources)). + append(" To: ").append(Arrays.asList(targets)).append(")\n"). + toString(); + } + } + /** A BlockTargetPair queue. */ private static class BlockQueue { private final Queue blockq = new LinkedList(); @@ -217,12 +246,17 @@ public CachedBlocksList getPendingUncached() { private long bandwidth; /** A queue of blocks to be replicated by this datanode */ - private final BlockQueue replicateBlocks = new BlockQueue(); + private final BlockQueue replicateBlocks = + new BlockQueue<>(); + /** A queue of blocks to be erasure coded by this datanode */ + private final BlockQueue erasurecodeBlocks = + new BlockQueue<>(); /** A queue of blocks to be recovered by this datanode */ - private final BlockQueue recoverBlocks = - new BlockQueue(); + private final BlockQueue + recoverBlocks = new BlockQueue<>(); /** A set of blocks to be invalidated by this datanode */ - private final LightWeightHashSet invalidateBlocks = new LightWeightHashSet(); + private final LightWeightHashSet invalidateBlocks = + new LightWeightHashSet<>(); /* Variables for maintaining number of blocks scheduled to be written to * this storage. 
This count is approximate and might be slightly bigger @@ -375,6 +409,7 @@ public void clearBlockQueues() { this.invalidateBlocks.clear(); this.recoverBlocks.clear(); this.replicateBlocks.clear(); + this.erasurecodeBlocks.clear(); } // pendingCached, cached, and pendingUncached are protected by the // FSN lock. @@ -596,6 +631,20 @@ void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) { replicateBlocks.offer(new BlockTargetPair(block, targets)); } + /** + * Store block erasure coding work. + */ + void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources, + DatanodeStorageInfo[] targets, short[] missingBlockIndicies) { + assert(block != null && sources != null && sources.length > 0); + BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets, + missingBlockIndicies); + erasurecodeBlocks.offer(task); + BlockManager.LOG.debug("Adding block recovery task " + task + + " to " + getName() + ", current queue size is " + + erasurecodeBlocks.size()); + } + /** * Store block recovery work. */ @@ -627,6 +676,13 @@ int getNumberOfBlocksToBeReplicated() { return PendingReplicationWithoutTargets + replicateBlocks.size(); } + /** + * The number of work items that are pending to be erasure coded + */ + int getNumberOfBlocksToBeErasureCoded() { + return erasurecodeBlocks.size(); + } + /** * The number of block invalidation items that are pending to * be sent to the datanode @@ -641,6 +697,10 @@ public List getReplicationCommand(int maxTransfers) { return replicateBlocks.poll(maxTransfers); } + public List getErasureCodeCommand(int maxTransfers) { + return erasurecodeBlocks.poll(maxTransfers); + } + public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) { List blocks = recoverBlocks.poll(maxTransfers); if(blocks == null) @@ -841,6 +901,10 @@ public String dumpDatanode() { if (repl > 0) { sb.append(" ").append(repl).append(" blocks to be replicated;"); } + int ec = erasurecodeBlocks.size(); + if(ec > 0) { + sb.append(" ").append(ec).append(" blocks to be erasure coded;"); + } int inval = invalidateBlocks.size(); if (inval > 0) { sb.append(" ").append(inval).append(" blocks to be invalidated;"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 01f7972f66..c63e657df6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -1349,7 +1350,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, VolumeFailureSummary volumeFailureSummary) throws IOException { synchronized (heartbeatManager) { synchronized (datanodeMap) { - DatanodeDescriptor nodeinfo; try { nodeinfo =
getDatanode(nodeReg); } catch(UnregisteredNodeException e) { @@ -1387,10 +1388,10 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations(); // Skip stale nodes during recovery - not heart beated for some time (30s by default). final List recoveryLocations = - new ArrayList(storages.length); - for (int i = 0; i < storages.length; i++) { - if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) { - recoveryLocations.add(storages[i]); + new ArrayList<>(storages.length); + for (DatanodeStorageInfo storage : storages) { + if (!storage.getDatanodeDescriptor().isStale(staleInterval)) { + recoveryLocations.add(storage); } } // If we are performing a truncate recovery than set recovery fields @@ -1429,7 +1430,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, return new DatanodeCommand[] { brCommand }; } - final List cmds = new ArrayList(); + final List cmds = new ArrayList<>(); //check pending replication List pendingList = nodeinfo.getReplicationCommand( maxTransfers); @@ -1437,6 +1438,13 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, pendingList)); } + // checking pending erasure coding tasks + List pendingECList = + nodeinfo.getErasureCodeCommand(maxTransfers); + if (pendingECList != null) { + cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_CODEC, + pendingECList)); + } //check block invalidation Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); if (blks != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 88ba2cd90e..9489f8645d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -419,7 +419,8 @@ public short getPreferredBlockReplication() { } max = maxInSnapshot > max ? maxInSnapshot : max; } - return max; + return isStriped()? + HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS : max; } /** Set the replication factor of this file. */ @@ -1107,11 +1108,12 @@ boolean isBlockInLatestSnapshot(BlockInfoContiguous block) { Arrays.asList(snapshotBlocks).contains(block); } - @VisibleForTesting /** * @return true if the file is in the striping layout. */ - // TODO: move erasure coding policy to file XAttr (HDFS-7337) + @VisibleForTesting + @Override + // TODO: move erasure coding policy to file XAttr public boolean isStriped() { return getStoragePolicyID() == HdfsConstants.EC_STORAGE_POLICY_ID; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java new file mode 100644 index 0000000000..f7f02fdea4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import com.google.common.base.Joiner; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo; + +import java.util.Collection; + +/** + * A BlockECRecoveryCommand is an instruction to a DataNode to reconstruct a + * striped block group with missing blocks. + * + * Upon receiving this command, the DataNode pulls data from other DataNodes + * hosting blocks in this group and reconstructs the lost blocks through codec + * calculation. + * + * After the reconstruction, the DataNode pushes the reconstructed blocks to + * their final destinations if necessary (e.g., the destination is different + * from the reconstruction node, or multiple blocks in a group are to be + * reconstructed). + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockECRecoveryCommand extends DatanodeCommand { + final Collection ecTasks; + + /** + * Create BlockECRecoveryCommand from a collection of + * {@link BlockECRecoveryInfo}, each representing a recovery task + */ + public BlockECRecoveryCommand(int action, + Collection blockECRecoveryInfoList) { + super(action); + this.ecTasks = blockECRecoveryInfoList; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("BlockECRecoveryCommand(\n "); + Joiner.on("\n ").appendTo(sb, ecTasks); + sb.append("\n)"); + return sb.toString(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index a3b6004644..b8ac165db5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -76,6 +76,7 @@ public interface DatanodeProtocol { final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth final static int DNA_CACHE = 9; // cache blocks final static int DNA_UNCACHE = 10; // uncache blocks + final static int DNA_CODEC = 11; // erasure coding recovery /** * Register Datanode.
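
The hunks above cover only the NameNode side of the new DNA_CODEC path: DatanodeDescriptor queues one BlockECRecoveryInfo per damaged block group, and DatanodeManager#handleHeartbeat drains that queue into a BlockECRecoveryCommand for the chosen DataNode. The following is a minimal, self-contained sketch of that queue-and-drain flow; EcTask, EcCommand, and EcRecoveryFlowSketch are simplified stand-ins invented for illustration, not the Hadoop classes in this patch.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class EcRecoveryFlowSketch {
  static final int DNA_CODEC = 11;              // action code added above

  /** Stand-in for BlockECRecoveryInfo: what a DataNode needs for one group. */
  static class EcTask {
    final String blockGroup;                    // the striped block group
    final List<String> sources;                 // nodes holding healthy blocks
    final List<String> targets;                 // storages for rebuilt blocks
    final short[] missingBlockIndices;          // lost internal block indices

    EcTask(String blockGroup, List<String> sources, List<String> targets,
        short[] missingBlockIndices) {
      this.blockGroup = blockGroup;
      this.sources = sources;
      this.targets = targets;
      this.missingBlockIndices = missingBlockIndices;
    }
  }

  /** Stand-in for BlockECRecoveryCommand: an action code plus its tasks. */
  static class EcCommand {
    final int action;
    final List<EcTask> tasks;

    EcCommand(int action, List<EcTask> tasks) {
      this.action = action;
      this.tasks = tasks;
    }
  }

  // Per-DataNode queue, playing the role of DatanodeDescriptor#erasurecodeBlocks.
  private final Queue<EcTask> erasurecodeBlocks = new ArrayDeque<EcTask>();

  /** NameNode side: computeRecoveryWorkForBlocks() ends up queueing a task. */
  void addBlockToBeErasureCoded(EcTask task) {
    erasurecodeBlocks.offer(task);
  }

  /** Heartbeat handling: drain up to maxTransfers tasks into a single command. */
  EcCommand getErasureCodeCommand(int maxTransfers) {
    List<EcTask> drained = new ArrayList<EcTask>();
    while (drained.size() < maxTransfers && !erasurecodeBlocks.isEmpty()) {
      drained.add(erasurecodeBlocks.poll());
    }
    return drained.isEmpty() ? null : new EcCommand(DNA_CODEC, drained);
  }
}

The DataNode-side consumer of DNA_CODEC, which would read from the sources, decode, and write to the targets, is not part of this patch.
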
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 148135bae9..e25ee31643 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -161,7 +161,7 @@ public static int computeInvalidationWork(BlockManager bm) { */ public static int computeAllPendingWork(BlockManager bm) { int work = computeInvalidationWork(bm); - work += bm.computeReplicationWork(Integer.MAX_VALUE); + work += bm.computeBlockRecoveryWork(Integer.MAX_VALUE); return work; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 080f693628..a33a4e366f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -453,8 +453,8 @@ private DatanodeStorageInfo[] scheduleSingleReplication(BlockInfo block) { assertEquals("Block not initially pending replication", 0, bm.pendingReplications.getNumReplicas(block)); assertEquals( - "computeReplicationWork should indicate replication is needed", 1, - bm.computeReplicationWorkForBlocks(list_all)); + "computeBlockRecoveryWork should indicate replication is needed", 1, + bm.computeRecoveryWorkForBlocks(list_all)); assertTrue("replication is pending after work is computed", bm.pendingReplications.getNumReplicas(block) > 0); @@ -508,22 +508,22 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception { assertNotNull("Chooses source node for a highest-priority replication" + " even if all available source nodes have reached their replication" + " limits below the hard limit.", - bm.chooseSourceDatanode( - aBlock, + bm.chooseSourceDatanodes( + bm.getStoredBlock(aBlock), cntNodes, liveNodes, new NumberReplicas(), - UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)); + new LinkedList(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]); assertNull("Does not choose a source node for a less-than-highest-priority" + " replication since all available source nodes have reached" + " their replication limits.", - bm.chooseSourceDatanode( - aBlock, + bm.chooseSourceDatanodes( + bm.getStoredBlock(aBlock), cntNodes, liveNodes, new NumberReplicas(), - UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)); + new LinkedList(), 1, UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)[0]); // Increase the replication count to test replication count > hard limit DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] }; @@ -531,12 +531,12 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception { assertNull("Does not choose a source node for a highest-priority" + " replication when all available nodes exceed the hard limit.", - bm.chooseSourceDatanode( - aBlock, + bm.chooseSourceDatanodes( + bm.getStoredBlock(aBlock), cntNodes, liveNodes, new NumberReplicas(), - UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)); + new LinkedList(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]); } 
@Test diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java new file mode 100644 index 0000000000..d883c9b323 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.client.HdfsAdmin; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Iterator; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME; +import static org.junit.Assert.assertTrue; + +public class TestRecoverStripedBlocks { + private final short GROUP_SIZE = + HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS; + private final short NUM_OF_DATANODES = GROUP_SIZE + 1; + private Configuration conf; + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private static final int BLOCK_SIZE = 1024; + private HdfsAdmin dfsAdmin; + private FSNamesystem namesystem; + private Path ECFilePath; + + @Before + public void setupCluster() throws IOException { + conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + // Large value to make sure the pending replication request can stay in + // DatanodeDescriptor.replicateBlocks before test timeout. + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100); + // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via + // chooseUnderReplicatedBlocks at once. + conf.setInt( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5); + + cluster = new MiniDFSCluster.Builder(conf). 
+ numDataNodes(NUM_OF_DATANODES).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + dfsAdmin = new HdfsAdmin(cluster.getURI(), conf); + namesystem = cluster.getNamesystem(); + ECFilePath = new Path("/ecfile"); + DFSTestUtil.createFile(fs, ECFilePath, 4 * BLOCK_SIZE, GROUP_SIZE, 0); + dfsAdmin.setStoragePolicy(ECFilePath, EC_STORAGE_POLICY_NAME); + } + + @Test + public void testMissingStripedBlock() throws Exception { + final BlockManager bm = cluster.getNamesystem().getBlockManager(); + ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, ECFilePath); + Iterator storageInfos = + bm.blocksMap.getStorages(b.getLocalBlock()) + .iterator(); + + DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor(); + Iterator it = firstDn.getBlockIterator(); + int missingBlkCnt = 0; + while (it.hasNext()) { + BlockInfo blk = it.next(); + BlockManager.LOG.debug("Block " + blk + " will be lost"); + missingBlkCnt++; + } + BlockManager.LOG.debug("Missing in total " + missingBlkCnt + " blocks"); + + bm.getDatanodeManager().removeDatanode(firstDn); + + bm.computeDatanodeWork(); + + short cnt = 0; + for (DataNode dn : cluster.getDataNodes()) { + DatanodeDescriptor dnDescriptor = + bm.getDatanodeManager().getDatanode(dn.getDatanodeUuid()); + cnt += dnDescriptor.getNumberOfBlocksToBeErasureCoded(); + } + + assertTrue("Counting the number of outstanding EC tasks", cnt == missingBlkCnt); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 9e2e1727d8..1916b3cd3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -1162,7 +1162,7 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication() when(mockNS.hasWriteLock()).thenReturn(true); BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; - + BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong()); BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
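
For reference, the index bookkeeping added to chooseSourceDatanodes() reduces to a set complement over the block group: any internal block index that no live storage reports is treated as missing and later attached to the ErasureCodingWork. Below is a small standalone rendering of that step; the 6 data + 3 parity group size is an assumption standing in for HdfsConstants.NUM_DATA_BLOCKS and NUM_PARITY_BLOCKS, and MissingIndexSketch is not part of the patch.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class MissingIndexSketch {
  static final short NUM_DATA_BLOCKS = 6;     // assumed default schema
  static final short NUM_PARITY_BLOCKS = 3;

  /** Every group index not reported by a healthy storage is missing. */
  static List<Short> missingBlockIndices(Set<Short> healthyIndices) {
    List<Short> missing = new ArrayList<Short>();
    for (short i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
      if (!healthyIndices.contains(i)) {
        missing.add(i);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // A block group where the storages for internal blocks 2 and 7 are gone.
    Set<Short> healthy = new HashSet<Short>();
    for (short i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
      if (i != 2 && i != 7) {
        healthy.add(i);
      }
    }
    System.out.println(missingBlockIndices(healthy)); // prints [2, 7]
  }
}

A DataNode receiving the resulting BlockECRecoveryInfo then knows exactly which internal blocks of the group it has to reconstruct.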