diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto index d92dd4cb84..fd3618fe73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto @@ -108,6 +108,7 @@ message BlockECReconstructionInfoProto { required StorageTypesProto targetStorageTypes = 5; required bytes liveBlockIndices = 6; required ErasureCodingPolicyProto ecPolicy = 7; + optional bytes excludeReconstructedIndices = 8; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index ce27342729..b5f7b9c80f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -1049,11 +1049,17 @@ public static BlockECReconstructionInfo convertBlockECReconstructionInfo( byte[] liveBlkIndices = blockEcReconstructionInfoProto.getLiveBlockIndices() .toByteArray(); + byte[] excludeReconstructedIndices = + blockEcReconstructionInfoProto.hasExcludeReconstructedIndices() ? + blockEcReconstructionInfoProto.getExcludeReconstructedIndices() + .toByteArray() : new byte[0]; ErasureCodingPolicy ecPolicy = PBHelperClient.convertErasureCodingPolicy( blockEcReconstructionInfoProto.getEcPolicy()); - return new BlockECReconstructionInfo(block, sourceDnInfos, targetDnInfos, - targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy); + return new BlockECReconstructionInfo( + block, sourceDnInfos, targetDnInfos, + targetStorageUuids, convertStorageTypes, liveBlkIndices, + excludeReconstructedIndices, ecPolicy); } public static BlockECReconstructionInfoProto convertBlockECRecoveryInfo( @@ -1079,6 +1085,10 @@ public static BlockECReconstructionInfoProto convertBlockECRecoveryInfo( byte[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices(); builder.setLiveBlockIndices(PBHelperClient.getByteString(liveBlockIndices)); + byte[] excludeReconstructedIndices = blockEcRecoveryInfo.getExcludeReconstructedIndices(); + builder.setExcludeReconstructedIndices( + PBHelperClient.getByteString(excludeReconstructedIndices)); + builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy( blockEcRecoveryInfo.getErasureCodingPolicy())); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 4e5a3b435a..4d07910a68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -906,7 +906,7 @@ private void dumpBlockMeta(Block block, PrintWriter out) { // source node returned is not used chooseSourceDatanodes(blockInfo, containingNodes, containingLiveReplicasNodes, numReplicas, new ArrayList(), - new ArrayList(), LowRedundancyBlocks.LEVEL); + new ArrayList(), new ArrayList(), LowRedundancyBlocks.LEVEL); // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are // not included in the numReplicas.liveReplicas() count @@ -2112,9 +2112,10 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block, NumberReplicas numReplicas = new NumberReplicas(); List liveBlockIndices = new ArrayList<>(); List liveBusyBlockIndices = new ArrayList<>(); + List excludeReconstructed = new ArrayList<>(); final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block, containingNodes, liveReplicaNodes, numReplicas, - liveBlockIndices, liveBusyBlockIndices, priority); + liveBlockIndices, liveBusyBlockIndices, excludeReconstructed, priority); short requiredRedundancy = getExpectedLiveRedundancyNum(block, numReplicas); if(srcNodes == null || srcNodes.length == 0) { @@ -2182,9 +2183,13 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block, for (int i = 0; i < liveBusyBlockIndices.size(); i++) { busyIndices[i] = liveBusyBlockIndices.get(i); } + byte[] excludeReconstructedIndices = new byte[excludeReconstructed.size()]; + for (int i = 0; i < excludeReconstructed.size(); i++) { + excludeReconstructedIndices[i] = excludeReconstructed.get(i); + } return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes, containingNodes, liveReplicaNodes, additionalReplRequired, - priority, newIndices, busyIndices); + priority, newIndices, busyIndices, excludeReconstructedIndices); } else { return new ReplicationWork(block, bc, srcNodes, containingNodes, liveReplicaNodes, additionalReplRequired, @@ -2426,7 +2431,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, List containingNodes, List nodesContainingLiveReplicas, NumberReplicas numReplicas, List liveBlockIndices, - List liveBusyBlockIndices, int priority) { + List liveBusyBlockIndices, List excludeReconstructed, int priority) { containingNodes.clear(); nodesContainingLiveReplicas.clear(); List srcNodes = new ArrayList<>(); @@ -2496,6 +2501,8 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, if (isStriped && (state == StoredReplicaState.LIVE || state == StoredReplicaState.DECOMMISSIONING)) { liveBusyBlockIndices.add(blockIndex); + //HDFS-16566 ExcludeReconstructed won't be reconstructed. + excludeReconstructed.add(blockIndex); } continue; // already reached replication limit } @@ -2504,6 +2511,8 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, if (isStriped && (state == StoredReplicaState.LIVE || state == StoredReplicaState.DECOMMISSIONING)) { liveBusyBlockIndices.add(blockIndex); + //HDFS-16566 ExcludeReconstructed won't be reconstructed. + excludeReconstructed.add(blockIndex); } continue; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index d51d9d9235..69779a62da 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -672,10 +672,10 @@ public void addBlockToBeReplicated(Block block, */ void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets, - byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) { + byte[] liveBlockIndices, byte[] excludeReconstrutedIndices, ErasureCodingPolicy ecPolicy) { assert (block != null && sources != null && sources.length > 0); BlockECReconstructionInfo task = new BlockECReconstructionInfo(block, - sources, targets, liveBlockIndices, ecPolicy); + sources, targets, liveBlockIndices, excludeReconstrutedIndices, ecPolicy); erasurecodeBlocks.offer(task); BlockManager.LOG.debug("Adding block reconstruction task " + task + "to " + getName() + ", current queue size is " + erasurecodeBlocks.size()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java index 6158677654..e5303a28d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java @@ -32,6 +32,7 @@ class ErasureCodingWork extends BlockReconstructionWork { private final byte[] liveBlockIndices; private final byte[] liveBusyBlockIndices; + private final byte[] excludeReconstructedIndices; private final String blockPoolId; public ErasureCodingWork(String blockPoolId, BlockInfo block, @@ -40,12 +41,14 @@ public ErasureCodingWork(String blockPoolId, BlockInfo block, List containingNodes, List liveReplicaStorages, int additionalReplRequired, int priority, - byte[] liveBlockIndices, byte[] liveBusyBlockIndices) { + byte[] liveBlockIndices, byte[] liveBusyBlockIndices, + byte[] excludeReconstrutedIndices) { super(block, bc, srcNodes, containingNodes, liveReplicaStorages, additionalReplRequired, priority); this.blockPoolId = blockPoolId; this.liveBlockIndices = liveBlockIndices; this.liveBusyBlockIndices = liveBusyBlockIndices; + this.excludeReconstructedIndices=excludeReconstrutedIndices; LOG.debug("Creating an ErasureCodingWork to {} reconstruct ", block); } @@ -147,7 +150,7 @@ void addTaskToDatanode(NumberReplicas numberReplicas) { } else { targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded( new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets, - getLiveBlockIndices(), stripedBlk.getErasureCodingPolicy()); + liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 9fb5d9e708..c2aa77f253 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -127,7 +127,7 @@ public void processErasureCodingTasks( reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(), reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(), reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(), - reconInfo.getTargetStorageIDs()); + reconInfo.getTargetStorageIDs(), reconInfo.getExcludeReconstructedIndices()); // It may throw IllegalArgumentException from task#stripedReader // constructor. final StripedBlockReconstructor task = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java index c166f5ec03..caf8dfa950 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java @@ -41,26 +41,28 @@ public class StripedReconstructionInfo { private final DatanodeInfo[] targets; private final StorageType[] targetStorageTypes; private final String[] targetStorageIds; + private final byte[] excludeReconstructedIndices; public StripedReconstructionInfo(ExtendedBlock blockGroup, ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, byte[] targetIndices) { this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null, - null, null); + null, null, new byte[0]); } StripedReconstructionInfo(ExtendedBlock blockGroup, ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, DatanodeInfo[] targets, StorageType[] targetStorageTypes, - String[] targetStorageIds) { + String[] targetStorageIds, byte[] excludeReconstructedIndices) { this(blockGroup, ecPolicy, liveIndices, sources, null, targets, - targetStorageTypes, targetStorageIds); + targetStorageTypes, targetStorageIds, excludeReconstructedIndices); } private StripedReconstructionInfo(ExtendedBlock blockGroup, ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, byte[] targetIndices, DatanodeInfo[] targets, - StorageType[] targetStorageTypes, String[] targetStorageIds) { + StorageType[] targetStorageTypes, String[] targetStorageIds, + byte[] excludeReconstructedIndices) { this.blockGroup = blockGroup; this.ecPolicy = ecPolicy; @@ -70,6 +72,7 @@ private StripedReconstructionInfo(ExtendedBlock blockGroup, this.targets = targets; this.targetStorageTypes = targetStorageTypes; this.targetStorageIds = targetStorageIds; + this.excludeReconstructedIndices = excludeReconstructedIndices; } ExtendedBlock getBlockGroup() { @@ -104,5 +107,9 @@ String[] getTargetStorageIds() { return targetStorageIds; } + byte[] getExcludeReconstructedIndices() { + return excludeReconstructedIndices; + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index 7f41dac591..89ee49a3c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -120,6 +120,7 @@ abstract class StripedReconstructor { private final CachingStrategy cachingStrategy; private long maxTargetLength = 0L; private final BitSet liveBitSet; + private final BitSet excludeBitSet; // metrics private AtomicLong bytesRead = new AtomicLong(0); @@ -137,6 +138,12 @@ abstract class StripedReconstructor { for (int i = 0; i < stripedReconInfo.getLiveIndices().length; i++) { liveBitSet.set(stripedReconInfo.getLiveIndices()[i]); } + excludeBitSet = new BitSet( + ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()); + for (int i = 0; i < stripedReconInfo.getExcludeReconstructedIndices().length; i++) { + excludeBitSet.set(stripedReconInfo.getExcludeReconstructedIndices()[i]); + } + blockGroup = stripedReconInfo.getBlockGroup(); stripedReader = new StripedReader(this, datanode, conf, stripedReconInfo); cachingStrategy = CachingStrategy.newDefaultStrategy(); @@ -261,6 +268,10 @@ BitSet getLiveBitSet() { return liveBitSet; } + BitSet getExcludeBitSet(){ + return excludeBitSet; + } + long getMaxTargetLength() { return maxTargetLength; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java index 4b7e1abdb1..b570c666a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java @@ -125,13 +125,14 @@ void init() throws IOException { private void initTargetIndices() { BitSet bitset = reconstructor.getLiveBitSet(); + BitSet excludebitset=reconstructor.getExcludeBitSet(); int m = 0; hasValidTargets = false; for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { if (!bitset.get(i)) { if (reconstructor.getBlockLen(i) > 0) { - if (m < targets.length) { + if (m < targets.length && !excludebitset.get(i)) { targetIndices[m++] = (short)i; hasValidTargets = true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java index b2495c8d6d..3b1e2d6084 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java @@ -78,21 +78,23 @@ public static class BlockECReconstructionInfo { private String[] targetStorageIDs; private StorageType[] targetStorageTypes; private final byte[] liveBlockIndices; + private final byte[] excludeReconstructedIndices; private final ErasureCodingPolicy ecPolicy; public BlockECReconstructionInfo(ExtendedBlock block, DatanodeInfo[] sources, DatanodeStorageInfo[] targetDnStorageInfo, - byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) { + byte[] liveBlockIndices, byte[] excludeReconstructedIndices, ErasureCodingPolicy ecPolicy) { this(block, sources, DatanodeStorageInfo .toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo .toStorageIDs(targetDnStorageInfo), DatanodeStorageInfo - .toStorageTypes(targetDnStorageInfo), liveBlockIndices, ecPolicy); + .toStorageTypes(targetDnStorageInfo), liveBlockIndices, + excludeReconstructedIndices, ecPolicy); } public BlockECReconstructionInfo(ExtendedBlock block, DatanodeInfo[] sources, DatanodeInfo[] targets, String[] targetStorageIDs, StorageType[] targetStorageTypes, - byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) { + byte[] liveBlockIndices, byte[] excludeReconstructedIndices, ErasureCodingPolicy ecPolicy) { this.block = block; this.sources = sources; this.targets = targets; @@ -100,6 +102,7 @@ public BlockECReconstructionInfo(ExtendedBlock block, this.targetStorageTypes = targetStorageTypes; this.liveBlockIndices = liveBlockIndices == null ? new byte[]{} : liveBlockIndices; + this.excludeReconstructedIndices = excludeReconstructedIndices; this.ecPolicy = ecPolicy; } @@ -127,6 +130,10 @@ public byte[] getLiveBlockIndices() { return liveBlockIndices; } + public byte[] getExcludeReconstructedIndices() { + return excludeReconstructedIndices; + } + public ErasureCodingPolicy getErasureCodingPolicy() { return ecPolicy; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java index f139faa79a..3d35511d05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java @@ -472,7 +472,7 @@ public void testProcessErasureCodingTasksSubmitionShouldSucceed() BlockECReconstructionInfo invalidECInfo = new BlockECReconstructionInfo( new ExtendedBlock("bp-id", 123456), dataDNs, dnStorageInfo, liveIndices, - ecPolicy); + new byte[0], ecPolicy); List ecTasks = new ArrayList<>(); ecTasks.add(invalidECInfo); dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index 7a889b2596..a608e6edd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -745,9 +745,10 @@ public void testBlockECRecoveryCommand() { DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] { targetDnInfos_0, targetDnInfos_1 }; byte[] liveBlkIndices0 = new byte[2]; + byte[] excludeReconstructedIndices0=new byte[2]; BlockECReconstructionInfo blkECRecoveryInfo0 = new BlockECReconstructionInfo( new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0, - liveBlkIndices0, StripedFileTestUtil.getDefaultECPolicy()); + liveBlkIndices0, excludeReconstructedIndices0, StripedFileTestUtil.getDefaultECPolicy()); DatanodeInfo[] dnInfos1 = new DatanodeInfo[] { DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() }; DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil @@ -759,9 +760,10 @@ public void testBlockECRecoveryCommand() { DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] { targetDnInfos_2, targetDnInfos_3 }; byte[] liveBlkIndices1 = new byte[2]; + byte[] excludeReconstructedIndices = new byte[2]; BlockECReconstructionInfo blkECRecoveryInfo1 = new BlockECReconstructionInfo( new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1, - liveBlkIndices1, StripedFileTestUtil.getDefaultECPolicy()); + liveBlkIndices1, excludeReconstructedIndices, StripedFileTestUtil.getDefaultECPolicy()); List blkRecoveryInfosList = new ArrayList(); blkRecoveryInfosList.add(blkECRecoveryInfo0); blkRecoveryInfosList.add(blkECRecoveryInfo1); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 01288321c8..bf8af46cc8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -700,6 +700,7 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception { new NumberReplicas(), new ArrayList(), new ArrayList(), + new ArrayList(), LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY)[0]); assertEquals("Does not choose a source node for a less-than-highest-priority" @@ -712,6 +713,7 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception { new NumberReplicas(), new ArrayList(), new ArrayList(), + new ArrayList(), LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY).length); // Increase the replication count to test replication count > hard limit @@ -727,6 +729,7 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception { new NumberReplicas(), new ArrayList(), new ArrayList(), + new ArrayList(), LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length); } @@ -773,6 +776,7 @@ public void testChooseSrcDatanodesWithDupEC() throws Exception { NumberReplicas numReplicas = new NumberReplicas(); List liveBlockIndices = new ArrayList<>(); List liveBusyBlockIndices = new ArrayList<>(); + List excludeReconstructedIndices = new ArrayList<>(); bm.chooseSourceDatanodes( aBlockInfoStriped, @@ -780,6 +784,7 @@ public void testChooseSrcDatanodesWithDupEC() throws Exception { liveNodes, numReplicas, liveBlockIndices, liveBusyBlockIndices, + excludeReconstructedIndices, LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY); assertEquals("Choose the source node for reconstruction with one node reach" @@ -836,6 +841,7 @@ public void testChooseSrcDNWithDupECInDecommissioningNode() throws Exception { NumberReplicas numReplicas = new NumberReplicas(); List liveBlockIndices = new ArrayList<>(); List liveBusyBlockIndices = new ArrayList<>(); + List excludeReconstructedIndices = new ArrayList<>(); bm.chooseSourceDatanodes( aBlockInfoStriped, @@ -843,6 +849,7 @@ public void testChooseSrcDNWithDupECInDecommissioningNode() throws Exception { nodesContainingLiveReplicas, numReplicas, liveBlockIndices, liveBusyBlockIndices, + excludeReconstructedIndices, LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY); assertEquals("There are 5 live replicas in " + "[ds2, ds3, ds4, ds5, ds6] datanodes ", @@ -975,6 +982,7 @@ public void testFavorDecomUntilHardLimit() throws Exception { new NumberReplicas(), new LinkedList(), new ArrayList(), + new ArrayList(), LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY)[0]); @@ -991,6 +999,7 @@ public void testFavorDecomUntilHardLimit() throws Exception { new NumberReplicas(), new LinkedList(), new ArrayList(), + new ArrayList(), LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY).length); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java index fc307bf84d..0a7874fc7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.StripedFileTestUtil; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; @@ -45,6 +46,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -84,6 +86,7 @@ private void initConf(Configuration conf) { // chooseUnderReplicatedBlocks at once. conf.setInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5); + } @Test @@ -430,4 +433,86 @@ public void testReconstructionWork() throws Exception { dfsCluster.shutdown(); } } + private byte[] writeStripedFile(DistributedFileSystem fs, Path ecFile, + int writeBytes) throws Exception { + byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes); + DFSTestUtil.writeFile(fs, ecFile, new String(bytes)); + StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString()); + + return bytes; + } + @Test + public void testReconstrutionWithBusyBlock1() throws Exception { + //When the index of busy block is smaller than the missing block + //[0(busy),1(busy),3,4,5,6,7,8] + int busyNodeIndex1 = 0; + int busyNodeIndex2 = 1; + int deadNodeIndex = 2; + final Path ecDir = new Path(GenericTestUtils.getRandomizedTempPath()); + final Path ecFile = new Path(ecDir, "testReconstrutionWithBusyBlock1"); + int writeBytes = cellSize * dataBlocks; + HdfsConfiguration conf = new HdfsConfiguration(); + initConf(conf); + conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, false); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 2000); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + 1000); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, + 4); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, + 1); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(groupSize + 5) + .build(); + cluster.waitActive(); + DistributedFileSystem dfs = cluster.getFileSystem(0); + dfs.enableErasureCodingPolicy( + StripedFileTestUtil.getDefaultECPolicy().getName()); + dfs.mkdirs(ecDir); + dfs.setErasureCodingPolicy(ecDir, + StripedFileTestUtil.getDefaultECPolicy().getName()); + byte[] originBytesArray = writeStripedFile(dfs, ecFile, writeBytes); + List lbs = ((HdfsDataInputStream) dfs.open(ecFile)) + .getAllBlocks(); + LocatedStripedBlock lsb = (LocatedStripedBlock) lbs.get(0); + DatanodeInfo[] dnList = lsb.getLocations(); + BlockManager bm = cluster.getNamesystem().getBlockManager(); + BlockInfoStriped blockInfo = + (BlockInfoStriped) bm.getStoredBlock( + new Block(lsb.getBlock().getBlockId())); + + //1.Make nodes busy + DatanodeDescriptor busyNode = bm.getDatanodeManager() + .getDatanode(dnList[busyNodeIndex1].getDatanodeUuid()); + for (int j = 0; j < maxReplicationStreams; j++) { + busyNode.incrementPendingReplicationWithoutTargets(); + } + DatanodeDescriptor busyNode2 = bm.getDatanodeManager() + .getDatanode(dnList[busyNodeIndex2].getDatanodeUuid()); + for (int j = 0; j < maxReplicationStreams; j++) { + busyNode2.incrementPendingReplicationWithoutTargets(); + } + + //2.Make a node missing + DataNode dn = cluster.getDataNode(dnList[deadNodeIndex].getIpcPort()); + cluster.stopDataNode(dnList[deadNodeIndex].getXferAddr()); + cluster.setDataNodeDead(dn.getDatanodeId()); + + //3.Whether there is excess replicas or not during the recovery? + assertEquals(8, bm.countNodes(blockInfo).liveReplicas()); + + GenericTestUtils.waitFor( + () -> { + return bm.countNodes(blockInfo).liveReplicas() == 9|| + bm.countNodes(blockInfo).excessReplicas() >= 1|| + bm.countNodes(blockInfo).redundantInternalBlocks() >= 1; + }, + 10, 100000); + + assertEquals(0, bm.countNodes(blockInfo).excessReplicas()); + assertEquals(9, bm.countNodes(blockInfo).liveReplicas()); + } + }