HDFS-2592. Balancer support for HA namenodes. Contributed by Uma Maheswara Rao G.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1232531 13f79535-47bb-0310-9956-ffa450edef68
parent 4c7a6c6c3f
commit a380dc8732
@@ -113,3 +113,5 @@ HDFS-2772. On transition to active, standby should not swallow ELIE. (atm)
 HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol. (Uma Maheswara Rao G via todd)
 
 HDFS-2795. Standby NN takes a long time to recover from a dead DN starting up. (todd)
+
+HDFS-2592. Balancer support for HA namenodes. (Uma Maheswara Rao G via todd)
@@ -22,11 +22,9 @@
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -34,11 +32,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -46,13 +43,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
@@ -83,13 +74,24 @@ class NameNodeConnector {
 
   NameNodeConnector(Collection<InetSocketAddress> haNNs,
       Configuration conf) throws IOException {
-    InetSocketAddress nn = Lists.newArrayList(haNNs).get(0);
-    // TODO(HA): need to deal with connecting to HA NN pair here
-    this.namenodeAddress = nn;
-    this.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(nn, conf,
-        UserGroupInformation.getCurrentUser());
-    this.client = DFSUtil.createNamenode(conf);
-    this.fs = FileSystem.get(NameNode.getUri(nn), conf);
+    this.namenodeAddress = Lists.newArrayList(haNNs).get(0);
+    URI nameNodeUri = NameNode.getUri(this.namenodeAddress);
+    NamenodeProtocol failoverNamenode = (NamenodeProtocol) HAUtil
+        .createFailoverProxy(conf, nameNodeUri, NamenodeProtocol.class);
+    if (null != failoverNamenode) {
+      this.namenode = failoverNamenode;
+    } else {
+      this.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(
+          this.namenodeAddress, conf, UserGroupInformation.getCurrentUser());
+    }
+    ClientProtocol failOverClient = (ClientProtocol) HAUtil
+        .createFailoverProxy(conf, nameNodeUri, ClientProtocol.class);
+    if (null != failOverClient) {
+      this.client = failOverClient;
+    } else {
+      this.client = DFSUtil.createNamenode(conf);
+    }
+    this.fs = FileSystem.get(nameNodeUri, conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();
     this.blockpoolID = namespaceinfo.getBlockPoolID();
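For orientation: the constructor now prefers failover-capable proxies and falls back to the original single-NameNode path only when none can be built. A condensed sketch of that pattern, reusing the calls from the hunk above (the "null means no failover provider configured" reading is inferred from the patch's null checks, not stated by the patch itself):

// Illustrative sketch of the selection pattern applied above to both
// NamenodeProtocol and ClientProtocol. A null return from
// HAUtil.createFailoverProxy is taken to mean no failover provider is
// configured for this URI, so the pre-HA direct proxy is created instead.
NamenodeProtocol namenode = (NamenodeProtocol) HAUtil
    .createFailoverProxy(conf, nameNodeUri, NamenodeProtocol.class);
if (namenode == null) {
  namenode = DFSUtil.createNNProxyWithNamenodeProtocol(
      namenodeAddress, conf, UserGroupInformation.getCurrentUser());
}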
@@ -329,7 +329,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
       throw new IllegalArgumentException(
         "Unexpected not positive size: "+size);
     }
-
+    namesystem.checkOperation(OperationCategory.READ);
     return namesystem.getBlockManager().getBlocks(datanode, size);
   }
 
@@ -42,24 +42,23 @@
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
 /**
  * This class tests if a balancer schedules tasks correctly.
  */
 public class TestBalancer extends TestCase {
   private static final Log LOG = LogFactory.getLog(
-  "org.apache.hadoop.hdfs.TestReplication");
+  "org.apache.hadoop.hdfs.TestBalancer");
 
-  final private static long CAPACITY = 500L;
-  final private static String RACK0 = "/rack0";
-  final private static String RACK1 = "/rack1";
-  final private static String RACK2 = "/rack2";
-  final static private String fileName = "/tmp.txt";
-  final static private Path filePath = new Path(fileName);
+  final static long CAPACITY = 500L;
+  final static String RACK0 = "/rack0";
+  final static String RACK1 = "/rack1";
+  final static String RACK2 = "/rack2";
+  final private static String fileName = "/tmp.txt";
+  final static Path filePath = new Path(fileName);
   private MiniDFSCluster cluster;
 
   ClientProtocol client;
@@ -83,9 +82,10 @@ static void initConf(Configuration conf) {
   }
 
   /* create a file with a length of <code>fileLen</code> */
-  private void createFile(long fileLen, short replicationFactor)
+  static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
+      short replicationFactor, int nnIndex)
     throws IOException {
-    FileSystem fs = cluster.getFileSystem();
+    FileSystem fs = cluster.getFileSystem(nnIndex);
     DFSTestUtil.createFile(fs, filePath, fileLen,
         replicationFactor, r.nextLong());
     DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
@@ -104,7 +104,7 @@ private ExtendedBlock[] generateBlocks(Configuration conf, long size,
 
     short replicationFactor = (short)(numNodes-1);
     long fileLen = size/replicationFactor;
-    createFile(fileLen, replicationFactor);
+    createFile(cluster , filePath, fileLen, replicationFactor, 0);
 
     List<LocatedBlock> locatedBlocks = client.
       getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
@@ -212,7 +212,8 @@ private void testUnevenDistribution(Configuration conf,
    * @throws IOException - if getStats() fails
    * @throws TimeoutException
    */
-  private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
+  static void waitForHeartBeat(long expectedUsedSpace,
+      long expectedTotalSpace, ClientProtocol client, MiniDFSCluster cluster)
   throws IOException, TimeoutException {
     long timeout = TIMEOUT;
     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
@@ -249,7 +250,8 @@ private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
    * @throws IOException
    * @throws TimeoutException
    */
-  private void waitForBalancer(long totalUsedSpace, long totalCapacity)
+  static void waitForBalancer(long totalUsedSpace, long totalCapacity,
+      ClientProtocol client, MiniDFSCluster cluster)
   throws IOException, TimeoutException {
     long timeout = TIMEOUT;
     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
@@ -312,7 +314,8 @@ private void doTest(Configuration conf, long[] capacities, String[] racks,
 
     // fill up the cluster to be 30% full
     long totalUsedSpace = totalCapacity*3/10;
-    createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes);
+    createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+        (short) numOfDatanodes, 0);
     // start up an empty node with the same capacity and on the same rack
     cluster.startDataNodes(conf, 1, true, null,
                 new String[]{newRack}, new long[]{newCapacity});
@@ -328,7 +331,7 @@ private void doTest(Configuration conf, long[] capacities, String[] racks,
 
   private void runBalancer(Configuration conf,
       long totalUsedSpace, long totalCapacity) throws Exception {
-    waitForHeartBeat(totalUsedSpace, totalCapacity);
+    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
 
     // start rebalancing
     Map<String, Map<String, InetSocketAddress>> namenodes =
@@ -336,9 +339,9 @@ private void runBalancer(Configuration conf,
     final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
     assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
 
-    waitForHeartBeat(totalUsedSpace, totalCapacity);
+    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
     LOG.info("Rebalancing with default ctor.");
-    waitForBalancer(totalUsedSpace, totalCapacity);
+    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
   }
 
   /** one-node cluster test*/
@@ -403,7 +406,8 @@ private void testBalancerDefaultConstructor(Configuration conf,
 
     // fill up the cluster to be 30% full
     long totalUsedSpace = totalCapacity * 3 / 10;
-    createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
+    createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+        (short) numOfDatanodes, 0);
     // start up an empty node with the same capacity and on the same rack
     cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
         new long[] { newCapacity });
@@ -0,0 +1,105 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.balancer;

import static org.junit.Assert.assertEquals;

import java.net.InetSocketAddress;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.junit.Test;

/**
 * Test balancer with HA NameNodes
 */
public class TestBalancerWithHANameNodes {
  private MiniDFSCluster cluster;
  ClientProtocol client;

  static {
    Balancer.setBlockMoveWaitTime(1000L);
  }

  /**
   * Test a cluster with even distribution, then a new empty node is added to
   * the cluster. Test start a cluster with specified number of nodes, and fills
   * it to be 30% full (with a single file replicated identically to all
   * datanodes); It then adds one new empty node and starts balancing.
   */
  @Test(timeout = 60000)
  public void testBalancerWithHANameNodes() throws Exception {
    Configuration conf = new HdfsConfiguration();
    TestBalancer.initConf(conf);
    long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
    String newNodeRack = TestBalancer.RACK2; // new node's rack
    // array of racks for original nodes in cluster
    String[] racks = new String[] { TestBalancer.RACK0, TestBalancer.RACK1 };
    // array of capacities of original nodes in cluster
    long[] capacities = new long[] { TestBalancer.CAPACITY,
        TestBalancer.CAPACITY };
    assertEquals(capacities.length, racks.length);
    int numOfDatanodes = capacities.length;
    NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
    nn1Conf.setIpcPort(NameNode.DEFAULT_PORT);
    MiniDFSNNTopology simpleHATopology = new MiniDFSNNTopology()
        .addNameservice(new MiniDFSNNTopology.NSConf(null).addNN(nn1Conf)
            .addNN(new MiniDFSNNTopology.NNConf("nn2")));
    cluster = new MiniDFSCluster.Builder(conf).nnTopology(simpleHATopology)
        .numDataNodes(capacities.length).racks(racks).simulatedCapacities(
            capacities).build();
    try {
      cluster.waitActive();
      cluster.transitionToActive(1);
      Thread.sleep(500);
      client = DFSUtil.createNamenode(cluster.getNameNode(1)
          .getNameNodeAddress(), conf);
      long totalCapacity = TestBalancer.sum(capacities);
      // fill up the cluster to be 30% full
      long totalUsedSpace = totalCapacity * 3 / 10;
      TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
          / numOfDatanodes, (short) numOfDatanodes, 1);

      // start up an empty node with the same capacity and on the same rack
      cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack },
          new long[] { newNodeCapacity });

      HATestUtil.setFailoverConfigurations(cluster, conf, NameNode.getUri(
          cluster.getNameNode(0).getNameNodeAddress()).getHost());
      totalCapacity += newNodeCapacity;
      TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
          cluster);
      Map<String, Map<String, InetSocketAddress>> namenodes = DFSUtil
          .getNNServiceRpcAddresses(conf);
      final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
      assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
      TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
          cluster);
    } finally {
      cluster.shutdown();
    }
  }
}
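For reference, the entry point the test exercises is the same programmatic one shown in its body; a minimal standalone sketch of driving the balancer against an HA-enabled Configuration (this assumes conf already carries the nameservice and failover-proxy settings, for example as set up by HATestUtil.setFailoverConfigurations in the next hunk):

// Minimal sketch: resolve every configured nameservice's NN RPC addresses
// and run the balancer across them with default parameters.
Map<String, Map<String, InetSocketAddress>> namenodes =
    DFSUtil.getNNServiceRpcAddresses(conf);
int exitCode = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
// Non-zero / non-SUCCESS codes indicate the run did not complete cleanly.
assert exitCode == Balancer.ReturnStatus.SUCCESS.code;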
@@ -128,33 +128,35 @@ public CouldNotCatchUpException(String message) {
     }
   }
 
   /** Gets the filesystem instance by setting the failover configurations */
   public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
       throws IOException, URISyntaxException {
+    conf = new Configuration(conf);
+    String logicalName = getLogicalHostname(cluster);
+    setFailoverConfigurations(cluster, conf, logicalName);
+    FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
+    return fs;
+  }
+
+  /** Sets the required configurations for performing failover */
+  public static void setFailoverConfigurations(MiniDFSCluster cluster,
+      Configuration conf, String logicalName) {
     InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
     InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
 
     String nsId = "nameserviceId1";
 
     String nameNodeId1 = "nn1";
     String nameNodeId2 = "nn2";
-    String logicalName = getLogicalHostname(cluster);
-
-    conf = new Configuration(conf);
     String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
     String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
     conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
         nsId, nameNodeId1), address1);
     conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
         nsId, nameNodeId2), address2);
 
     conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nsId);
     conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, nsId),
         nameNodeId1 + "," + nameNodeId2);
     conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
         ConfiguredFailoverProxyProvider.class.getName());
-
-    FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
-    return fs;
   }
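With setFailoverConfigurations factored out, configureFailoverFs stays the usual way HA tests obtain a client. A sketch of typical use (an assumption about calling context: a two-NameNode MiniDFSCluster like the one built in TestBalancerWithHANameNodes above, and a base Configuration named conf):

// Sketch: make NN0 active, then talk to the cluster through the logical,
// failover-aware FileSystem that configureFailoverFs builds.
cluster.waitActive();
cluster.transitionToActive(0);
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
fs.mkdirs(new Path("/test-balancer-ha"));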