diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 5411b5c8f4..8be3fb05ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -491,7 +491,7 @@ public class Dispatcher { public static class DBlockStriped extends DBlock { - final byte[] indices; + private byte[] indices; final short dataBlockNum; final int cellSize; @@ -528,6 +528,29 @@ public class Dispatcher { } return block.getNumBytes(); } + + public void setIndices(byte[] indices) { + this.indices = indices; + } + + /** + * Adjust EC block indices,it will remove the element of adjustList from indices. + * @param adjustList the list will be removed from indices + */ + public void adjustIndices(List adjustList) { + if (adjustList.isEmpty()) { + return; + } + + byte[] newIndices = new byte[indices.length - adjustList.size()]; + for (int i = 0, j = 0; i < indices.length; ++i) { + if (!adjustList.contains(i)) { + newIndices[j] = indices[i]; + ++j; + } + } + this.indices = newIndices; + } } /** The class represents a desired move. */ @@ -804,7 +827,7 @@ public class Dispatcher { * * @return the total size of the received blocks in the number of bytes. */ - private long getBlockList() throws IOException { + private long getBlockList() throws IOException, IllegalArgumentException { final long size = Math.min(getBlocksSize, blocksToReceive); final BlocksWithLocations newBlksLocs = nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize); @@ -841,7 +864,14 @@ public class Dispatcher { synchronized (block) { block.clearLocations(); + if (blkLocs instanceof StripedBlockWithLocations) { + // EC block may adjust indices before, avoid repeated adjustments + ((DBlockStriped) block).setIndices( + ((StripedBlockWithLocations) blkLocs).getIndices()); + } + // update locations + List adjustList = new ArrayList<>(); final String[] datanodeUuids = blkLocs.getDatanodeUuids(); final StorageType[] storageTypes = blkLocs.getStorageTypes(); for (int i = 0; i < datanodeUuids.length; i++) { @@ -849,8 +879,20 @@ public class Dispatcher { datanodeUuids[i], storageTypes[i]); if (g != null) { // not unknown block.addLocation(g); + } else if (blkLocs instanceof StripedBlockWithLocations) { + // some datanode may not in storageGroupMap due to decommission operation + // or balancer cli with "-exclude" parameter + adjustList.add(i); } } + + if (!adjustList.isEmpty()) { + // block.locations mismatch with block.indices + // adjust indices to get correct internalBlock for Datanode in #getInternalBlock + ((DBlockStriped) block).adjustIndices(adjustList); + Preconditions.checkArgument(((DBlockStriped) block).indices.length + == block.locations.size()); + } } if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) { if (LOG.isTraceEnabled()) { @@ -970,7 +1012,7 @@ public class Dispatcher { } blocksToReceive -= received; continue; - } catch (IOException e) { + } catch (IOException | IllegalArgumentException e) { LOG.warn("Exception while getting reportedBlock list", e); return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 3c624cde7b..1d6d4b9344 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -468,6 +468,19 @@ public class TestBalancer { static void waitForBalancer(long totalUsedSpace, long totalCapacity, ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p, int expectedExcludedNodes) throws IOException, TimeoutException { + waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes, true); + } + + /** + * Wait until balanced: each datanode gives utilization within. + * BALANCE_ALLOWED_VARIANCE of average + * @throws IOException + * @throws TimeoutException + */ + static void waitForBalancer(long totalUsedSpace, long totalCapacity, + ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p, + int expectedExcludedNodes, boolean checkExcludeNodesUtilization) + throws IOException, TimeoutException { long timeout = TIMEOUT; long failtime = (timeout <= 0L) ? Long.MAX_VALUE : Time.monotonicNow() + timeout; @@ -489,7 +502,9 @@ public class TestBalancer { double nodeUtilization = ((double)datanode.getDfsUsed()) / datanode.getCapacity(); if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) { - assertTrue(nodeUtilization == 0); + if (checkExcludeNodesUtilization) { + assertTrue(nodeUtilization == 0); + } actualExcludedNodeCount++; continue; } @@ -774,6 +789,12 @@ public class TestBalancer { private void runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity, BalancerParameters p, int excludedNodes) throws Exception { + runBalancer(conf, totalUsedSpace, totalCapacity, p, excludedNodes, true); + } + + private void runBalancer(Configuration conf, long totalUsedSpace, + long totalCapacity, BalancerParameters p, int excludedNodes, + boolean checkExcludeNodesUtilization) throws Exception { waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); int retry = 5; @@ -794,7 +815,7 @@ public class TestBalancer { LOG.info(" ."); try { waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, - excludedNodes); + excludedNodes, checkExcludeNodesUtilization); } catch (TimeoutException e) { // See HDFS-11682. NN may not get heartbeat to reflect the newest // block changes. @@ -1628,6 +1649,103 @@ public class TestBalancer { } } + @Test + public void testBalancerWithExcludeListWithStripedFile() throws Exception { + Configuration conf = new Configuration(); + initConfWithStripe(conf); + NameNodeConnector.setWrite2IdFile(true); + doTestBalancerWithExcludeListWithStripedFile(conf); + NameNodeConnector.setWrite2IdFile(false); + } + + private void doTestBalancerWithExcludeListWithStripedFile(Configuration conf) throws Exception { + int numOfDatanodes = dataBlocks + parityBlocks + 5; + int numOfRacks = dataBlocks; + long capacity = 20 * defaultBlockSize; + long[] capacities = new long[numOfDatanodes]; + Arrays.fill(capacities, capacity); + String[] racks = new String[numOfDatanodes]; + for (int i = 0; i < numOfDatanodes; i++) { + racks[i] = "/rack" + (i % numOfRacks); + } + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numOfDatanodes) + .racks(racks) + .simulatedCapacities(capacities) + .build(); + + try { + cluster.waitActive(); + client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), + ClientProtocol.class).getProxy(); + client.enableErasureCodingPolicy( + StripedFileTestUtil.getDefaultECPolicy().getName()); + client.setErasureCodingPolicy("/", + StripedFileTestUtil.getDefaultECPolicy().getName()); + + long totalCapacity = sum(capacities); + + // fill up the cluster with 30% data. It'll be 45% full plus parity. + long fileLen = totalCapacity * 3 / 10; + long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks; + FileSystem fs = cluster.getFileSystem(0); + DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong()); + + // verify locations of striped blocks + LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen); + StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); + + // get datanode report + DatanodeInfo[] datanodeReport = client.getDatanodeReport(DatanodeReportType.ALL); + long totalBlocks = 0; + for (DatanodeInfo dn : datanodeReport) { + totalBlocks += dn.getNumBlocks(); + } + + // add datanode in new rack + String newRack = "/rack" + (++numOfRacks); + cluster.startDataNodes(conf, 2, true, null, + new String[]{newRack, newRack}, null, + new long[]{capacity, capacity}); + totalCapacity += capacity*2; + cluster.triggerHeartbeats(); + + // add datanode to exclude list + Set excludedList = new HashSet<>(); + excludedList.add(datanodeReport[0].getXferAddr()); + BalancerParameters.Builder pBuilder = new BalancerParameters.Builder(); + pBuilder.setExcludedNodes(excludedList); + + // start balancer and check the failed num of moving task + runBalancer(conf, totalUsedSpace, totalCapacity, pBuilder.build(), + excludedList.size(), false); + + // check total blocks, max wait time 60s + final long blocksBeforeBalancer = totalBlocks; + GenericTestUtils.waitFor(() -> { + DatanodeInfo[] datanodeInfos = null; + try { + cluster.triggerHeartbeats(); + datanodeInfos = client.getDatanodeReport(DatanodeReportType.ALL); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + long blocksAfterBalancer = 0; + for (DatanodeInfo dn : datanodeInfos) { + blocksAfterBalancer += dn.getNumBlocks(); + } + return blocksBeforeBalancer == blocksAfterBalancer; + }, 3000, 60000); + + // verify locations of striped blocks + locatedBlocks = client.getBlockLocations(fileName, 0, fileLen); + StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); + + } finally { + cluster.shutdown(); + } + } + private void testNullStripedBlocks(Configuration conf) throws IOException { NameNodeConnector nnc = NameNodeConnector.newNameNodeConnectors( DFSUtil.getInternalNsRpcUris(conf),