HDFS-17094. EC: Fix bug in block recovery when there are stale datanodes. (#5854)
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Signed-off-by: Tao Li <tomscut@apache.org>
parent 23ecc32d3a
commit 7ba2bd6305
@@ -147,6 +147,14 @@ public byte[] getBlockIndices() {
     return indices;
   }
 
+  public byte[] getBlockIndicesForSpecifiedStorages(List<Integer> storageIdx) {
+    byte[] indices = new byte[storageIdx.size()];
+    for (int i = 0; i < indices.length; i++) {
+      indices[i] = BlockIdManager.getBlockIndex(replicas[storageIdx.get(i)]);
+    }
+    return indices;
+  }
+
   public int getNumExpectedLocations() {
     return replicas.length;
   }
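Read on its own, the new helper projects the per-replica EC block indices onto a caller-supplied list of storage positions, so the returned array stays aligned with whatever subset of storages the caller kept. Below is a minimal, standalone sketch of that mapping using made-up values (the real method reads the replicas array via BlockIdManager.getBlockIndex; class and variable names here are hypothetical):

import java.util.Arrays;
import java.util.List;

public class BlockIndexMappingSketch {
  public static void main(String[] args) {
    // Hypothetical values: a block group with 9 internal blocks (indices 0..8),
    // one per storage; only the storages at positions 0, 2 and 5 are kept.
    byte[] allIndices = {0, 1, 2, 3, 4, 5, 6, 7, 8};
    List<Integer> keptStorageIdx = Arrays.asList(0, 2, 5);

    // Same projection the new helper performs over its replicas array.
    byte[] indices = new byte[keptStorageIdx.size()];
    for (int i = 0; i < indices.length; i++) {
      indices[i] = allIndices[keptStorageIdx.get(i)];
    }
    System.out.println(Arrays.toString(indices)); // prints [0, 2, 5]
  }
}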
@@ -1720,9 +1720,11 @@ private BlockRecoveryCommand getBlockRecoveryCommand(String blockPoolId,
       // Skip stale nodes during recovery
       final List<DatanodeStorageInfo> recoveryLocations =
           new ArrayList<>(storages.length);
-      for (DatanodeStorageInfo storage : storages) {
-        if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
-          recoveryLocations.add(storage);
+      final List<Integer> storageIdx = new ArrayList<>(storages.length);
+      for (int i = 0; i < storages.length; ++i) {
+        if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
+          recoveryLocations.add(storages[i]);
+          storageIdx.add(i);
         }
       }
       // If we are performing a truncate recovery than set recovery fields
@@ -1755,7 +1757,8 @@ private BlockRecoveryCommand getBlockRecoveryCommand(String blockPoolId,
       rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
           uc.getBlockRecoveryId());
       if (b.isStriped()) {
-        rBlock = new RecoveringStripedBlock(rBlock, uc.getBlockIndices(),
+        rBlock = new RecoveringStripedBlock(rBlock,
+            uc.getBlockIndicesForSpecifiedStorages(storageIdx),
             ((BlockInfoStriped) b).getErasureCodingPolicy());
       }
     }
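Why the extra bookkeeping matters: recoveryLocations is already filtered to non-stale storages, but the old call handed RecoveringStripedBlock the block indices of every replica. Once a stale node was skipped, the location list and the index array fell out of step, so the recovery worker could attribute the wrong internal block to a datanode. The illustration below shows the mismatch with made-up datanode names and indices (a sketch, not the real RecoveringStripedBlock wiring):

import java.util.Arrays;
import java.util.List;

public class RecoveryIndexAlignmentSketch {
  public static void main(String[] args) {
    // Hypothetical striped block group with 5 storages; the storage at
    // position 1 sits on a stale datanode and is skipped during recovery.
    List<String> recoveryLocations =               // filtered, as in the fix
        Arrays.asList("dn0", "dn2", "dn3", "dn4");
    byte[] oldIndices = {0, 1, 2, 3, 4};           // old: uc.getBlockIndices()
    byte[] newIndices = {0, 2, 3, 4};              // new: indices for storageIdx

    // Old pairing: the 4 locations were matched against the first 4 entries
    // of oldIndices, so dn2 was treated as holding internal block 1.
    // New pairing keeps location i and block index i in step.
    System.out.println("locations:   " + recoveryLocations);
    System.out.println("old indices: " + Arrays.toString(oldIndices));
    System.out.println("new indices: " + Arrays.toString(newIndices));
  }
}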
@@ -30,6 +30,10 @@
 import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -188,6 +192,62 @@ public void testLeaseRecovery() throws Exception {
     }
   }
 
+  /**
+   * Test lease recovery for EC policy when one internal block located on
+   * stale datanode.
+   */
+  @Test
+  public void testLeaseRecoveryWithStaleDataNode() {
+    LOG.info("blockLengthsSuite: " +
+        Arrays.toString(blockLengthsSuite));
+    long staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
+
+    for (int i = 0; i < blockLengthsSuite.length; i++) {
+      BlockLengths blockLengths = blockLengthsSuite[i];
+      try {
+        writePartialBlocks(blockLengths.getBlockLengths());
+
+        // Get block info for the last block and mark corresponding datanode
+        // as stale.
+        LocatedBlock locatedblock =
+            TestInterDatanodeProtocol.getLastLocatedBlock(
+                dfs.dfs.getNamenode(), p.toString());
+        DatanodeInfo firstDataNode = locatedblock.getLocations()[0];
+        DatanodeDescriptor dnDes = cluster.getNameNode().getNamesystem()
+            .getBlockManager().getDatanodeManager()
+            .getDatanode(firstDataNode);
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(
+            cluster.getDataNode(dnDes.getIpcPort()), true);
+        DFSTestUtil.resetLastUpdatesWithOffset(dnDes, -(staleInterval + 1));
+
+        long[] longArray = new long[blockLengths.getBlockLengths().length - 1];
+        for (int j = 0; j < longArray.length; ++j) {
+          longArray[j] = blockLengths.getBlockLengths()[j + 1];
+        }
+        int safeLength = (int) StripedBlockUtil.getSafeLength(ecPolicy,
+            longArray);
+        int checkDataLength = Math.min(testFileLength, safeLength);
+        recoverLease();
+        List<Long> oldGS = new ArrayList<>();
+        oldGS.add(1001L);
+        StripedFileTestUtil.checkData(dfs, p, checkDataLength,
+            new ArrayList<>(), oldGS, blockGroupSize);
+
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(
+            cluster.getDataNode(dnDes.getIpcPort()), false);
+        DFSTestUtil.resetLastUpdatesWithOffset(dnDes, 0);
+
+      } catch (Throwable e) {
+        String msg = "failed testCase at i=" + i + ", blockLengths="
+            + blockLengths + "\n"
+            + StringUtils.stringifyException(e);
+        Assert.fail(msg);
+      }
+    }
+  }
+
   @Test
   public void testSafeLength() {
     checkSafeLength(0, 0); // Length of: 0
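The test's recoverLease() helper is defined elsewhere in the test class and is not part of this hunk. As a hedged sketch of what triggering lease recovery from the client side generally looks like with DistributedFileSystem#recoverLease (the actual helper in the test may differ):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class RecoverLeaseSketch {
  /**
   * Ask the NameNode to recover the lease on the given file and poll until
   * recovery (including block recovery) has completed. A sketch only; the
   * helper used by the test above may behave differently.
   */
  static void waitForLeaseRecovery(DistributedFileSystem dfs, Path p)
      throws Exception {
    while (!dfs.recoverLease(p)) {
      Thread.sleep(1000);
    }
  }
}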