diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 35038a7a8a..6ab98e5880 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -319,6 +319,9 @@ public void commitBlockSynchronization(ExtendedBlock block, .setNewLength(newlength).setCloseFile(closeFile) .setDeleteBlock(deleteblock); for (int i = 0; i < newtargets.length; i++) { + if (newtargets[i] == null) { + continue; + } builder.addNewTaragets(PBHelperClient.convert(newtargets[i])); builder.addNewTargetStorages(newtargetstorages[i]); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java index fe0c7f7555..db52d073fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java @@ -467,7 +467,7 @@ protected void recover() throws IOException { // notify Namenode the new size and locations final DatanodeID[] newLocs = new DatanodeID[totalBlkNum]; final String[] newStorages = new String[totalBlkNum]; - for (int i = 0; i < totalBlkNum; i++) { + for (int i = 0; i < blockIndices.length; i++) { newLocs[blockIndices[i]] = DatanodeID.EMPTY_DATANODE_ID; newStorages[blockIndices[i]] = ""; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index f12285cb9b..6ddd880d8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -32,6 +32,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -42,6 +44,7 @@ import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.util.ArrayList; @@ -96,6 +99,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.test.GenericTestUtils; @@ -703,7 +707,67 @@ public void testNotMatchedReplicaID() throws IOException { streams.close(); } } - + + @Test(timeout = 60000) + public void testEcRecoverBlocks() throws Throwable { + // Stop the Mocked DN started in startup() + tearDown(); + ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy(); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(8).build(); + + try { + cluster.waitActive(); + NamenodeProtocols preSpyNN = cluster.getNameNodeRpc(); + NamenodeProtocols spyNN = spy(preSpyNN); + + // Delay completeFile + GenericTestUtils.DelayAnswer delayer = + new GenericTestUtils.DelayAnswer(LOG); + doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), any(), + anyLong()); + String topDir = "/myDir"; + DFSClient client = new DFSClient(null, spyNN, conf, null); + Path file = new Path(topDir + "/testECLeaseRecover"); + client.mkdirs(topDir, null, false); + client.enableErasureCodingPolicy(ecPolicy.getName()); + client.setErasureCodingPolicy(topDir, ecPolicy.getName()); + OutputStream stm = client.create(file.toString(), true); + + // write 5MB File + AppendTestUtil.write(stm, 0, 1024 * 1024 * 5); + final AtomicReference err = new AtomicReference(); + Thread t = new Thread() { + @Override + public void run() { + try { + stm.close(); + } catch (Throwable t) { + err.set(t); + } + } + }; + t.start(); + + // Waiting for close to get to latch + delayer.waitForCall(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + return client.getNamenode().recoverLease(file.toString(), + client.getClientName()); + } catch (IOException e) { + return false; + } + } + }, 5000, 24000); + delayer.proceed(); + } finally { + cluster.shutdown(); + } + } + /** * Test to verify the race between finalizeBlock and Lease recovery *