HDFS-12033. DatanodeManager picking EC recovery tasks should also consider the number of regular replication tasks. Contributed by Lei (Eddy) Xu.
This commit is contained in:
parent
a9d3412b4c
commit
144753e87f
@ -1661,6 +1661,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
|
|||||||
if (pendingList != null) {
|
if (pendingList != null) {
|
||||||
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
|
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
|
||||||
pendingList));
|
pendingList));
|
||||||
|
maxTransfers -= pendingList.size();
|
||||||
}
|
}
|
||||||
// check pending erasure coding tasks
|
// check pending erasure coding tasks
|
||||||
List<BlockECReconstructionInfo> pendingECList = nodeinfo
|
List<BlockECReconstructionInfo> pendingECList = nodeinfo
|
||||||
|
@ -44,13 +44,21 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -491,4 +499,47 @@ public void testRemoveIncludedNode() throws IOException {
|
|||||||
Assert.assertEquals("Unexpected host or host in unexpected position",
|
Assert.assertEquals("Unexpected host or host in unexpected position",
|
||||||
"127.0.0.1:23456", bothAgain.get(1).getInfoAddr());
|
"127.0.0.1:23456", bothAgain.get(1).getInfoAddr());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPendingRecoveryTasks() throws IOException {
|
||||||
|
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
|
||||||
|
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
DatanodeManager dm = Mockito.spy(mockDatanodeManager(fsn, conf));
|
||||||
|
|
||||||
|
int maxTransfers = 20;
|
||||||
|
int numPendingTasks = 7;
|
||||||
|
int numECTasks = maxTransfers - numPendingTasks;
|
||||||
|
|
||||||
|
DatanodeDescriptor nodeInfo = Mockito.mock(DatanodeDescriptor.class);
|
||||||
|
Mockito.when(nodeInfo.isRegistered()).thenReturn(true);
|
||||||
|
Mockito.when(nodeInfo.getStorageInfos())
|
||||||
|
.thenReturn(new DatanodeStorageInfo[0]);
|
||||||
|
|
||||||
|
List<BlockTargetPair> pendingList =
|
||||||
|
Collections.nCopies(numPendingTasks, new BlockTargetPair(null, null));
|
||||||
|
Mockito.when(nodeInfo.getReplicationCommand(maxTransfers))
|
||||||
|
.thenReturn(pendingList);
|
||||||
|
List<BlockECReconstructionInfo> ecPendingList =
|
||||||
|
Collections.nCopies(numECTasks, null);
|
||||||
|
|
||||||
|
Mockito.when(nodeInfo.getErasureCodeCommand(numECTasks))
|
||||||
|
.thenReturn(ecPendingList);
|
||||||
|
DatanodeRegistration dnReg = Mockito.mock(DatanodeRegistration.class);
|
||||||
|
Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo);
|
||||||
|
|
||||||
|
DatanodeCommand[] cmds = dm.handleHeartbeat(
|
||||||
|
dnReg, new StorageReport[1], "bp-123", 0, 0, 10, maxTransfers, 0, null,
|
||||||
|
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
|
||||||
|
|
||||||
|
assertEquals(2, cmds.length);
|
||||||
|
assertTrue(cmds[0] instanceof BlockCommand);
|
||||||
|
BlockCommand replicaCmd = (BlockCommand) cmds[0];
|
||||||
|
assertEquals(numPendingTasks, replicaCmd.getBlocks().length);
|
||||||
|
assertEquals(numPendingTasks, replicaCmd.getTargets().length);
|
||||||
|
assertTrue(cmds[1] instanceof BlockECReconstructionCommand);
|
||||||
|
BlockECReconstructionCommand ecRecoveryCmd =
|
||||||
|
(BlockECReconstructionCommand) cmds[1];
|
||||||
|
assertEquals(numECTasks, ecRecoveryCmd.getECTasks().size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user