diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 6ad0e4d22a..acac65d774 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -539,6 +539,10 @@ public void setIndices(byte[] indices) { this.indices = indices; } + public byte[] getIndices() { + return this.indices; + } + /** * Adjust EC block indices,it will remove the element of adjustList from indices. * @param adjustList the list will be removed from indices @@ -889,8 +893,8 @@ private long getBlockList() throws IOException, IllegalArgumentException { if (g != null) { // not unknown block.addLocation(g); } else if (blkLocs instanceof StripedBlockWithLocations) { - // some datanode may not in storageGroupMap due to decommission operation - // or balancer cli with "-exclude" parameter + // some datanode may not in storageGroupMap due to decommission or maintenance + // operation or balancer cli with "-exclude" parameter adjustList.add(i); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 63fe238cd5..dbe10cca92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -49,6 +49,7 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Lists; +import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; @@ -222,12 +223,27 @@ DBlock newDBlock(LocatedBlock lb, List locations, } else { db = new DBlock(blk); } - for(MLocation ml : locations) { + + List adjustList = new ArrayList<>(); + for (int i = 0; i < locations.size(); i++) { + MLocation ml = locations.get(i); StorageGroup source = storages.getSource(ml); if (source != null) { db.addLocation(source); + } else if (lb.isStriped()) { + // some datanode may not in storages due to decommission or maintenance operation + // or balancer cli with "-exclude" parameter + adjustList.add(i); } } + + if (!adjustList.isEmpty()) { + // block.locations mismatch with block.indices + // adjust indices to get correct internalBlock + ((DBlockStriped) db).adjustIndices(adjustList); + Preconditions.checkArgument(((DBlockStriped) db).getIndices().length + == db.getLocations().size()); + } return db; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java index 90353c352e..9794ea9762 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_KERBEROS_PRINCIPAL_KEY; @@ -73,6 +74,7 @@ import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -82,10 +84,13 @@ import org.apache.hadoop.hdfs.server.balancer.ExitStatus; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.balancer.TestBalancer; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.mover.Mover.MLocation; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -98,6 +103,7 @@ import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.MetricsAsserts; +import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.ToolRunner; import org.junit.Assert; import org.junit.Test; @@ -1005,6 +1011,148 @@ public void testMoverWithStripedFile() throws Exception { } } + @Test(timeout = 300000) + public void testMoverWithStripedFileMaintenance() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConfWithStripe(conf); + + // Start 9 datanodes + int numOfDatanodes = 9; + int storagesPerDatanode = 2; + long capacity = 9 * defaultBlockSize; + long[][] capacities = new long[numOfDatanodes][storagesPerDatanode]; + for (int i = 0; i < numOfDatanodes; i++) { + for(int j = 0; j < storagesPerDatanode; j++){ + capacities[i][j] = capacity; + } + } + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numOfDatanodes) + .storagesPerDatanode(storagesPerDatanode) + .storageTypes(new StorageType[][]{ + {StorageType.SSD, StorageType.SSD}, + {StorageType.SSD, StorageType.SSD}, + {StorageType.SSD, StorageType.SSD}, + {StorageType.SSD, StorageType.SSD}, + {StorageType.SSD, StorageType.SSD}, + {StorageType.SSD, StorageType.SSD}, + {StorageType.SSD, StorageType.SSD}, + {StorageType.SSD, StorageType.SSD}, + {StorageType.SSD, StorageType.SSD}}) + .storageCapacities(capacities) + .build(); + + try { + cluster.waitActive(); + cluster.getFileSystem().enableErasureCodingPolicy( + StripedFileTestUtil.getDefaultECPolicy().getName()); + + ClientProtocol client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); + String barDir = "/bar"; + client.mkdirs(barDir, new FsPermission((short) 777), true); + // Set "/bar" directory with ALL_SSD storage policy. + client.setStoragePolicy(barDir, "ALL_SSD"); + // Set an EC policy on "/bar" directory + client.setErasureCodingPolicy(barDir, + StripedFileTestUtil.getDefaultECPolicy().getName()); + + // Write file to barDir + final String fooFile = "/bar/foo"; + long fileLen = 6 * defaultBlockSize; + DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile), + fileLen, (short) 3, 0); + + // Verify storage types and locations + LocatedBlocks locatedBlocks = + client.getBlockLocations(fooFile, 0, fileLen); + DatanodeInfoWithStorage location = null; + for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()){ + location = lb.getLocations()[8]; + for(StorageType type : lb.getStorageTypes()){ + Assert.assertEquals(StorageType.SSD, type); + } + } + + // Maintain the last datanode later + FSNamesystem ns = cluster.getNamesystem(0); + DatanodeManager datanodeManager = ns.getBlockManager().getDatanodeManager(); + DatanodeDescriptor dn = datanodeManager.getDatanode(location.getDatanodeUuid()); + + StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, + dataBlocks + parityBlocks); + + // Start 5 more datanodes for mover + capacities = new long[5][storagesPerDatanode]; + for (int i = 0; i < 5; i++) { + for(int j = 0; j < storagesPerDatanode; j++){ + capacities[i][j] = capacity; + } + } + cluster.startDataNodes(conf, 5, + new StorageType[][]{ + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}}, + true, null, null, null, capacities, + null, false, false, false, null, null, null); + cluster.triggerHeartbeats(); + + // Move blocks to DISK + client.setStoragePolicy(barDir, "HOT"); + int rc = ToolRunner.run(conf, new Mover.Cli(), + new String[]{"-p", barDir}); + // Verify the number of DISK storage types + waitForLocatedBlockWithDiskStorageType(cluster.getFileSystem(), fooFile, 5); + + // Maintain a datanode that simulates that one node in the location list + // is in ENTERING_MAINTENANCE status. + datanodeManager.getDatanode(dn.getDatanodeUuid()).startMaintenance(); + waitNodeState(dn, DatanodeInfo.AdminStates.ENTERING_MAINTENANCE); + + // Move blocks back to SSD. + // Without HDFS-17599, locations and indices lengths might not match, + // resulting in getting the wrong blockId in DBlockStriped#getInternalBlock, + // and mover will fail to run. + client.setStoragePolicy(barDir, "ALL_SSD"); + rc = ToolRunner.run(conf, new Mover.Cli(), + new String[]{"-p", barDir}); + + Assert.assertEquals("Movement to HOT should be successful", 0, rc); + } finally { + cluster.shutdown(); + } + } + + /** + * Wait till DataNode is transitioned to the expected state. + */ + protected void waitNodeState(DatanodeInfo node, DatanodeInfo.AdminStates state) { + waitNodeState(Lists.newArrayList(node), state); + } + + /** + * Wait till all DataNodes are transitioned to the expected state. + */ + protected void waitNodeState(List nodes, DatanodeInfo.AdminStates state) { + for (DatanodeInfo node : nodes) { + boolean done = (state == node.getAdminState()); + while (!done) { + LOG.info("Waiting for node " + node + " to change state to " + + state + " current state: " + node.getAdminState()); + try { + Thread.sleep(DFS_HEARTBEAT_INTERVAL_DEFAULT * 10); + } catch (InterruptedException e) { + // nothing + } + done = (state == node.getAdminState()); + } + LOG.info("node " + node + " reached the state " + state); + } + } + /** * Wait until Namenode reports expected storage type for all blocks of * given file.