Revert "HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He."
This reverts commit acae31aa28
.
This commit is contained in:
parent
032ccba67c
commit
53d22fdb88
@ -37,7 +37,6 @@
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
@ -689,7 +688,7 @@ Result runOneIteration() {
|
|||||||
* execute a {@link Balancer} to work through all datanodes once.
|
* execute a {@link Balancer} to work through all datanodes once.
|
||||||
*/
|
*/
|
||||||
static private int doBalance(Collection<URI> namenodes,
|
static private int doBalance(Collection<URI> namenodes,
|
||||||
Collection<String> nsIds, final BalancerParameters p, Configuration conf)
|
final BalancerParameters p, Configuration conf)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
final long sleeptime =
|
final long sleeptime =
|
||||||
conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
||||||
@ -708,12 +707,13 @@ static private int doBalance(Collection<URI> namenodes,
|
|||||||
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
|
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
|
||||||
|
|
||||||
List<NameNodeConnector> connectors = Collections.emptyList();
|
List<NameNodeConnector> connectors = Collections.emptyList();
|
||||||
boolean done = false;
|
try {
|
||||||
for(int iteration = 0; !done; iteration++) {
|
connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
|
||||||
try {
|
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
|
||||||
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
|
p.getMaxIdleIteration());
|
||||||
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
|
|
||||||
p.getMaxIdleIteration());
|
boolean done = false;
|
||||||
|
for(int iteration = 0; !done; iteration++) {
|
||||||
done = true;
|
done = true;
|
||||||
Collections.shuffle(connectors);
|
Collections.shuffle(connectors);
|
||||||
for(NameNodeConnector nnc : connectors) {
|
for(NameNodeConnector nnc : connectors) {
|
||||||
@ -741,25 +741,19 @@ static private int doBalance(Collection<URI> namenodes,
|
|||||||
if (!done) {
|
if (!done) {
|
||||||
Thread.sleep(sleeptime);
|
Thread.sleep(sleeptime);
|
||||||
}
|
}
|
||||||
} finally {
|
}
|
||||||
for(NameNodeConnector nnc : connectors) {
|
} finally {
|
||||||
IOUtils.cleanupWithLogger(LOG, nnc);
|
for(NameNodeConnector nnc : connectors) {
|
||||||
}
|
IOUtils.cleanupWithLogger(LOG, nnc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ExitStatus.SUCCESS.getExitCode();
|
return ExitStatus.SUCCESS.getExitCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
static int run(Collection<URI> namenodes, final BalancerParameters p,
|
static int run(Collection<URI> namenodes, final BalancerParameters p,
|
||||||
Configuration conf) throws IOException, InterruptedException {
|
Configuration conf) throws IOException, InterruptedException {
|
||||||
return run(namenodes, null, p, conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int run(Collection<URI> namenodes, Collection<String> nsIds,
|
|
||||||
final BalancerParameters p, Configuration conf)
|
|
||||||
throws IOException, InterruptedException {
|
|
||||||
if (!p.getRunAsService()) {
|
if (!p.getRunAsService()) {
|
||||||
return doBalance(namenodes, nsIds, p, conf);
|
return doBalance(namenodes, p, conf);
|
||||||
}
|
}
|
||||||
if (!serviceRunning) {
|
if (!serviceRunning) {
|
||||||
serviceRunning = true;
|
serviceRunning = true;
|
||||||
@ -778,7 +772,7 @@ static int run(Collection<URI> namenodes, Collection<String> nsIds,
|
|||||||
|
|
||||||
while (serviceRunning) {
|
while (serviceRunning) {
|
||||||
try {
|
try {
|
||||||
int retCode = doBalance(namenodes, nsIds, p, conf);
|
int retCode = doBalance(namenodes, p, conf);
|
||||||
if (retCode < 0) {
|
if (retCode < 0) {
|
||||||
LOG.info("Balance failed, error code: " + retCode);
|
LOG.info("Balance failed, error code: " + retCode);
|
||||||
failedTimesSinceLastSuccessfulBalance++;
|
failedTimesSinceLastSuccessfulBalance++;
|
||||||
@ -862,8 +856,7 @@ public int run(String[] args) {
|
|||||||
checkReplicationPolicyCompatibility(conf);
|
checkReplicationPolicyCompatibility(conf);
|
||||||
|
|
||||||
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
final Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
|
return Balancer.run(namenodes, parse(args), conf);
|
||||||
return Balancer.run(namenodes, nsIds, parse(args), conf);
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
System.out.println(e + ". Exiting ...");
|
System.out.println(e + ". Exiting ...");
|
||||||
return ExitStatus.IO_EXCEPTION.getExitCode();
|
return ExitStatus.IO_EXCEPTION.getExitCode();
|
||||||
|
@ -25,7 +25,6 @@
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
@ -33,12 +32,7 @@
|
|||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.util.concurrent.RateLimiter;
|
import com.google.common.util.concurrent.RateLimiter;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.HAUtil;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
||||||
import org.apache.hadoop.ipc.RPC;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
@ -106,32 +100,6 @@ public static List<NameNodeConnector> newNameNodeConnectors(
|
|||||||
return connectors;
|
return connectors;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<NameNodeConnector> newNameNodeConnectors(
|
|
||||||
Collection<URI> namenodes, Collection<String> nsIds, String name,
|
|
||||||
Path idPath, Configuration conf, int maxIdleIterations)
|
|
||||||
throws IOException {
|
|
||||||
final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
|
|
||||||
namenodes.size());
|
|
||||||
Map<URI, String> uriToNsId = new HashMap<>();
|
|
||||||
if (nsIds != null) {
|
|
||||||
for (URI uri : namenodes) {
|
|
||||||
for (String nsId : nsIds) {
|
|
||||||
if (uri.getAuthority().equals(nsId)) {
|
|
||||||
uriToNsId.put(uri, nsId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (URI uri : namenodes) {
|
|
||||||
String nsId = uriToNsId.get(uri);
|
|
||||||
NameNodeConnector nnc = new NameNodeConnector(name, uri, nsId, idPath,
|
|
||||||
null, conf, maxIdleIterations);
|
|
||||||
nnc.getKeyManager().startBlockKeyUpdater();
|
|
||||||
connectors.add(nnc);
|
|
||||||
}
|
|
||||||
return connectors;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static void setWrite2IdFile(boolean write2IdFile) {
|
public static void setWrite2IdFile(boolean write2IdFile) {
|
||||||
NameNodeConnector.write2IdFile = write2IdFile;
|
NameNodeConnector.write2IdFile = write2IdFile;
|
||||||
@ -146,13 +114,6 @@ public static void checkOtherInstanceRunning(boolean toCheck) {
|
|||||||
private final String blockpoolID;
|
private final String blockpoolID;
|
||||||
|
|
||||||
private final BalancerProtocols namenode;
|
private final BalancerProtocols namenode;
|
||||||
/**
|
|
||||||
* If set balancerShouldRequestStandby true, Balancer will getBlocks from
|
|
||||||
* Standby NameNode only and it can reduce the performance impact of Active
|
|
||||||
* NameNode, especially in a busy HA mode cluster.
|
|
||||||
*/
|
|
||||||
private boolean balancerShouldRequestStandby;
|
|
||||||
private NamenodeProtocol standbyNameNode;
|
|
||||||
private final KeyManager keyManager;
|
private final KeyManager keyManager;
|
||||||
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
|
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
|
||||||
|
|
||||||
@ -188,11 +149,6 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
|
|||||||
|
|
||||||
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
|
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
|
||||||
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
|
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
|
||||||
this.balancerShouldRequestStandby = conf.getBoolean(
|
|
||||||
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
|
|
||||||
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT);
|
|
||||||
this.standbyNameNode = null;
|
|
||||||
|
|
||||||
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
|
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
|
||||||
|
|
||||||
final NamespaceInfo namespaceinfo = namenode.versionRequest();
|
final NamespaceInfo namespaceinfo = namenode.versionRequest();
|
||||||
@ -211,31 +167,6 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public NameNodeConnector(String name, URI nameNodeUri, String nsId,
|
|
||||||
Path idPath, List<Path> targetPaths,
|
|
||||||
Configuration conf, int maxNotChangedIterations)
|
|
||||||
throws IOException {
|
|
||||||
this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations);
|
|
||||||
if (nsId != null && HAUtil.isHAEnabled(conf, nsId)) {
|
|
||||||
List<ClientProtocol> namenodes =
|
|
||||||
HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId);
|
|
||||||
for (ClientProtocol proxy : namenodes) {
|
|
||||||
try {
|
|
||||||
if (proxy.getHAServiceState().equals(
|
|
||||||
HAServiceProtocol.HAServiceState.STANDBY)) {
|
|
||||||
this.standbyNameNode = NameNodeProxies.createNonHAProxy(
|
|
||||||
conf, RPC.getServerAddress(proxy), NamenodeProtocol.class,
|
|
||||||
UserGroupInformation.getCurrentUser(), false).getProxy();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
//Ignore the exception while connecting to a namenode.
|
|
||||||
LOG.debug("Error while connecting to namenode", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public DistributedFileSystem getDistributedFileSystem() {
|
public DistributedFileSystem getDistributedFileSystem() {
|
||||||
return fs;
|
return fs;
|
||||||
}
|
}
|
||||||
@ -255,22 +186,6 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
|
|||||||
if (getBlocksRateLimiter != null) {
|
if (getBlocksRateLimiter != null) {
|
||||||
getBlocksRateLimiter.acquire();
|
getBlocksRateLimiter.acquire();
|
||||||
}
|
}
|
||||||
boolean isRequestStandby = true;
|
|
||||||
try {
|
|
||||||
if (balancerShouldRequestStandby && standbyNameNode != null) {
|
|
||||||
return standbyNameNode.getBlocks(datanode, size, minBlockSize);
|
|
||||||
} else {
|
|
||||||
isRequestStandby = false;
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " +
|
|
||||||
"will fallback to normal way", e);
|
|
||||||
isRequestStandby = false;
|
|
||||||
} finally {
|
|
||||||
if (isRequestStandby) {
|
|
||||||
LOG.info("Request #getBlocks to Standby NameNode success.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return namenode.getBlocks(datanode, size, minBlockSize);
|
return namenode.getBlocks(datanode, size, minBlockSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,11 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.balancer;
|
package org.apache.hadoop.hdfs.server.balancer;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyLong;
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
@ -35,7 +31,6 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
@ -49,9 +44,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
|
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
|
||||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test balancer with HA NameNodes
|
* Test balancer with HA NameNodes
|
||||||
@ -113,12 +106,6 @@ void doTest(Configuration conf) throws Exception {
|
|||||||
TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
|
TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
|
||||||
/ numOfDatanodes, (short) numOfDatanodes, 0);
|
/ numOfDatanodes, (short) numOfDatanodes, 0);
|
||||||
|
|
||||||
boolean isRequestStandby = conf.getBoolean(
|
|
||||||
DFS_HA_ALLOW_STALE_READ_KEY, DFS_HA_ALLOW_STALE_READ_DEFAULT);
|
|
||||||
if (isRequestStandby) {
|
|
||||||
HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
|
|
||||||
cluster.getNameNode(1));
|
|
||||||
}
|
|
||||||
// start up an empty node with the same capacity and on the same rack
|
// start up an empty node with the same capacity and on the same rack
|
||||||
long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
|
long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
|
||||||
String newNodeRack = TestBalancer.RACK2; // new node's rack
|
String newNodeRack = TestBalancer.RACK2; // new node's rack
|
||||||
@ -128,54 +115,13 @@ void doTest(Configuration conf) throws Exception {
|
|||||||
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
|
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
|
||||||
cluster);
|
cluster);
|
||||||
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
|
|
||||||
assertEquals(1, namenodes.size());
|
assertEquals(1, namenodes.size());
|
||||||
final int r = Balancer.run(namenodes, nsIds, BalancerParameters.DEFAULT,
|
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
||||||
conf);
|
|
||||||
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
||||||
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
|
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
|
||||||
cluster, BalancerParameters.DEFAULT);
|
cluster, BalancerParameters.DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Test Balancer request Standby NameNode when enable this feature.
|
|
||||||
*/
|
|
||||||
@Test(timeout = 60000)
|
|
||||||
public void testBalancerRequestSBNWithHA() throws Exception {
|
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
conf.setBoolean(DFS_HA_ALLOW_STALE_READ_KEY, true);
|
|
||||||
conf.setLong(DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
|
||||||
//conf.setBoolean(DFS_HA_BALANCER_REQUEST_STANDBY_KEY, true);
|
|
||||||
TestBalancer.initConf(conf);
|
|
||||||
assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
|
|
||||||
NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
|
|
||||||
nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
|
|
||||||
Configuration copiedConf = new Configuration(conf);
|
|
||||||
cluster = new MiniDFSCluster.Builder(copiedConf)
|
|
||||||
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
|
||||||
.numDataNodes(TEST_CAPACITIES.length)
|
|
||||||
.racks(TEST_RACKS)
|
|
||||||
.simulatedCapacities(TEST_CAPACITIES)
|
|
||||||
.build();
|
|
||||||
// Try capture NameNodeConnector log.
|
|
||||||
LogCapturer log =LogCapturer.captureLogs(
|
|
||||||
LoggerFactory.getLogger(NameNodeConnector.class));
|
|
||||||
HATestUtil.setFailoverConfigurations(cluster, conf);
|
|
||||||
try {
|
|
||||||
cluster.waitActive();
|
|
||||||
cluster.transitionToActive(0);
|
|
||||||
Thread.sleep(500);
|
|
||||||
client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
|
|
||||||
ClientProtocol.class).getProxy();
|
|
||||||
doTest(conf);
|
|
||||||
// Check getBlocks request to Standby NameNode.
|
|
||||||
assertTrue(log.getOutput().contains(
|
|
||||||
"Request #getBlocks to Standby NameNode success."));
|
|
||||||
} finally {
|
|
||||||
cluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test Balancer with ObserverNodes.
|
* Test Balancer with ObserverNodes.
|
||||||
*/
|
*/
|
||||||
|
Loading…
Reference in New Issue
Block a user