diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 9ec28f9c02..5dc40fa10e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -891,7 +891,15 @@ public boolean commitOrCompleteLastBlock(BlockCollection bc, lastBlock.getUnderConstructionFeature() .updateStorageScheduledSize((BlockInfoStriped) lastBlock); } - if (hasMinStorage(lastBlock)) { + + // Count replicas on decommissioning nodes, as these will not be + // decommissioned unless recovery/completing last block has finished + NumberReplicas numReplicas = countNodes(lastBlock); + int numUsableReplicas = numReplicas.liveReplicas() + + numReplicas.decommissioning() + + numReplicas.liveEnteringMaintenanceReplicas(); + + if (hasMinStorage(lastBlock, numUsableReplicas)) { if (committed) { addExpectedReplicasToPending(lastBlock); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index 94e894619d..dc0edccdb1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -33,6 +33,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -646,6 +647,53 @@ public void testDecommissionWithOpenfile() throws IOException, InterruptedExcept fdos.close(); } + + @Test(timeout = 360000) + public void testDecommissionWithOpenFileAndBlockRecovery() + throws IOException, InterruptedException { + startCluster(1, 6); + getCluster().waitActive(); + + Path file = new Path("/testRecoveryDecommission"); + + // Create a file and never close the output stream to trigger recovery + DistributedFileSystem dfs = getCluster().getFileSystem(); + FSDataOutputStream out = dfs.create(file, true, + getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), + (short) 3, blockSize); + + // Write data to the file + long writtenBytes = 0; + while (writtenBytes < fileSize) { + out.writeLong(writtenBytes); + writtenBytes += 8; + } + out.hsync(); + + DatanodeInfo[] lastBlockLocations = NameNodeAdapter.getBlockLocations( + getCluster().getNameNode(), "/testRecoveryDecommission", 0, fileSize) + .getLastLocatedBlock().getLocations(); + + // Decommission all nodes of the last block + ArrayList toDecom = new ArrayList<>(); + for (DatanodeInfo dnDecom : lastBlockLocations) { + toDecom.add(dnDecom.getXferAddr()); + } + initExcludeHosts(toDecom); + refreshNodes(0); + + // Make sure hard lease expires to trigger replica recovery + getCluster().setLeasePeriod(300L, 300L); + Thread.sleep(2 * BLOCKREPORT_INTERVAL_MSEC); + + for (DatanodeInfo dnDecom : lastBlockLocations) { + DatanodeInfo datanode = NameNodeAdapter.getDatanode( + getCluster().getNamesystem(), dnDecom); + waitNodeState(datanode, AdminStates.DECOMMISSIONED); + } + + assertEquals(dfs.getFileStatus(file).getLen(), writtenBytes); + } /** * Tests restart of namenode while datanode hosts are added to exclude file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java index 24321533d1..a37bdb8cb0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java @@ -31,6 +31,8 @@ import java.util.Map; import com.google.common.collect.Lists; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +43,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.util.Time; import org.junit.Test; @@ -940,6 +944,53 @@ public void testInvalidation() throws IOException { cleanupFile(fileSys, file); } + @Test(timeout = 120000) + public void testFileCloseAfterEnteringMaintenance() throws Exception { + LOG.info("Starting testFileCloseAfterEnteringMaintenance"); + int expirationInMs = 30 * 1000; + int numDataNodes = 3; + int numNameNodes = 1; + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 2); + + startCluster(numNameNodes, numDataNodes); + getCluster().waitActive(); + + FSNamesystem fsn = getCluster().getNameNode().getNamesystem(); + List hosts = new ArrayList<>(); + for (DataNode dn : getCluster().getDataNodes()) { + hosts.add(dn.getDisplayName()); + putNodeInService(0, dn.getDatanodeUuid()); + } + assertEquals(numDataNodes, fsn.getNumLiveDataNodes()); + + Path openFile = new Path("/testClosingFileInMaintenance.dat"); + // Lets write 2 blocks of data to the openFile + writeFile(getCluster().getFileSystem(), openFile, (short) 3); + + // Lets write some more data and keep the file open + FSDataOutputStream fsDataOutputStream = getCluster().getFileSystem() + .append(openFile); + byte[] bytes = new byte[1024]; + fsDataOutputStream.write(bytes); + fsDataOutputStream.hsync(); + + LocatedBlocks lbs = NameNodeAdapter.getBlockLocations( + getCluster().getNameNode(0), openFile.toString(), 0, 3 * blockSize); + DatanodeInfo[] dnInfos4LastBlock = lbs.getLastLocatedBlock().getLocations(); + + // Request maintenance for DataNodes 1 and 2 which has the last block. + takeNodeOutofService(0, + Lists.newArrayList(dnInfos4LastBlock[0].getDatanodeUuid(), + dnInfos4LastBlock[1].getDatanodeUuid()), + Time.now() + expirationInMs, + null, null, AdminStates.ENTERING_MAINTENANCE); + + // Closing the file should succeed even when the + // last blocks' nodes are entering maintenance. + fsDataOutputStream.close(); + cleanupFile(getCluster().getFileSystem(), openFile); + } + static String getFirstBlockFirstReplicaUuid(FileSystem fileSys, Path name) throws IOException { DatanodeInfo[] nodes = getFirstBlockReplicasDatanodeInfos(fileSys, name);