From 1804a31515e541b3371925aa895589919b54d443 Mon Sep 17 00:00:00 2001
From: Inigo Goiri
Date: Mon, 2 Jul 2018 10:48:20 -0700
Subject: [PATCH] HDFS-13536. [PROVIDED Storage] HA for InMemoryAliasMap.
 Contributed by Virajith Jalaparti.
---
 .../org/apache/hadoop/hdfs/DFSUtilClient.java |   4 +-
 .../hdfs/client/HdfsClientConfigKeys.java     |   3 +
 .../ha/ConfiguredFailoverProxyProvider.java   |   9 +-
 ...InMemoryAliasMapFailoverProxyProvider.java |  38 ++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |   5 +-
 .../java/org/apache/hadoop/hdfs/DFSUtil.java  |  37 +-
 .../apache/hadoop/hdfs/NameNodeProxies.java   |  15 +-
 ...liasMapProtocolClientSideTranslatorPB.java |  95 ++++-
 .../aliasmap/InMemoryAliasMapProtocol.java    |   5 +
 .../InMemoryLevelDBAliasMapServer.java        |  19 +-
 .../impl/InMemoryLevelDBAliasMapClient.java   |  80 ++--
 .../src/main/resources/hdfs-default.xml       |  22 +-
 .../apache/hadoop/hdfs/MiniDFSCluster.java    |  13 +-
 .../apache/hadoop/hdfs/MiniDFSNNTopology.java |   2 +-
 .../TestInMemoryLevelDBAliasMapClient.java    |   7 +
 .../namenode/ITestProvidedImplementation.java | 371 +++++++++++++++---
 16 files changed, 615 insertions(+), 110 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 313b973550..3fac7c8c10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -396,7 +396,7 @@ static String concatSuffixes(String... suffixes) {
    * @param keys Set of keys to look for in the order of preference
    * @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
    */
-  static Map<String, Map<String, InetSocketAddress>> getAddresses(
+  public static Map<String, Map<String, InetSocketAddress>> getAddresses(
       Configuration conf, String defaultAddress, String... keys) {
     Collection<String> nameserviceIds = getNameServiceIds(conf);
     return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
@@ -426,7 +426,7 @@ static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds(
     return ret;
   }

-  static Map<String, InetSocketAddress> getAddressesForNameserviceId(
+  public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
       Configuration conf, String nsId, String defaultValue, String... keys) {
     Collection<String> nnIds = getNameNodeIds(conf, nsId);
     Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index f2cec314ff..a8126700d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -184,6 +184,9 @@ public interface HdfsClientConfigKeys {
       "dfs.namenode.snapshot.capture.openfiles";
   boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT = false;

+  String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS =
+      "dfs.provided.aliasmap.inmemory.dnrpc-address";
+
   /**
    * These are deprecated config keys to client code.
   */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
index 96722fcfab..f46532ad97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -37,6 +37,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+
 /**
  * A FailoverProxyProvider implementation which allows one to configure
  * multiple URIs to connect to during fail-over. A random configured address is
@@ -60,6 +62,11 @@ public class ConfiguredFailoverProxyProvider<T> extends

   public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
       Class<T> xface, HAProxyFactory<T> factory) {
+    this(conf, uri, xface, factory, DFS_NAMENODE_RPC_ADDRESS_KEY);
+  }
+
+  public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
+      Class<T> xface, HAProxyFactory<T> factory, String addressKey) {
     this.xface = xface;
     this.conf = new Configuration(conf);
     int maxRetries = this.conf.getInt(
@@ -81,7 +88,7 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
       ugi = UserGroupInformation.getCurrentUser();

       Map<String, Map<String, InetSocketAddress>> map =
-          DFSUtilClient.getHaNnRpcAddresses(conf);
+          DFSUtilClient.getAddresses(conf, null, addressKey);
       Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());

       if (addressesInNN == null || addressesInNN.size() == 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java
new file mode 100644
index 0000000000..6525942341
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.URI;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
+
+/**
+ * A {@link ConfiguredFailoverProxyProvider} implementation used to connect
+ * to an InMemoryAliasMap.
+ */
+public class InMemoryAliasMapFailoverProxyProvider<T>
+    extends ConfiguredFailoverProxyProvider<T> {
+
+  public InMemoryAliasMapFailoverProxyProvider(
+      Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
+    super(conf, uri, xface, factory,
+        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index dde7eb79c2..cc902b0077 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -86,8 +86,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
   public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50105";
   public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
-  public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS = "dfs.provided.aliasmap.inmemory.dnrpc-address";
+  public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS =
+      HdfsClientConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
   public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT = "0.0.0.0:50200";
+  public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST = "dfs.provided.aliasmap.inmemory.rpc.bind-host";
+
   public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR = "dfs.provided.aliasmap.inmemory.leveldb.dir";
   public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE = "dfs.provided.aliasmap.inmemory.batch-size";
   public static final int DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE_DEFAULT = 500;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 4c94e380ca..f7cd32b558 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -1130,7 +1130,42 @@ private static String getNameServiceId(Configuration conf, String addressKey) {
     return getSuffixIDs(conf, addressKey, null, nnId, LOCAL_ADDRESS_MATCHER)[0];
   }
-
+
+  /**
+   * Determine the {@link InetSocketAddress} to bind to, for any service.
+   * In case of HA or federation, the address is assumed to be specified as
+   * {@code confKey}.NAMESPACEID.NAMENODEID as appropriate.
+   *
+   * @param conf configuration.
+   * @param confKey configuration key (prefix if HA/federation) used to
+   *        specify the address for the service.
+   * @param defaultValue default value for the address.
+   * @param bindHostKey configuration key (prefix if HA/federation)
+   *        specifying host to bind to.
+   * @return the address to bind to.
+   */
+  public static InetSocketAddress getBindAddress(Configuration conf,
+      String confKey, String defaultValue, String bindHostKey) {
+    InetSocketAddress address;
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    String bindHostActualKey;
+    if (nsId != null) {
+      String namenodeId = HAUtil.getNameNodeId(conf, nsId);
+      address = DFSUtilClient.getAddressesForNameserviceId(
+          conf, nsId, null, confKey).get(namenodeId);
+      bindHostActualKey = DFSUtil.addKeySuffixes(bindHostKey, nsId, namenodeId);
+    } else {
+      address = NetUtils.createSocketAddr(conf.get(confKey, defaultValue));
+      bindHostActualKey = bindHostKey;
+    }
+
+    String bindHost = conf.get(bindHostActualKey);
+    if (bindHost == null || bindHost.isEmpty()) {
+      bindHost = address.getHostName();
+    }
+    return new InetSocketAddress(bindHost, address.getPort());
+  }
+
   /**
    * Returns nameservice Id and namenode Id when the local host matches the
    * configuration parameter {@code addressKey}..
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index d556c907c4..b63d26b85a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -31,10 +31,13 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
@@ -184,6 +187,8 @@ public static <T> ProxyAndInfo<T> createNonHAProxy(
           conf, ugi);
     } else if (xface == RefreshCallQueueProtocol.class) {
       proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
+    } else if (xface == InMemoryAliasMapProtocol.class) {
+      proxy = (T) createNNProxyWithInMemoryAliasMapProtocol(nnAddr, conf, ugi);
     } else {
       String message = "Unsupported protocol found when creating the proxy " +
           "connection to NameNode: " +
@@ -194,7 +199,15 @@ public static <T> ProxyAndInfo<T> createNonHAProxy(

     return new ProxyAndInfo<>(proxy, dtService, nnAddr);
   }
-
+
+  private static InMemoryAliasMapProtocol createNNProxyWithInMemoryAliasMapProtocol(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
+      throws IOException {
+    AliasMapProtocolPB proxy = (AliasMapProtocolPB) createNameNodeProxy(
+        address, conf, ugi, AliasMapProtocolPB.class, 30000);
+    return new InMemoryAliasMapProtocolClientSideTranslatorPB(proxy);
+  }
+
   private static JournalProtocol createNNProxyWithJournalProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
       throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
index fc23c88c18..2025c16d1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
@@ -20,27 +20,38 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
 import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
 import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
 import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.InMemoryAliasMapFailoverProxyProvider;
 import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nonnull;
+import java.io.Closeable;
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;

 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSUtil.addKeySuffixes;
+import static org.apache.hadoop.hdfs.DFSUtil.createUri;
+import static org.apache.hadoop.hdfs.DFSUtilClient.getNameServiceIds;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
 import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.*;

@@ -52,7 +63,7 @@
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class InMemoryAliasMapProtocolClientSideTranslatorPB
-    implements InMemoryAliasMapProtocol {
+    implements InMemoryAliasMapProtocol, Closeable {

   private static final Logger LOG = LoggerFactory
@@ -60,22 +71,61 @@ public class InMemoryAliasMapProtocolClientSideTranslatorPB

   private AliasMapProtocolPB rpcProxy;

-  public InMemoryAliasMapProtocolClientSideTranslatorPB(Configuration conf) {
-    String addr = conf.getTrimmed(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
-        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT);
-    InetSocketAddress aliasMapAddr = NetUtils.createSocketAddr(addr);
+  public InMemoryAliasMapProtocolClientSideTranslatorPB(
+      AliasMapProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }

-    RPC.setProtocolEngine(conf, AliasMapProtocolPB.class,
-        ProtobufRpcEngine.class);
-    LOG.info("Connecting to address: " + addr);
-    try {
-      rpcProxy = RPC.getProxy(AliasMapProtocolPB.class,
-          RPC.getProtocolVersion(AliasMapProtocolPB.class), aliasMapAddr, null,
-          conf, NetUtils.getDefaultSocketFactory(conf), 0);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "Error in connecting to " + addr + " Got: " + e);
+  public static Collection<InMemoryAliasMapProtocol> init(Configuration conf) {
+    Collection<InMemoryAliasMapProtocol> aliasMaps = new ArrayList<>();
+    // Try to connect to all configured nameservices as it is not known which
+    // nameservice supports the AliasMap.
+    for (String nsId : getNameServiceIds(conf)) {
+      try {
+        URI namenodeURI = null;
+        Configuration newConf = new Configuration(conf);
+        if (HAUtil.isHAEnabled(conf, nsId)) {
+          // set the failover-proxy provider if HA is enabled.
+          newConf.setClass(
+              addKeySuffixes(PROXY_PROVIDER_KEY_PREFIX, nsId),
+              InMemoryAliasMapFailoverProxyProvider.class,
+              AbstractNNFailoverProxyProvider.class);
+          namenodeURI = new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId);
+        } else {
+          String key =
+              addKeySuffixes(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, nsId);
+          String addr = conf.get(key);
+          if (addr != null) {
+            namenodeURI = createUri(HdfsConstants.HDFS_URI_SCHEME,
+                NetUtils.createSocketAddr(addr));
+          }
+        }
+        if (namenodeURI != null) {
+          aliasMaps.add(NameNodeProxies
+              .createProxy(newConf, namenodeURI, InMemoryAliasMapProtocol.class)
+              .getProxy());
+          LOG.info("Connected to InMemoryAliasMap at {}", namenodeURI);
+        }
+      } catch (IOException | URISyntaxException e) {
+        LOG.warn("Exception in connecting to InMemoryAliasMap for nameservice "
+            + "{}: {}", nsId, e);
+      }
+    }
+    // if a separate AliasMap is configured using
+    // DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, try to connect to it.
+    if (conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS) != null) {
+      URI uri = createUri("hdfs", NetUtils.createSocketAddr(
+          conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS)));
+      try {
+        aliasMaps.add(NameNodeProxies
+            .createProxy(conf, uri, InMemoryAliasMapProtocol.class).getProxy());
+        LOG.info("Connected to InMemoryAliasMap at {}", uri);
+      } catch (IOException e) {
+        LOG.warn("Exception in connecting to InMemoryAliasMap at {}: {}", uri,
+            e);
+      }
+    }
+    return aliasMaps;
   }

   @Override
@@ -168,7 +218,12 @@ public String getBlockPoolId() throws IOException {
     }
   }

-  public void stop() {
-    RPC.stopProxy(rpcProxy);
+  @Override
+  public void close() throws IOException {
+    LOG.info("Stopping rpcProxy in "
+        + "InMemoryAliasMapProtocolClientSideTranslatorPB");
+    if (rpcProxy != null) {
+      RPC.stopProxy(rpcProxy);
+    }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
index 89f590cf29..c3824e5826 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
 import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.io.retry.Idempotent;

 import javax.annotation.Nonnull;
 import java.io.IOException;
@@ -69,6 +70,7 @@ public Optional<Block> getNextBlock() {
    *         FileRegions and the next marker.
   * @throws IOException
   */
+  @Idempotent
  InMemoryAliasMap.IterationResult list(Optional<Block> marker)
      throws IOException;

@@ -80,6 +82,7 @@ InMemoryAliasMap.IterationResult list(Optional<Block> marker)
   * @throws IOException
   */
  @Nonnull
+  @Idempotent
  Optional<ProvidedStorageLocation> read(@Nonnull Block block)
      throws IOException;

@@ -90,6 +93,7 @@ Optional<ProvidedStorageLocation> read(@Nonnull Block block)
   * @param providedStorageLocation
   * @throws IOException
   */
+  @Idempotent
  void write(@Nonnull Block block,
      @Nonnull ProvidedStorageLocation providedStorageLocation)
      throws IOException;
@@ -99,5 +103,6 @@ void write(@Nonnull Block block,
   * @return the block pool id associated with the Namenode running
   *         the in-memory alias map.
   */
+  @Idempotent
  String getBlockPoolId() throws IOException;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
index 4edc9a2913..1d06f13285 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
@@ -25,7 +25,6 @@
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
 import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolPB;
@@ -34,9 +33,13 @@
 import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Optional;

+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST;
+import static org.apache.hadoop.hdfs.DFSUtil.getBindAddress;
 import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
 import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.CheckedFunction2;

@@ -79,18 +82,16 @@ public void start() throws IOException {
     AliasMapProtocolService
         .newReflectiveBlockingService(aliasMapProtocolXlator);

-    String rpcAddress =
-        conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
-            DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT);
-    String[] split = rpcAddress.split(":");
-    String bindHost = split[0];
-    Integer port = Integer.valueOf(split[1]);
+    InetSocketAddress rpcAddress = getBindAddress(conf,
+        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
+        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT,
+        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST);

     aliasMapServer = new RPC.Builder(conf)
         .setProtocol(AliasMapProtocolPB.class)
         .setInstance(aliasMapProtocolService)
-        .setBindAddress(bindHost)
-        .setPort(port)
+        .setBindAddress(rpcAddress.getHostName())
+        .setPort(rpcAddress.getPort())
         .setNumHandlers(1)
         .setVerbose(true)
         .build();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
index d3891846ce..fb5ee93c19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
@@ -24,11 +24,17 @@
 import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
 import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
+import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
 import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -44,17 +50,28 @@
 public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
     implements Configurable {

+  private static final Logger LOG =
+      LoggerFactory.getLogger(InMemoryLevelDBAliasMapClient.class);
   private Configuration conf;
-  private InMemoryAliasMapProtocolClientSideTranslatorPB aliasMap;
-  private String blockPoolID;
+  private Collection<InMemoryAliasMapProtocol> aliasMaps;

   @Override
   public void close() {
-    aliasMap.stop();
+    if (aliasMaps != null) {
+      for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
+        RPC.stopProxy(aliasMap);
+      }
+    }
   }

   class LevelDbReader extends BlockAliasMap.Reader<FileRegion> {
+
+    private InMemoryAliasMapProtocol aliasMap;
+
+    LevelDbReader(InMemoryAliasMapProtocol aliasMap) {
+      this.aliasMap = aliasMap;
+    }
+
     @Override
     public Optional<FileRegion> resolve(Block block) throws IOException {
       Optional<ProvidedStorageLocation> read = aliasMap.read(block);
@@ -114,6 +131,13 @@ public Iterator<FileRegion> iterator() {
   }

   class LevelDbWriter extends BlockAliasMap.Writer<FileRegion> {
+
+    private InMemoryAliasMapProtocol aliasMap;
+
+    LevelDbWriter(InMemoryAliasMapProtocol aliasMap) {
+      this.aliasMap = aliasMap;
+    }
+
     @Override
     public void store(FileRegion fileRegion) throws IOException {
       aliasMap.write(fileRegion.getBlock(),
@@ -130,40 +154,53 @@ public void close() throws IOException {
       throw new UnsupportedOperationException("Unable to start "
           + "InMemoryLevelDBAliasMapClient as security is enabled");
     }
+    aliasMaps = new ArrayList<>();
   }

+  private InMemoryAliasMapProtocol getAliasMap(String blockPoolID)
+      throws IOException {
+    if (blockPoolID == null) {
+      throw new IOException("Block pool id required to get aliasmap reader");
+    }
+    // return the aliasmap whose block pool id matches the given one;
+    // an IOException is thrown if no match is found.
+    for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
+      try {
+        String aliasMapBlockPoolId = aliasMap.getBlockPoolId();
+        if (aliasMapBlockPoolId != null &&
+            aliasMapBlockPoolId.equals(blockPoolID)) {
+          return aliasMap;
+        }
+      } catch (IOException e) {
+        LOG.error("Exception in retrieving block pool id", e);
+      }
+    }
+    throw new IOException(
+        "Unable to retrieve InMemoryAliasMap for block pool id " + blockPoolID);
+  }

   @Override
   public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
       throws IOException {
-    if (this.blockPoolID == null) {
-      this.blockPoolID = aliasMap.getBlockPoolId();
-    }
-    // if a block pool id has been supplied, and doesn't match the associated
-    // block pool id, return null.
-    if (blockPoolID != null && this.blockPoolID != null
-        && !this.blockPoolID.equals(blockPoolID)) {
-      return null;
-    }
-    return new LevelDbReader();
+    InMemoryAliasMapProtocol aliasMap = getAliasMap(blockPoolID);
+    LOG.info("Loading InMemoryAliasMapReader for block pool id {}",
+        blockPoolID);
+    return new LevelDbReader(aliasMap);
   }

   @Override
   public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolID)
       throws IOException {
-    if (this.blockPoolID == null) {
-      this.blockPoolID = aliasMap.getBlockPoolId();
-    }
-    if (blockPoolID != null && !this.blockPoolID.equals(blockPoolID)) {
-      return null;
-    }
-    return new LevelDbWriter();
+    InMemoryAliasMapProtocol aliasMap = getAliasMap(blockPoolID);
+    LOG.info("Loading InMemoryAliasMapWriter for block pool id {}",
+        blockPoolID);
+    return new LevelDbWriter(aliasMap);
   }

   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-    this.aliasMap = new InMemoryAliasMapProtocolClientSideTranslatorPB(conf);
+    aliasMaps = InMemoryAliasMapProtocolClientSideTranslatorPB.init(conf);
   }

   @Override
@@ -174,5 +211,4 @@ public Configuration getConf() {

   @Override
   public void refresh() throws IOException {
   }
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 146ae6c9c6..6dd2d92796 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4817,9 +4817,27 @@

   <property>
     <name>dfs.provided.aliasmap.inmemory.dnrpc-address</name>
-    <value>0.0.0.0:50200</value>
+    <value></value>
     <description>
-      The address where the aliasmap server will be running
+      The address where the aliasmap server will be running. In the case of
+      HA/Federation where multiple namenodes exist, and if the Namenode is
+      configured to run the aliasmap server
+      (dfs.provided.aliasmap.inmemory.enabled is set to true),
+      the name service id is added to the name, e.g.,
+      dfs.provided.aliasmap.inmemory.dnrpc-address.EXAMPLENAMESERVICE.
+      The value of this property will take the form of host:rpc-port.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.aliasmap.inmemory.rpc.bind-host</name>
+    <value></value>
+    <description>
+      The actual address the in-memory aliasmap server will bind to.
+      If this optional address is set, it overrides the hostname portion of
+      dfs.provided.aliasmap.inmemory.dnrpc-address.
+      This is useful for making the aliasmap server listen on all
+      interfaces by setting it to 0.0.0.0.
     </description>
   </property>
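Note on the two properties above: they are resolved by the new DFSUtil#getBindAddress helper added in this patch. The following is a minimal sketch, not part of the patch and using a made-up hostname, of how the bind-host override interacts with the advertised dnrpc-address in the non-HA case:

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;

public class AliasMapBindAddressSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Advertised (client-facing) address of the aliasmap server; with no
    // nameservices configured, the key carries no nsId.nnId suffix.
    conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
        "nn1.example.com:50200");
    // Optional override: make the server listen on all interfaces.
    conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST,
        "0.0.0.0");
    InetSocketAddress bindAddr = DFSUtil.getBindAddress(conf,
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT,
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST);
    // The bind-host replaces only the host; the port (50200) is preserved.
    System.out.println(bindAddr);
  }
}

In an HA or federated deployment the same lookup runs against the suffixed keys (e.g. dfs.provided.aliasmap.inmemory.dnrpc-address.EXAMPLENAMESERVICE.nn1), which is what configureAliasMapAddresses in ITestProvidedImplementation below sets up.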
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index c2e2a68af2..a2e59515d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1187,7 +1187,7 @@ public NameNodeInfo[] getNameNodeInfos(String nameservice) {
   }

-  private void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
+  protected void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
       boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, int nnIndex)
       throws IOException {
     if (nameserviceId != null) {
@@ -1379,6 +1379,17 @@ private NameNodeInfo getNN(int nnIndex) {
     return null;
   }

+  public List<Integer> getNNIndexes(String nameserviceId) {
+    int count = 0;
+    List<Integer> nnIndexes = new ArrayList<>();
+    for (NameNodeInfo nn : namenodes.values()) {
+      if (nn.getNameserviceId().equals(nameserviceId)) {
+        nnIndexes.add(count);
+      }
+      count++;
+    }
+    return nnIndexes;
+  }

   /**
    * wait for the given namenode to get out of safemode.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
index b9786a32a7..c21ff809b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
@@ -227,7 +227,7 @@ public NNConf(String nnId) {
       this.nnId = nnId;
     }

-    String getNnId() {
+    public String getNnId() {
       return nnId;
     }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
index 61a1558978..f0626335bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
@@ -32,6 +32,7 @@
 import org.junit.Before;
 import org.junit.Test;

+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -341,4 +342,10 @@ public void multipleReads() throws IOException {
     assertThat(actualFileRegions).containsExactlyInAnyOrder(
         expectedFileRegions.toArray(new FileRegion[0]));
   }
+
+  @Test
+  public void testServerBindHost() throws Exception {
+    conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST, "0.0.0.0");
+    writeRead();
+  }
 }
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java
index 7d3ab0ea8f..e3f4decd59 100644
--- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java
+++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java
@@ -24,12 +24,18 @@
 import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
-import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;

@@ -44,14 +50,14 @@
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
-import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -60,8 +66,10 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.LevelDBFileRegionAliasMap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;

@@ -71,6 +79,8 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.junit.After;
 import org.junit.Before;
@@ -80,6 +90,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LEVELDB_PATH;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import static org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.fileNameFromBlockPoolID;
 import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR;
 import static org.junit.Assert.*;
@@ -106,6 +122,7 @@ public class ITestProvidedImplementation {
   private final int baseFileLen = 1024;
   private long providedDataSize = 0;
   private final String bpid = "BP-1234-10.1.1.1-1224";
+  private static final String clusterID = "CID-PROVIDED";

   private Configuration conf;
   private MiniDFSCluster cluster;
@@ -214,36 +231,78 @@ void startCluster(Path nspath, int numDatanodes,
       StorageType[] storageTypes,
       StorageType[][] storageTypesPerDatanode,
       boolean doFormat, String[] racks) throws IOException {
+    startCluster(nspath, numDatanodes,
+        storageTypes, storageTypesPerDatanode,
+        doFormat, racks, null,
+        new MiniDFSCluster.Builder(conf));
+  }
+
+  void startCluster(Path nspath, int numDatanodes,
+      StorageType[] storageTypes,
+      StorageType[][] storageTypesPerDatanode,
+      boolean doFormat, String[] racks,
+      MiniDFSNNTopology topo,
+      MiniDFSCluster.Builder builder) throws IOException {
     conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());

+    builder.format(doFormat)
+        .manageNameDfsDirs(doFormat)
+        .numDataNodes(numDatanodes)
+        .racks(racks);
     if (storageTypesPerDatanode != null) {
-      cluster = new MiniDFSCluster.Builder(conf)
-          .format(doFormat)
-          .manageNameDfsDirs(doFormat)
-          .numDataNodes(numDatanodes)
-          .storageTypes(storageTypesPerDatanode)
-          .racks(racks)
-          .build();
+      builder.storageTypes(storageTypesPerDatanode);
     } else if (storageTypes != null) {
-      cluster = new MiniDFSCluster.Builder(conf)
-          .format(doFormat)
-          .manageNameDfsDirs(doFormat)
-          .numDataNodes(numDatanodes)
-          .storagesPerDatanode(storageTypes.length)
-          .storageTypes(storageTypes)
-          .racks(racks)
-          .build();
-    } else {
-      cluster = new MiniDFSCluster.Builder(conf)
-          .format(doFormat)
-          .manageNameDfsDirs(doFormat)
-          .numDataNodes(numDatanodes)
-          .racks(racks)
-          .build();
+      builder.storagesPerDatanode(storageTypes.length)
+          .storageTypes(storageTypes);
     }
+    if (topo != null) {
+      builder.nnTopology(topo);
+      // If HA or Federation is enabled and formatting is set to false,
+      // copy the FSImage to all Namenode directories.
+      if ((topo.isHA() || topo.isFederated()) && !doFormat) {
+        builder.manageNameDfsDirs(true);
+        builder.enableManagedDfsDirsRedundancy(false);
+        builder.manageNameDfsSharedDirs(false);
+        List<File> nnDirs =
+            getProvidedNamenodeDirs(MiniDFSCluster.getBaseDirectory(), topo);
+        for (File nnDir : nnDirs) {
+          MiniDFSCluster.copyNameDirs(
+              Collections.singletonList(nspath.toUri()),
+              Collections.singletonList(fileAsURI(nnDir)),
+              conf);
+        }
+      }
+    }
+    cluster = builder.build();
     cluster.waitActive();
   }

+  private static List<File> getProvidedNamenodeDirs(String baseDir,
+      MiniDFSNNTopology topo) {
+    List<File> nnDirs = new ArrayList<>();
+    int nsCounter = 0;
+    for (MiniDFSNNTopology.NSConf nsConf : topo.getNameservices()) {
+      int nnCounter = nsCounter;
+      for (MiniDFSNNTopology.NNConf nnConf : nsConf.getNNs()) {
+        if (providedNameservice.equals(nsConf.getId())) {
+          // only add the first one
+          File[] nnFiles =
+              MiniDFSCluster.getNameNodeDirectory(
+                  baseDir, nsCounter, nnCounter);
+          if (nnFiles == null || nnFiles.length == 0) {
+            throw new RuntimeException("Failed to get a location for the "
+                + "Namenode directory for namespace: " + nsConf.getId()
+                + " and namenodeId: " + nnConf.getNnId());
+          }
+          nnDirs.add(nnFiles[0]);
+        }
+        nnCounter++;
+      }
+      nsCounter = nnCounter;
+    }
+    return nnDirs;
+  }
+
   @Test(timeout=20000)
   public void testLoadImage() throws Exception {
     final long seed = r.nextLong();
@@ -405,8 +464,8 @@ static Path removePrefix(Path base, Path walk) {
     return ret;
   }

-  private void verifyFileSystemContents() throws Exception {
-    FileSystem fs = cluster.getFileSystem();
+  private void verifyFileSystemContents(int nnIndex) throws Exception {
+    FileSystem fs = cluster.getFileSystem(nnIndex);
     int count = 0;
     // read NN metadata, verify contents match
     for (TreePath e : new FSTreeWalk(providedPath, conf)) {
@@ -766,41 +825,255 @@ public void testNumberOfProvidedLocationsManyBlocks() throws Exception {
     }
   }

-
-  @Test
-  public void testInMemoryAliasMap() throws Exception {
-    conf.setClass(ImageWriter.Options.UGI_CLASS,
-        FsUGIResolver.class, UGIResolver.class);
+  private File createInMemoryAliasMapImage() throws Exception {
+    conf.setClass(ImageWriter.Options.UGI_CLASS, FsUGIResolver.class,
+        UGIResolver.class);
     conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
         InMemoryLevelDBAliasMapClient.class, BlockAliasMap.class);
-    conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
-        "localhost:32445");
+    conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, "localhost:32445");
     File tempDirectory =
-        Files.createTempDirectory("in-memory-alias-map").toFile();
-    File leveDBPath = new File(tempDirectory, bpid);
-    leveDBPath.mkdirs();
-    conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
+        new File(new Path(nnDirPath, "in-memory-alias-map").toUri());
+    File levelDBDir = new File(tempDirectory, bpid);
+    levelDBDir.mkdirs();
+    conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
        tempDirectory.getAbsolutePath());
-    conf.setBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
     conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
-    InMemoryLevelDBAliasMapServer levelDBAliasMapServer =
-        new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init, bpid);
-    levelDBAliasMapServer.setConf(conf);
-    levelDBAliasMapServer.start();
+    conf.set(DFS_PROVIDED_ALIASMAP_LEVELDB_PATH,
+        tempDirectory.getAbsolutePath());

     createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
-        FixedBlockResolver.class, "",
-        InMemoryLevelDBAliasMapClient.class);
-    levelDBAliasMapServer.close();
+        FixedBlockResolver.class, clusterID,
+        LevelDBFileRegionAliasMap.class);
+    return tempDirectory;
+  }
+
+  @Test
+  public void testInMemoryAliasMap() throws Exception {
+    File aliasMapImage = createInMemoryAliasMapImage();

     // start cluster with two datanodes,
     // each with 1 PROVIDED volume and other DISK volume
+    conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
+    conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
     startCluster(nnDirPath, 2,
         new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
         null, false);
-    verifyFileSystemContents();
-    FileUtils.deleteDirectory(tempDirectory);
+    verifyFileSystemContents(0);
+    FileUtils.deleteDirectory(aliasMapImage);
   }
+
+  /**
+   * Find a free port that hasn't been assigned yet.
+   *
+   * @param usedPorts set of ports that have already been assigned.
+   * @param maxTrials maximum number of random ports to try before failure.
+   * @return an unassigned port.
+   */
+  private int getUnAssignedPort(Set<Integer> usedPorts, int maxTrials) {
+    int count = 0;
+    while (count < maxTrials) {
+      int port = NetUtils.getFreeSocketPort();
+      if (usedPorts.contains(port)) {
+        count++;
+      } else {
+        return port;
+      }
+    }
+    return -1;
+  }
+
+  private static String providedNameservice;
+
+  /**
+   * Extends the {@link MiniDFSCluster.Builder} to create instances of
+   * {@link MiniDFSClusterAliasMap}.
+   */
+  private static class MiniDFSClusterBuilderAliasMap
+      extends MiniDFSCluster.Builder {
+
+    MiniDFSClusterBuilderAliasMap(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public MiniDFSCluster build() throws IOException {
+      return new MiniDFSClusterAliasMap(this);
+    }
+  }
+
+  /**
+   * Extends {@link MiniDFSCluster} to correctly configure the
+   * InMemoryAliasMap.
+   */
+  private static class MiniDFSClusterAliasMap extends MiniDFSCluster {
+
+    private Map<String, Collection<URI>> formattedDirsByNamespaceId;
+    private Set<Integer> completedNNs;
+
+    MiniDFSClusterAliasMap(MiniDFSCluster.Builder builder) throws IOException {
+      super(builder);
+    }
+
+    @Override
+    protected void initNameNodeConf(Configuration conf, String nameserviceId,
+        int nsIndex, String nnId, boolean manageNameDfsDirs,
+        boolean enableManagedDfsDirsRedundancy, int nnIndex)
+        throws IOException {
+
+      if (formattedDirsByNamespaceId == null) {
+        formattedDirsByNamespaceId = new HashMap<>();
+        completedNNs = new HashSet<>();
+      }
+
+      super.initNameNodeConf(conf, nameserviceId, nsIndex, nnId,
+          manageNameDfsDirs, enableManagedDfsDirsRedundancy, nnIndex);
+
+      if (providedNameservice.equals(nameserviceId)) {
+        // configure the InMemoryAliasMap.
+        conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
+        String directory = conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR);
+        if (directory == null) {
+          throw new IllegalArgumentException("In-memory alias map is not "
+              + "configured with a proper location; set "
+              + DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR);
+        }
+        // get the name of the directory (final component in path) used for map.
+        // Assume that the aliasmap is configured with the same final
+        // component name in all Namenodes but is located in the path
+        // specified by DFS_NAMENODE_NAME_DIR_KEY.
+        String dirName = new Path(directory).getName();
+        String nnDir =
+            conf.getTrimmedStringCollection(DFS_NAMENODE_NAME_DIR_KEY)
+                .iterator().next();
+        conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
+            new File(new Path(nnDir, dirName).toUri()).getAbsolutePath());
+        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
+      } else {
+        if (!completedNNs.contains(nnIndex)) {
+          // format the NN directories for non-provided namespaces;
+          // if the directory for a namespace has been formatted, copy it over.
+          Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
+          if (formattedDirsByNamespaceId.containsKey(nameserviceId)) {
+            copyNameDirs(formattedDirsByNamespaceId.get(nameserviceId),
+                namespaceDirs, conf);
+          } else {
+            for (URI nameDirUri : namespaceDirs) {
+              File nameDir = new File(nameDirUri);
+              if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
+                throw new IOException("Could not fully delete " + nameDir);
+              }
+            }
+            HdfsServerConstants.StartupOption.FORMAT.setClusterId(clusterID);
+            DFSTestUtil.formatNameNode(conf);
+            formattedDirsByNamespaceId.put(nameserviceId, namespaceDirs);
+          }
+          conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, false);
+          completedNNs.add(nnIndex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Configures the addresses of the InMemoryAliasMap.
+   *
+   * @param topology the MiniDFS topology to use.
+   * @param providedNameservice the nameservice id that supports provided.
+   */
+  private void configureAliasMapAddresses(MiniDFSNNTopology topology,
+      String providedNameservice) {
+    conf.unset(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS);
+    Set<Integer> assignedPorts = new HashSet<>();
+    for (MiniDFSNNTopology.NSConf nsConf : topology.getNameservices()) {
+      for (MiniDFSNNTopology.NNConf nnConf : nsConf.getNNs()) {
+        if (providedNameservice.equals(nsConf.getId())) {
+          String key =
+              DFSUtil.addKeySuffixes(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
+                  nsConf.getId(), nnConf.getNnId());
+          int port = getUnAssignedPort(assignedPorts, 10);
+          if (port == -1) {
+            throw new RuntimeException("No free ports available");
+          }
+          assignedPorts.add(port);
+          conf.set(key, "127.0.0.1:" + port);
+
+          String bindHostKey =
+              DFSUtil.addKeySuffixes(
+                  DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST,
+                  nsConf.getId(), nnConf.getNnId());
+          conf.set(bindHostKey, "0.0.0.0");
+        }
+      }
+    }
+  }
+
+  /**
+   * Verify the mounted contents of the Filesystem.
+   *
+   * @param topology the topology of the cluster.
+   * @param providedNameservice the nameservice id of the provided namenodes.
+   * @throws Exception
+   */
+  private void verifyPathsWithHAFailoverIfNecessary(MiniDFSNNTopology topology,
+      String providedNameservice) throws Exception {
+    List<Integer> nnIndexes = cluster.getNNIndexes(providedNameservice);
+    if (topology.isHA()) {
+      int nn1 = nnIndexes.get(0);
+      int nn2 = nnIndexes.get(1);
+      try {
+        verifyFileSystemContents(nn1);
+        fail("Read operation should fail as no Namenode is active");
+      } catch (RemoteException e) {
+        LOG.info("verifyPaths failed. Expected exception: {}", e.toString());
+      }
+      cluster.transitionToActive(nn1);
+      LOG.info("Verifying data from NN with index = {}", nn1);
+      verifyFileSystemContents(nn1);
+      // transition to the second namenode.
+      cluster.transitionToStandby(nn1);
+      cluster.transitionToActive(nn2);
+      LOG.info("Verifying data from NN with index = {}", nn2);
+      verifyFileSystemContents(nn2);
+
+      cluster.shutdownNameNodes();
+      try {
+        verifyFileSystemContents(nn2);
+        fail("Read operation should fail as no Namenode is active");
+      } catch (NullPointerException e) {
+        LOG.info("verifyPaths failed. Expected exception: {}", e.toString());
+      }
+    } else {
+      verifyFileSystemContents(nnIndexes.get(0));
+    }
+  }
+
+  @Test
+  public void testInMemoryAliasMapMultiTopologies() throws Exception {
+    MiniDFSNNTopology[] topologies =
+        new MiniDFSNNTopology[] {
+            MiniDFSNNTopology.simpleHATopology(),
+            MiniDFSNNTopology.simpleFederatedTopology(3),
+            MiniDFSNNTopology.simpleHAFederatedTopology(3)
+        };
+
+    for (MiniDFSNNTopology topology : topologies) {
+      LOG.info("Starting test with topology: HA = {}, federation = {}",
+          topology.isHA(), topology.isFederated());
+      setSeed();
+      createInMemoryAliasMapImage();
+      conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
+      conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
+      providedNameservice = topology.getNameservices().get(0).getId();
+      // configure the AliasMap addresses
+      configureAliasMapAddresses(topology, providedNameservice);
+      startCluster(nnDirPath, 2,
+          new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
+          null, false, null, topology,
+          new MiniDFSClusterBuilderAliasMap(conf));
+
+      verifyPathsWithHAFailoverIfNecessary(topology, providedNameservice);
+      shutdown();
+    }
+  }

   private DatanodeDescriptor getDatanodeDescriptor(DatanodeManager dnm,
@@ -919,7 +1192,7 @@ public void testProvidedWithHierarchicalTopology() throws Exception {
     startCluster(nnDirPath, racks.length,
         new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
         null, false, racks);
-    verifyFileSystemContents();
+    verifyFileSystemContents(0);
     setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
     cluster.shutdown();
   }
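For completeness, here is a minimal sketch of the client-side wiring this patch enables. It is not part of the patch, and the nameservice and host names are made up. InMemoryAliasMapProtocolClientSideTranslatorPB#init detects that a nameservice is HA-enabled, installs InMemoryAliasMapFailoverProxyProvider for it, and hands back one failover-capable proxy per nameservice that serves an aliasmap; this is what InMemoryLevelDBAliasMapClient#setConf relies on, on the Datanode side:

import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;

public class AliasMapHAClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // One HA nameservice "ns1" with two namenodes (all names made up).
    conf.set("dfs.nameservices", "ns1");
    conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
    // Per-namenode aliasmap server addresses, suffixed with nsId.nnId.
    conf.set("dfs.provided.aliasmap.inmemory.dnrpc-address.ns1.nn1",
        "nn1.example.com:50200");
    conf.set("dfs.provided.aliasmap.inmemory.dnrpc-address.ns1.nn2",
        "nn2.example.com:50200");
    // init() returns one proxy per nameservice that exposes an aliasmap;
    // calls through the proxy fail over between nn1 and nn2.
    Collection<InMemoryAliasMapProtocol> aliasMaps =
        InMemoryAliasMapProtocolClientSideTranslatorPB.init(conf);
    for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
      System.out.println("Aliasmap serves block pool "
          + aliasMap.getBlockPoolId());
    }
  }
}

Calls made through each returned proxy retry against the other NameNode on failover, which is safe because the InMemoryAliasMapProtocol methods are now annotated @Idempotent.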