From c6107f566ff01e9bfee9052f86f6e5b21d5e89f3 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Fri, 5 Sep 2014 10:40:02 -0700 Subject: [PATCH] HDFS-6376. Distcp data between two HA clusters requires another configuration. Contributed by Dave Marion and Haohui Mai. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 1 + .../java/org/apache/hadoop/hdfs/DFSUtil.java | 80 +++++++++++++++++-- .../server/datanode/BlockPoolManager.java | 12 +-- .../org/apache/hadoop/hdfs/tools/GetConf.java | 4 +- .../src/main/resources/hdfs-default.xml | 10 +++ .../org/apache/hadoop/hdfs/TestDFSUtil.java | 26 ++++++ .../server/datanode/TestBlockPoolManager.java | 22 +++++ .../apache/hadoop/hdfs/tools/TestGetConf.java | 26 +++++- 9 files changed, 164 insertions(+), 20 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6a4cf2864b..d4059de9e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -441,6 +441,9 @@ Release 2.6.0 - UNRELEASED HDFS-6886. Use single editlog record for creating file + overwrite. (Yi Liu via jing9) + HDFS-6376. Distcp data between two HA clusters requires another configuration. + (Dave Marion and Haohui Mai via jing9) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index b5b4f3c234..2f86ed6f1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -530,6 +530,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMESERVICES = "dfs.nameservices"; public static final String DFS_NAMESERVICE_ID = "dfs.nameservice.id"; + public static final String DFS_INTERNAL_NAMESERVICES_KEY = "dfs.internal.nameservices"; public static final String DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY = "dfs.namenode.resource.check.interval"; public static final int DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT = 5000; public static final String DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 5559e0dbc9..021890b98c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -60,6 +60,7 @@ import javax.net.SocketFactory; +import com.google.common.collect.Sets; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Option; @@ -612,7 +613,7 @@ public static String addKeySuffixes(String key, String... suffixes) { String keySuffix = concatSuffixes(suffixes); return addSuffix(key, keySuffix); } - + /** * Returns the configured address for all NameNodes in the cluster. * @param conf configuration @@ -621,14 +622,25 @@ public static String addKeySuffixes(String key, String... suffixes) { * @return a map(nameserviceId to map(namenodeId to InetSocketAddress)) */ private static Map> - getAddresses(Configuration conf, - String defaultAddress, String... keys) { + getAddresses(Configuration conf, String defaultAddress, String... keys) { Collection nameserviceIds = getNameServiceIds(conf); - + return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys); + } + + /** + * Returns the configured address for all NameNodes in the cluster. + * @param conf configuration + * @param nsIds + *@param defaultAddress default address to return in case key is not found. + * @param keys Set of keys to look for in the order of preference @return a map(nameserviceId to map(namenodeId to InetSocketAddress)) + */ + private static Map> + getAddressesForNsIds(Configuration conf, Collection nsIds, + String defaultAddress, String... keys) { // Look for configurations of the form [.][.] // across all of the configured nameservices and namenodes. Map> ret = Maps.newLinkedHashMap(); - for (String nsId : emptyAsSingletonNull(nameserviceIds)) { + for (String nsId : emptyAsSingletonNull(nsIds)) { Map isas = getAddressesForNameserviceId(conf, nsId, defaultAddress, keys); if (!isas.isEmpty()) { @@ -773,8 +785,7 @@ public static Map> getSecondaryNameNodeAd /** * Returns list of InetSocketAddresses corresponding to namenodes from the - * configuration. Note this is to be used by datanodes to get the list of - * namenode addresses to talk to. + * configuration. * * Returns namenode address specifically configured for datanodes (using * service ports), if found. If not, regular RPC address configured for other @@ -805,7 +816,60 @@ public static Map> getNNServiceRpcAddress } return addressList; } - + + /** + * Returns list of InetSocketAddresses corresponding to the namenode + * that manages this cluster. Note this is to be used by datanodes to get + * the list of namenode addresses to talk to. + * + * Returns namenode address specifically configured for datanodes (using + * service ports), if found. If not, regular RPC address configured for other + * clients is returned. + * + * @param conf configuration + * @return list of InetSocketAddress + * @throws IOException on error + */ + public static Map> + getNNServiceRpcAddressesForCluster(Configuration conf) throws IOException { + // Use default address as fall back + String defaultAddress; + try { + defaultAddress = NetUtils.getHostPortString(NameNode.getAddress(conf)); + } catch (IllegalArgumentException e) { + defaultAddress = null; + } + + Collection parentNameServices = conf.getTrimmedStringCollection + (DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY); + + if (parentNameServices.isEmpty()) { + parentNameServices = conf.getTrimmedStringCollection + (DFSConfigKeys.DFS_NAMESERVICES); + } else { + // Ensure that the internal service is ineed in the list of all available + // nameservices. + Set availableNameServices = Sets.newHashSet(conf + .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES)); + for (String nsId : parentNameServices) { + if (!availableNameServices.contains(nsId)) { + throw new IOException("Unknown nameservice: " + nsId); + } + } + } + + Map> addressList = + getAddressesForNsIds(conf, parentNameServices, defaultAddress, + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY); + if (addressList.isEmpty()) { + throw new IOException("Incorrect configuration: namenode address " + + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or " + + DFS_NAMENODE_RPC_ADDRESS_KEY + + " is not configured."); + } + return addressList; + } + /** * Flatten the given map, as returned by other functions in this class, * into a flat list of {@link ConfiguredNNAddress} instances. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java index d14aab94c8..9f389952af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java @@ -149,12 +149,12 @@ void joinAll() { void refreshNamenodes(Configuration conf) throws IOException { - LOG.info("Refresh request received for nameservices: " - + conf.get(DFSConfigKeys.DFS_NAMESERVICES)); - - Map> newAddressMap = - DFSUtil.getNNServiceRpcAddresses(conf); - + LOG.info("Refresh request received for nameservices: " + conf.get + (DFSConfigKeys.DFS_NAMESERVICES)); + + Map> newAddressMap = DFSUtil + .getNNServiceRpcAddressesForCluster(conf); + synchronized (refreshNamenodesLock) { doRefreshNamenodes(newAddressMap); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java index 552123beda..92a16cd282 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java @@ -186,7 +186,7 @@ int doWorkInternal(GetConf tool, String[] args) throws Exception { static class NameNodesCommandHandler extends CommandHandler { @Override int doWorkInternal(GetConf tool, String []args) throws IOException { - tool.printMap(DFSUtil.getNNServiceRpcAddresses(tool.getConf())); + tool.printMap(DFSUtil.getNNServiceRpcAddressesForCluster(tool.getConf())); return 0; } } @@ -223,7 +223,7 @@ static class NNRpcAddressesCommandHandler extends CommandHandler { public int doWorkInternal(GetConf tool, String []args) throws IOException { Configuration config = tool.getConf(); List cnnlist = DFSUtil.flattenAddressMap( - DFSUtil.getNNServiceRpcAddresses(config)); + DFSUtil.getNNServiceRpcAddressesForCluster(config)); if (!cnnlist.isEmpty()) { for (ConfiguredNNAddress cnn : cnnlist) { InetSocketAddress rpc = cnn.getAddress(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 6d1eeccf19..9170ec69ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1115,6 +1115,16 @@ + + dfs.internal.nameservices + + + Comma-separated list of nameservices that belong to this cluster. + Datanode will report to all the nameservices in this list. By default + this is set to the value of dfs.nameservices. + + + dfs.ha.namenodes.EXAMPLENAMESERVICE diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java index caedca038e..5ffd3b5bd8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; @@ -865,4 +866,29 @@ public void testGetPassword() throws Exception { // let's make sure that a password that doesn't exist returns null Assert.assertEquals(null, DFSUtil.getPassword(conf,"invalid-alias")); } + + @Test + public void testGetNNServiceRpcAddressesForNsIds() throws IOException { + Configuration conf = new HdfsConfiguration(); + conf.set(DFS_NAMESERVICES, "nn1,nn2"); + conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn1"); + // Test - configured list of namenodes are returned + final String NN1_ADDRESS = "localhost:9000"; + final String NN2_ADDRESS = "localhost:9001"; + conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn1"), + NN1_ADDRESS); + conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"), + NN2_ADDRESS); + + Map> nnMap = DFSUtil + .getNNServiceRpcAddressesForCluster(conf); + assertEquals(1, nnMap.size()); + assertTrue(nnMap.containsKey("nn1")); + conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn3"); + try { + DFSUtil.getNNServiceRpcAddressesForCluster(conf); + fail("Should fail for misconfiguration"); + } catch (IOException ignored) { + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java index 88d0c7dbcd..27e99dbf9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java @@ -23,15 +23,18 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -130,6 +133,25 @@ public void testFederationRefresh() throws Exception { "refresh #2\n", log.toString()); } + @Test + public void testInternalNameService() throws Exception { + Configuration conf = new Configuration(); + conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1,ns2,ns3"); + addNN(conf, "ns1", "mock1:8020"); + addNN(conf, "ns2", "mock1:8020"); + addNN(conf, "ns3", "mock1:8020"); + conf.set(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY, "ns1"); + bpm.refreshNamenodes(conf); + assertEquals("create #1\n", log.toString()); + @SuppressWarnings("unchecked") + Map map = (Map) Whitebox + .getInternalState(bpm, "bpByNameserviceId"); + Assert.assertFalse(map.containsKey("ns2")); + Assert.assertFalse(map.containsKey("ns3")); + Assert.assertTrue(map.containsKey("ns1")); + log.setLength(0); + } + private static void addNN(Configuration conf, String ns, String addr) { String key = DFSUtil.addKeySuffixes( DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, ns); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java index 80b176f4bd..94ce6b22fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java @@ -18,11 +18,13 @@ package org.apache.hadoop.hdfs.tools; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -121,13 +123,13 @@ private Map> getAddressListFromConf( TestType type, HdfsConfiguration conf) throws IOException { switch (type) { case NAMENODE: - return DFSUtil.getNNServiceRpcAddresses(conf); + return DFSUtil.getNNServiceRpcAddressesForCluster(conf); case BACKUP: return DFSUtil.getBackupNodeAddresses(conf); case SECONDARY: return DFSUtil.getSecondaryNameNodeAddresses(conf); case NNRPCADDRESSES: - return DFSUtil.getNNServiceRpcAddresses(conf); + return DFSUtil.getNNServiceRpcAddressesForCluster(conf); } return null; } @@ -226,7 +228,7 @@ private void verifyAddresses(HdfsConfiguration conf, TestType type, String[] actual = toStringArray(list); Arrays.sort(actual); Arrays.sort(expected); - assertTrue(Arrays.equals(expected, actual)); + assertArrayEquals(expected, actual); // Test GetConf returned addresses getAddressListFromTool(type, conf, checkPort, list); @@ -425,7 +427,23 @@ public void TestGetConfIncludeCommand() throws Exception{ assertEquals(hostsFile.toUri().getPath(),ret.trim()); cleanupFile(localFileSys, excludeFile.getParent()); } - + + @Test + public void testIncludeInternalNameServices() throws Exception { + final int nsCount = 10; + final int remoteNsCount = 4; + HdfsConfiguration conf = new HdfsConfiguration(); + setupNameServices(conf, nsCount); + setupAddress(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsCount, 1000); + setupAddress(conf, DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1500); + conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "ns1"); + setupStaticHostResolution(nsCount); + + String[] includedNN = new String[] {"nn1:1001"}; + verifyAddresses(conf, TestType.NAMENODE, false, includedNN); + verifyAddresses(conf, TestType.NNRPCADDRESSES, true, includedNN); + } + private void writeConfigFile(Path name, ArrayList nodes) throws IOException { // delete if it already exists