From 8f5bd5f1bc25447f79400866fa921e2107a1fe73 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Mon, 10 Dec 2012 03:31:34 +0000 Subject: [PATCH] HDFS-4261. Fix bugs in Balancer that it does not terminate in some cases and it checks BlockPlacementPolicy instance incorrectly. Contributed by Junping Du git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1419192 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 4 ++ .../hadoop/hdfs/server/balancer/Balancer.java | 17 ++---- .../server/balancer/NameNodeConnector.java | 17 ++++++ .../balancer/TestBalancerWithNodeGroup.java | 59 +++++++++++++++++++ 4 files changed, 84 insertions(+), 13 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c89eac3723..c0d6a6a4ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -269,6 +269,10 @@ Trunk (Unreleased) HDFS-4260 Fix HDFS tests to set test dir to a valid HDFS path as opposed to the local build path (Chri Nauroth via Sanjay) + HDFS-4261. Fix bugs in Balancer that it does not terminate in some cases + and it checks BlockPlacementPolicy instance incorrectly. 
(Junping Du via + szetszwo) + Release 2.0.3-alpha - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 473f259234..b586a313dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -224,7 +224,6 @@ public class Balancer { = new HashMap(); private NetworkTopology cluster; - final static private int MOVER_THREAD_POOL_SIZE = 1000; final private ExecutorService moverExecutor = Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE); @@ -801,8 +800,8 @@ private void dispatchBlocks() { */ private static void checkReplicationPolicyCompatibility(Configuration conf ) throws UnsupportedActionException { - if (BlockPlacementPolicy.getInstance(conf, null, null) instanceof - BlockPlacementPolicyDefault) { + if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof + BlockPlacementPolicyDefault)) { throw new UnsupportedActionException( "Balancer without BlockPlacementPolicyDefault"); } @@ -1085,7 +1084,6 @@ private long get() { } }; private BytesMoved bytesMoved = new BytesMoved(); - private int notChangedIterations = 0; /* Start a thread to dispatch block moves for each source. * The thread selects blocks to move & sends request to proxy source to @@ -1384,15 +1382,8 @@ private ReturnStatus run(int iteration, Formatter formatter, * available to move. * Exit no byte has been moved for 5 consecutive iterations. */ - if (dispatchBlockMoves() > 0) { - notChangedIterations = 0; - } else { - notChangedIterations++; - if (notChangedIterations >= 5) { - System.out.println( - "No block has been moved for 5 iterations. 
Exiting..."); - return ReturnStatus.NO_MOVE_PROGRESS; - } + if (!this.nnc.shouldContinue(dispatchBlockMoves())) { + return ReturnStatus.NO_MOVE_PROGRESS; } // clean all lists diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 13709458aa..530a3b7e78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -52,6 +52,7 @@ class NameNodeConnector { private static final Log LOG = Balancer.LOG; private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); + private static final int MAX_NOT_CHANGED_INTERATIONS = 5; final URI nameNodeUri; final String blockpoolID; @@ -65,6 +66,8 @@ class NameNodeConnector { private final boolean encryptDataTransfer; private boolean shouldRun; private long keyUpdaterInterval; + // used for balancer + private int notChangedIterations = 0; private BlockTokenSecretManager blockTokenSecretManager; private Daemon keyupdaterthread; // AccessKeyUpdater thread private DataEncryptionKey encryptionKey; @@ -119,6 +122,20 @@ class NameNodeConnector { } } + boolean shouldContinue(long dispatchBlockMoveBytes) { + if (dispatchBlockMoveBytes > 0) { + notChangedIterations = 0; + } else { + notChangedIterations++; + if (notChangedIterations >= MAX_NOT_CHANGED_INTERATIONS) { + System.out.println("No block has been moved for " + + notChangedIterations + " iterations. Exiting..."); + return false; + } + } + return true; + } + /** Get an access token for a block. 
*/ Token getAccessToken(ExtendedBlock eb ) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java index 15cd7d7086..41d17afb30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.net.NetworkTopology; import org.junit.Test; +import junit.framework.Assert; /** * This class tests if a balancer schedules tasks correctly. @@ -174,6 +175,19 @@ private void runBalancer(Configuration conf, LOG.info("Rebalancing with default factor."); waitForBalancer(totalUsedSpace, totalCapacity); } + + private void runBalancerCanFinish(Configuration conf, + long totalUsedSpace, long totalCapacity) throws Exception { + waitForHeartBeat(totalUsedSpace, totalCapacity); + + // start rebalancing + Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); + final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); + Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code || + (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code)); + waitForHeartBeat(totalUsedSpace, totalCapacity); + LOG.info("Rebalancing with default factor."); + } /** * Create a cluster with even distribution, and a new empty node is added to @@ -289,4 +303,49 @@ public void testBalancerWithNodeGroup() throws Exception { cluster.shutdown(); } } + + /** + * Create a 4 nodes cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2) + * in RACK1/NODEGROUP1 and 1 node (n3) in RACK1/NODEGROUP2. 
Fill the cluster + * to 60% with 3 replicas, so n2 and n3 will hold a replica of every block according + * to the replica placement policy with NodeGroup. As a result, n2 and n3 will be + * filled to 80% (60% x 4 / 3), and no blocks can be migrated from n2 and n3 + * to n0 or n1 under the balancer policy with node group. Thus, we expect the balancer + * to end after 5 iterations without moving any blocks. + */ + @Test + public void testBalancerEndInNoMoveProgress() throws Exception { + Configuration conf = createConf(); + long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY}; + String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1}; + String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2}; + + int numOfDatanodes = capacities.length; + assertEquals(numOfDatanodes, racks.length); + assertEquals(numOfDatanodes, nodeGroups.length); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf) + .numDataNodes(capacities.length) + .racks(racks) + .simulatedCapacities(capacities); + MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups); + cluster = new MiniDFSClusterWithNodeGroup(builder); + try { + cluster.waitActive(); + client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), + ClientProtocol.class).getProxy(); + + long totalCapacity = TestBalancer.sum(capacities); + // fill up the cluster to be 60% full + long totalUsedSpace = totalCapacity * 6 / 10; + TestBalancer.createFile(cluster, filePath, totalUsedSpace / 3, + (short) (3), 0); + + // run balancer which can finish in 5 iterations with no block movement. + runBalancerCanFinish(conf, totalUsedSpace, totalCapacity); + + } finally { + cluster.shutdown(); + } + } }