diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index f643590f75..e8b49718fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hdfs.DFSUtilClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -689,7 +688,7 @@ Result runOneIteration() { * execute a {@link Balancer} to work through all datanodes once. */ static private int doBalance(Collection namenodes, - Collection nsIds, final BalancerParameters p, Configuration conf) + final BalancerParameters p, Configuration conf) throws IOException, InterruptedException { final long sleeptime = conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, @@ -708,12 +707,13 @@ static private int doBalance(Collection namenodes, System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); List connectors = Collections.emptyList(); - boolean done = false; - for(int iteration = 0; !done; iteration++) { - try { - connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds, - Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, - p.getMaxIdleIteration()); + try { + connectors = NameNodeConnector.newNameNodeConnectors(namenodes, + Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, + p.getMaxIdleIteration()); + + boolean done = false; + for(int iteration = 0; !done; iteration++) { done = true; Collections.shuffle(connectors); for(NameNodeConnector nnc : connectors) { @@ -741,25 +741,19 @@ static private int doBalance(Collection namenodes, if (!done) { Thread.sleep(sleeptime); } - } finally { - for(NameNodeConnector nnc : connectors) { - IOUtils.cleanupWithLogger(LOG, nnc); - } + } + } finally { + for(NameNodeConnector nnc : connectors) { + IOUtils.cleanupWithLogger(LOG, nnc); } } return ExitStatus.SUCCESS.getExitCode(); } static int run(Collection namenodes, final BalancerParameters p, - Configuration conf) throws IOException, InterruptedException { - return run(namenodes, null, p, conf); - } - - static int run(Collection namenodes, Collection nsIds, - final BalancerParameters p, Configuration conf) - throws IOException, InterruptedException { + Configuration conf) throws IOException, InterruptedException { if (!p.getRunAsService()) { - return doBalance(namenodes, nsIds, p, conf); + return doBalance(namenodes, p, conf); } if (!serviceRunning) { serviceRunning = true; @@ -778,7 +772,7 @@ static int run(Collection namenodes, Collection nsIds, while (serviceRunning) { try { - int retCode = doBalance(namenodes, nsIds, p, conf); + int retCode = doBalance(namenodes, p, conf); if (retCode < 0) { LOG.info("Balance failed, error code: " + retCode); failedTimesSinceLastSuccessfulBalance++; @@ -862,8 +856,7 @@ public int run(String[] args) { checkReplicationPolicyCompatibility(conf); final Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); - final Collection nsIds = DFSUtilClient.getNameServiceIds(conf); - return Balancer.run(namenodes, nsIds, parse(args), conf); + return Balancer.run(namenodes, parse(args), conf); } catch (IOException e) { System.out.println(e + ". Exiting ..."); return ExitStatus.IO_EXCEPTION.getExitCode(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 8403f82c26..2844ad5a94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,12 +32,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.RateLimiter; -import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.HAUtil; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -106,32 +100,6 @@ public static List newNameNodeConnectors( return connectors; } - public static List newNameNodeConnectors( - Collection namenodes, Collection nsIds, String name, - Path idPath, Configuration conf, int maxIdleIterations) - throws IOException { - final List connectors = new ArrayList( - namenodes.size()); - Map uriToNsId = new HashMap<>(); - if (nsIds != null) { - for (URI uri : namenodes) { - for (String nsId : nsIds) { - if (uri.getAuthority().equals(nsId)) { - uriToNsId.put(uri, nsId); - } - } - } - } - for (URI uri : namenodes) { - String nsId = uriToNsId.get(uri); - NameNodeConnector nnc = new NameNodeConnector(name, uri, nsId, idPath, - null, conf, maxIdleIterations); - nnc.getKeyManager().startBlockKeyUpdater(); - connectors.add(nnc); - } - return connectors; - } - @VisibleForTesting public static void setWrite2IdFile(boolean write2IdFile) { NameNodeConnector.write2IdFile = write2IdFile; @@ -146,13 +114,6 @@ public static void checkOtherInstanceRunning(boolean toCheck) { private final String blockpoolID; private final BalancerProtocols namenode; - /** - * If set balancerShouldRequestStandby true, Balancer will getBlocks from - * Standby NameNode only and it can reduce the performance impact of Active - * NameNode, especially in a busy HA mode cluster. - */ - private boolean balancerShouldRequestStandby; - private NamenodeProtocol standbyNameNode; private final KeyManager keyManager; final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false); @@ -188,11 +149,6 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath, this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, BalancerProtocols.class, fallbackToSimpleAuth).getProxy(); - this.balancerShouldRequestStandby = conf.getBoolean( - DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY, - DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT); - this.standbyNameNode = null; - this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf); final NamespaceInfo namespaceinfo = namenode.versionRequest(); @@ -211,31 +167,6 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath, } } - public NameNodeConnector(String name, URI nameNodeUri, String nsId, - Path idPath, List targetPaths, - Configuration conf, int maxNotChangedIterations) - throws IOException { - this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations); - if (nsId != null && HAUtil.isHAEnabled(conf, nsId)) { - List namenodes = - HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId); - for (ClientProtocol proxy : namenodes) { - try { - if (proxy.getHAServiceState().equals( - HAServiceProtocol.HAServiceState.STANDBY)) { - this.standbyNameNode = NameNodeProxies.createNonHAProxy( - conf, RPC.getServerAddress(proxy), NamenodeProtocol.class, - UserGroupInformation.getCurrentUser(), false).getProxy(); - break; - } - } catch (Exception e) { - //Ignore the exception while connecting to a namenode. - LOG.debug("Error while connecting to namenode", e); - } - } - } - } - public DistributedFileSystem getDistributedFileSystem() { return fs; } @@ -255,22 +186,6 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long if (getBlocksRateLimiter != null) { getBlocksRateLimiter.acquire(); } - boolean isRequestStandby = true; - try { - if (balancerShouldRequestStandby && standbyNameNode != null) { - return standbyNameNode.getBlocks(datanode, size, minBlockSize); - } else { - isRequestStandby = false; - } - } catch (Exception e) { - LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " + - "will fallback to normal way", e); - isRequestStandby = false; - } finally { - if (isRequestStandby) { - LOG.info("Request #getBlocks to Standby NameNode success."); - } - } return namenode.getBlocks(datanode, size, minBlockSize); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java index 185df1246d..c604315fb2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java @@ -17,11 +17,7 @@ */ package org.apache.hadoop.hdfs.server.balancer; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.times; @@ -35,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -49,9 +44,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider; -import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.junit.Test; -import org.slf4j.LoggerFactory; /** * Test balancer with HA NameNodes @@ -113,12 +106,6 @@ void doTest(Configuration conf) throws Exception { TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 0); - boolean isRequestStandby = conf.getBoolean( - DFS_HA_ALLOW_STALE_READ_KEY, DFS_HA_ALLOW_STALE_READ_DEFAULT); - if (isRequestStandby) { - HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0), - cluster.getNameNode(1)); - } // start up an empty node with the same capacity and on the same rack long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity String newNodeRack = TestBalancer.RACK2; // new node's rack @@ -128,54 +115,13 @@ void doTest(Configuration conf) throws Exception { TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); - Collection nsIds = DFSUtilClient.getNameServiceIds(conf); assertEquals(1, namenodes.size()); - final int r = Balancer.run(namenodes, nsIds, BalancerParameters.DEFAULT, - conf); + final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, BalancerParameters.DEFAULT); } - /** - * Test Balancer request Standby NameNode when enable this feature. - */ - @Test(timeout = 60000) - public void testBalancerRequestSBNWithHA() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.setBoolean(DFS_HA_ALLOW_STALE_READ_KEY, true); - conf.setLong(DFS_HA_TAILEDITS_PERIOD_KEY, 1); - //conf.setBoolean(DFS_HA_BALANCER_REQUEST_STANDBY_KEY, true); - TestBalancer.initConf(conf); - assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length); - NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1"); - nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT); - Configuration copiedConf = new Configuration(conf); - cluster = new MiniDFSCluster.Builder(copiedConf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()) - .numDataNodes(TEST_CAPACITIES.length) - .racks(TEST_RACKS) - .simulatedCapacities(TEST_CAPACITIES) - .build(); - // Try capture NameNodeConnector log. - LogCapturer log =LogCapturer.captureLogs( - LoggerFactory.getLogger(NameNodeConnector.class)); - HATestUtil.setFailoverConfigurations(cluster, conf); - try { - cluster.waitActive(); - cluster.transitionToActive(0); - Thread.sleep(500); - client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), - ClientProtocol.class).getProxy(); - doTest(conf); - // Check getBlocks request to Standby NameNode. - assertTrue(log.getOutput().contains( - "Request #getBlocks to Standby NameNode success.")); - } finally { - cluster.shutdown(); - } - } - /** * Test Balancer with ObserverNodes. */