From 15af52954f32c0ab24691968ebfa771a0e50dd04 Mon Sep 17 00:00:00 2001
From: hfutatzhanghb
Date: Tue, 27 Feb 2024 10:19:57 +0800
Subject: [PATCH] HDFS-17358. EC: infinite lease recovery caused by the length
 of RWR equals to zero or datanode does not have the replica. (#6509).
 Contributed by farmmamba.

Reviewed-by: Tao Li
Reviewed-by: Haiyang Hu
Signed-off-by: Shuyan Zhang
---
 .../server/datanode/BlockRecoveryWorker.java     | 44 +++++++++++++++----
 .../datanode/erasurecode/StripedBlockReader.java |  4 +-
 .../hadoop/hdfs/TestLeaseRecoveryStriped.java    | 29 ++++++++++++
 3 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
index b24849fb38..cbdd924e61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
@@ -386,6 +386,8 @@ protected void recover() throws IOException {
       Map<Long, BlockRecord> syncBlocks = new HashMap<>(locs.length);
       final int dataBlkNum = ecPolicy.getNumDataUnits();
       final int totalBlkNum = dataBlkNum + ecPolicy.getNumParityUnits();
+      int zeroLenReplicaCnt = 0;
+      int dnNotHaveReplicaCnt = 0;
       //check generation stamps
       for (int i = 0; i < locs.length; i++) {
         DatanodeID id = locs[i];
@@ -419,10 +421,14 @@ protected void recover() throws IOException {
             if (info == null) {
               LOG.debug("Block recovery: DataNode: {} does not have "
                   + "replica for block: (block={}, internalBlk={})", id, block, internalBlk);
+              dnNotHaveReplicaCnt++;
             } else {
               LOG.debug("Block recovery: Ignored replica with invalid "
                   + "generation stamp or length: {} from DataNode: {} by block: {}",
                   info, id, block);
+              if (info.getNumBytes() == 0) {
+                zeroLenReplicaCnt++;
+              }
             }
           }
         } catch (RecoveryInProgressException ripE) {
@@ -436,9 +442,18 @@ protected void recover() throws IOException {
               "datanode={})", block, internalBlk, id, e);
         }
       }
-      checkLocations(syncBlocks.size());
 
-      final long safeLength = getSafeLength(syncBlocks);
+      final long safeLength;
+      if (dnNotHaveReplicaCnt + zeroLenReplicaCnt <= locs.length - ecPolicy.getNumDataUnits()) {
+        checkLocations(syncBlocks.size());
+        safeLength = getSafeLength(syncBlocks);
+      } else {
+        safeLength = 0;
+        LOG.warn("Block recovery: {} datanodes do not have the replica of block {}."
+            + " {} datanodes have zero-length replica. Will remove this block.",
+            dnNotHaveReplicaCnt, block, zeroLenReplicaCnt);
+      }
+
       LOG.debug("Recovering block {}, length={}, safeLength={}, syncList={}",
           block, block.getNumBytes(), safeLength, syncBlocks);
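The hunk above is the core of the fix. Recovery of a striped block group is feasible only when at least numDataUnits internal blocks have usable replicas, so the new condition allows at most locs.length - ecPolicy.getNumDataUnits() unusable locations (datanodes with no replica, plus zero-length RWR replicas). When that budget is exceeded, safeLength is pinned to 0 instead of throwing from checkLocations, which is what previously left the lease stuck in recovery forever. A minimal standalone sketch of the arithmetic (canRecover is a hypothetical helper for illustration, not part of the patch):

// Hypothetical helper mirroring the patched feasibility check; illustration
// only, not part of the patch. For RS-6-3 a block group has 9 internal
// blocks and recovery needs at least 6 usable ones.
public final class RecoveryFeasibility {
  static boolean canRecover(int totalLocs, int numDataUnits,
      int dnNotHaveReplicaCnt, int zeroLenReplicaCnt) {
    // Unusable locations may not exceed the parity redundancy.
    return dnNotHaveReplicaCnt + zeroLenReplicaCnt <= totalLocs - numDataUnits;
  }

  public static void main(String[] args) {
    System.out.println(canRecover(9, 6, 2, 1)); // true: at the limit, recover
    System.out.println(canRecover(9, 6, 3, 1)); // false: safeLength = 0 path
  }
}

With the default RS-6-3 policy this means up to three of the nine internal blocks may be missing or empty before the zero-length path is taken.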
@@ -452,11 +467,13 @@ protected void recover() throws IOException {
           rurList.add(r);
         }
       }
-      assert rurList.size() >= dataBlkNum : "incorrect safe length";
 
-      // Recovery the striped block by truncating internal blocks to the safe
-      // length. Abort if there is any failure in this step.
-      truncatePartialBlock(rurList, safeLength);
+      if (safeLength > 0) {
+        Preconditions.checkArgument(rurList.size() >= dataBlkNum, "incorrect safe length");
+        // Recover the striped block by truncating internal blocks to the safe
+        // length. Abort if there is any failure in this step.
+        truncatePartialBlock(rurList, safeLength);
+      }
 
       // notify Namenode the new size and locations
       final DatanodeID[] newLocs = new DatanodeID[totalBlkNum];
@@ -469,11 +486,20 @@ protected void recover() throws IOException {
         int index = (int) (r.rInfo.getBlockId() &
             HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
         newLocs[index] = r.id;
-        newStorages[index] = r.storageID;
+        if (r.storageID != null) {
+          newStorages[index] = r.storageID;
+        }
       }
       ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
           safeLength, recoveryId);
       DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(bpid);
+      if (safeLength == 0) {
+        nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
+            newBlock.getNumBytes(), true, true, newLocs, newStorages);
+        LOG.info("After block recovery, the length of new block is 0. "
+            + "Will remove this block: {} from file.", newBlock);
+        return;
+      }
       nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
           newBlock.getNumBytes(), true, false, newLocs, newStorages);
     }
@@ -527,8 +553,8 @@ long getSafeLength(Map<Long, BlockRecord> syncBlocks) {
     private void checkLocations(int locationCount)
         throws IOException {
       if (locationCount < ecPolicy.getNumDataUnits()) {
-        throw new IOException(block + " has no enough internal blocks" +
-            ", unable to start recovery. Locations=" + Arrays.asList(locs));
+        throw new IOException(block + " does not have enough internal blocks (current: "
+            + locationCount + "), unable to start recovery. Locations=" + Arrays.asList(locs));
       }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index 54302e3c25..04cef92fba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -133,8 +133,8 @@ private BlockReader createBlockReader(long offsetInBlock) {
           block.getNumBytes() - offsetInBlock, true, "", peer, source,
           null, stripedReader.getCachingStrategy(), -1, conf);
     } catch (IOException e) {
-      LOG.info("Exception while creating remote block reader, datanode {}",
-          source, e);
+      LOG.info("Exception while creating remote block reader for {}, datanode {}",
+          block, source, e);
       IOUtils.closeStream(peer);
       return null;
     }
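Note that the safeLength == 0 branch still reports back to the NameNode, but with the deleteblock flag set to true, so the NameNode drops the empty block group, closes the file, and releases the lease; without the early commit-and-return, recovery would simply be rescheduled again and again. A condensed sketch of the two commit paths (CommitTarget is a stand-in interface for illustration; the real call is DatanodeProtocol#commitBlockSynchronization with the argument order shown in the hunk above):

// Stand-in for the NameNode-side commit; illustration only.
interface CommitTarget {
  void commitBlockSynchronization(long newGenStamp, long newLength,
      boolean closeFile, boolean deleteBlock);
}

final class RecoveryCommitFlow {
  // safeLength == 0 now commits with deleteBlock = true: the NameNode removes
  // the empty block group and closes the file, releasing the lease instead of
  // scheduling yet another recovery round.
  static void finish(CommitTarget nn, long newGenStamp, long safeLength) {
    boolean deleteBlock = (safeLength == 0);
    nn.commitBlockSynchronization(newGenStamp, safeLength, true, deleteBlock);
  }
}

The regression test below exercises exactly this path by stopping every streamer after the data is acked, leaving only zero-length or missing replicas behind.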
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
index c02c04a616..01e19d8d51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
@@ -259,6 +259,35 @@ public void testSafeLength() {
     checkSafeLength(1024 * 1024 * 1024, 6442450944L); // Length of: 1 GiB
   }
 
+  /**
+   * 1. Write 1MB of data, then flush it.
+   * 2. Simulate the client exiting unexpectedly.
+   * 3. Trigger lease recovery.
+   * 4. Lease recovery succeeds.
+   */
+  @Test
+  public void testLeaseRecoveryWithManyZeroLengthReplica() {
+    int curCellSize = 1024 * 1024;
+    try {
+      final FSDataOutputStream out = dfs.create(p);
+      final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out
+          .getWrappedStream();
+      for (int pos = 0; pos < curCellSize; pos++) {
+        out.write(StripedFileTestUtil.getByte(pos));
+      }
+      for (int i = 0; i < dataBlocks + parityBlocks; i++) {
+        StripedDataStreamer s = stripedOut.getStripedDataStreamer(i);
+        waitStreamerAllAcked(s);
+        stopBlockStream(s);
+      }
+      recoverLease();
+      LOG.info("Triggered lease recovery manually; it succeeded.");
+    } catch (Throwable e) {
+      String msg = "failed testCase: " + StringUtils.stringifyException(e);
+      Assert.fail(msg);
+    }
+  }
+
   private void checkSafeLength(int blockLength, long expectedSafeLength) {
     int[] blockLengths = new int[]{blockLength, blockLength, blockLength,
         blockLength, blockLength, blockLength};
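For context on the values asserted in testSafeLength (visible as context above the new test), the safe length follows cell-aligned stripe arithmetic: with RS-6-3 and 1 MiB cells, six 1 GiB internal blocks cover 1024 full stripes of 6 MiB each, i.e. 6442450944 bytes. A self-contained sketch assuming those defaults (an illustrative re-derivation consistent with the test's expected values, not the production StripedBlockUtil code):

import java.util.Arrays;

// Illustrates the cell-aligned safe-length arithmetic behind testSafeLength,
// assuming RS-6-3 with the default 1 MiB cell size.
public final class SafeLengthSketch {
  static long safeLength(long[] internalBlockLens, int dataBlkNum, int cellSize) {
    long[] cpy = Arrays.copyOf(internalBlockLens, internalBlockLens.length);
    Arrays.sort(cpy);
    // A stripe is full once at least dataBlkNum of its cells are complete;
    // count the full stripes guaranteed by the dataBlkNum-th largest replica.
    long fullStripes = cpy[cpy.length - dataBlkNum] / cellSize;
    return fullStripes * (long) dataBlkNum * cellSize;
  }

  public static void main(String[] args) {
    int cell = 1024 * 1024;                  // 1 MiB cell
    long[] lens = new long[6];
    Arrays.fill(lens, 1024L * 1024 * 1024);  // six 1 GiB internal blocks
    // Matches checkSafeLength(1024 * 1024 * 1024, 6442450944L) in the test.
    System.out.println(safeLength(lens, 6, cell)); // prints 6442450944
  }
}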