HDFS-13536. [PROVIDED Storage] HA for InMemoryAliasMap. Contributed by Virajith Jalaparti.

Inigo Goiri 2018-07-02 10:48:20 -07:00
parent 5cc2541a16
commit 1804a31515
16 changed files with 615 additions and 110 deletions

View File

@@ -396,7 +396,7 @@ static String concatSuffixes(String... suffixes) {
* @param keys Set of keys to look for in the order of preference
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
static Map<String, Map<String, InetSocketAddress>> getAddresses(
public static Map<String, Map<String, InetSocketAddress>> getAddresses(
Configuration conf, String defaultAddress, String... keys) {
Collection<String> nameserviceIds = getNameServiceIds(conf);
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
@@ -426,7 +426,7 @@ static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds(
return ret;
}
static Map<String, InetSocketAddress> getAddressesForNameserviceId(
public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue, String... keys) {
Collection<String> nnIds = getNameNodeIds(conf, nsId);
Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
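Since getAddresses and getAddressesForNameserviceId are now public, callers outside DFSUtilClient can resolve per-nameservice RPC addresses for arbitrary keys. A minimal sketch, assuming a nameservice ns1 with namenodes nn1 and nn2 (all names illustrative):

Configuration conf = new Configuration();
conf.set("dfs.nameservices", "ns1");
conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
conf.set("dfs.provided.aliasmap.inmemory.dnrpc-address.ns1.nn1", "host1:50200");
conf.set("dfs.provided.aliasmap.inmemory.dnrpc-address.ns1.nn2", "host2:50200");
// Resolves ns1 -> {nn1 -> host1:50200, nn2 -> host2:50200}.
Map<String, Map<String, InetSocketAddress>> addrs =
    DFSUtilClient.getAddresses(conf, null,
        "dfs.provided.aliasmap.inmemory.dnrpc-address");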

View File

@@ -184,6 +184,9 @@ public interface HdfsClientConfigKeys {
"dfs.namenode.snapshot.capture.openfiles";
boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT = false;
String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS =
"dfs.provided.aliasmap.inmemory.dnrpc-address";
/**
* These are deprecated config keys to client code.
*/

View File

@@ -37,6 +37,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
/**
* A FailoverProxyProvider implementation which allows one to configure
* multiple URIs to connect to during fail-over. A random configured address is
@@ -60,6 +62,11 @@ public class ConfiguredFailoverProxyProvider<T> extends
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory) {
this(conf, uri, xface, factory, DFS_NAMENODE_RPC_ADDRESS_KEY);
}
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory, String addressKey) {
this.xface = xface;
this.conf = new Configuration(conf);
int maxRetries = this.conf.getInt(
@@ -81,7 +88,7 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
ugi = UserGroupInformation.getCurrentUser();
Map<String, Map<String, InetSocketAddress>> map =
DFSUtilClient.getHaNnRpcAddresses(conf);
DFSUtilClient.getAddresses(conf, null, addressKey);
Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
if (addressesInNN == null || addressesInNN.size() == 0) {
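The added constructor makes the key used to look up failover targets configurable instead of hard-coding DFS_NAMENODE_RPC_ADDRESS_KEY. A hedged sketch of direct use, reusing the NameNodeHAProxyFactory imported elsewhere in this commit (the URI and nameservice id are illustrative):

ConfiguredFailoverProxyProvider<InMemoryAliasMapProtocol> provider =
    new ConfiguredFailoverProxyProvider<>(conf, new URI("hdfs://ns1"),
        InMemoryAliasMapProtocol.class, new NameNodeHAProxyFactory<>(),
        "dfs.provided.aliasmap.inmemory.dnrpc-address");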

View File

@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.conf.Configuration;
import java.net.URI;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
/**
* A {@link ConfiguredFailoverProxyProvider} implementation used to connect
* to an InMemoryAliasMap.
*/
public class InMemoryAliasMapFailoverProxyProvider<T>
extends ConfiguredFailoverProxyProvider<T> {
public InMemoryAliasMapFailoverProxyProvider(
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
super(conf, uri, xface, factory,
DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS);
}
}
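Clients select this provider per nameservice through the standard failover configuration, which is exactly what the translator's init() does later in this commit; a sketch with an illustrative nameservice id ns1:

conf.setClass("dfs.client.failover.proxy.provider.ns1",
    InMemoryAliasMapFailoverProxyProvider.class,
    AbstractNNFailoverProxyProvider.class);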

View File

@@ -86,8 +86,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50105";
public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS = "dfs.provided.aliasmap.inmemory.dnrpc-address";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS =
HdfsClientConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT = "0.0.0.0:50200";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST = "dfs.provided.aliasmap.inmemory.rpc.bind-host";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR = "dfs.provided.aliasmap.inmemory.leveldb.dir";
public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE = "dfs.provided.aliasmap.inmemory.batch-size";
public static final int DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE_DEFAULT = 500;

View File

@@ -1130,7 +1130,42 @@ private static String getNameServiceId(Configuration conf, String addressKey) {
return getSuffixIDs(conf, addressKey, null, nnId, LOCAL_ADDRESS_MATCHER)[0];
}
/**
* Determine the {@link InetSocketAddress} to bind to, for any service.
* In case of HA or federation, the address is assumed to be specified as
* {@code confKey}.NAMESPACEID.NAMENODEID as appropriate.
*
* @param conf configuration.
* @param confKey configuration key (prefix if HA/federation) used to
* specify the address for the service.
* @param defaultValue default value for the address.
* @param bindHostKey configuration key (prefix if HA/federation)
* specifying host to bind to.
* @return the address to bind to.
*/
public static InetSocketAddress getBindAddress(Configuration conf,
String confKey, String defaultValue, String bindHostKey) {
InetSocketAddress address;
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String bindHostActualKey;
if (nsId != null) {
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
address = DFSUtilClient.getAddressesForNameserviceId(
conf, nsId, null, confKey).get(namenodeId);
bindHostActualKey = DFSUtil.addKeySuffixes(bindHostKey, nsId, namenodeId);
} else {
address = NetUtils.createSocketAddr(conf.get(confKey, defaultValue));
bindHostActualKey = bindHostKey;
}
String bindHost = conf.get(bindHostActualKey);
if (bindHost == null || bindHost.isEmpty()) {
bindHost = address.getHostName();
}
return new InetSocketAddress(bindHost, address.getPort());
}
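A short sketch of the resulting behavior, assuming an HA setup where this host resolves to namenode nn1 of nameservice ns1 (names illustrative): the port comes from the address key, while the bind-host key, when set, overrides the hostname.

conf.set("dfs.provided.aliasmap.inmemory.dnrpc-address.ns1.nn1",
    "host1.example.com:50200");
conf.set("dfs.provided.aliasmap.inmemory.rpc.bind-host.ns1.nn1", "0.0.0.0");
// Yields 0.0.0.0:50200.
InetSocketAddress bindAddr = DFSUtil.getBindAddress(conf,
    DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
    DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT,
    DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST);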
/**
* Returns nameservice Id and namenode Id when the local host matches the
* configuration parameter {@code addressKey}.<nameservice Id>.<namenode Id>

View File

@@ -31,10 +31,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
@@ -184,6 +187,8 @@ public static <T> ProxyAndInfo<T> createNonHAProxy(
conf, ugi);
} else if (xface == RefreshCallQueueProtocol.class) {
proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
} else if (xface == InMemoryAliasMapProtocol.class) {
proxy = (T) createNNProxyWithInMemoryAliasMapProtocol(nnAddr, conf, ugi);
} else {
String message = "Unsupported protocol found when creating the proxy " +
"connection to NameNode: " +
@@ -194,7 +199,15 @@ public static <T> ProxyAndInfo<T> createNonHAProxy(
return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
}
private static InMemoryAliasMapProtocol createNNProxyWithInMemoryAliasMapProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
throws IOException {
AliasMapProtocolPB proxy = (AliasMapProtocolPB) createNameNodeProxy(
address, conf, ugi, AliasMapProtocolPB.class, 30000);
return new InMemoryAliasMapProtocolClientSideTranslatorPB(proxy);
}
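With the new branch in createNonHAProxy, an alias map proxy can be obtained through the public factory method; a sketch (the address is illustrative):

InMemoryAliasMapProtocol aliasMap = NameNodeProxies
    .createProxy(conf, new URI("hdfs://host1:50200"),
        InMemoryAliasMapProtocol.class)
    .getProxy();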
private static JournalProtocol createNNProxyWithJournalProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
throws IOException {

View File

@@ -20,27 +20,38 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.InMemoryAliasMapFailoverProxyProvider;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSUtil.addKeySuffixes;
import static org.apache.hadoop.hdfs.DFSUtil.createUri;
import static org.apache.hadoop.hdfs.DFSUtilClient.getNameServiceIds;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX;
import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.*;
@@ -52,7 +63,7 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InMemoryAliasMapProtocolClientSideTranslatorPB
implements InMemoryAliasMapProtocol {
implements InMemoryAliasMapProtocol, Closeable {
private static final Logger LOG =
LoggerFactory
@@ -60,22 +71,61 @@ public class InMemoryAliasMapProtocolClientSideTranslatorPB
private AliasMapProtocolPB rpcProxy;
public InMemoryAliasMapProtocolClientSideTranslatorPB(Configuration conf) {
String addr = conf.getTrimmed(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT);
InetSocketAddress aliasMapAddr = NetUtils.createSocketAddr(addr);
public InMemoryAliasMapProtocolClientSideTranslatorPB(
AliasMapProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
RPC.setProtocolEngine(conf, AliasMapProtocolPB.class,
ProtobufRpcEngine.class);
LOG.info("Connecting to address: " + addr);
try {
rpcProxy = RPC.getProxy(AliasMapProtocolPB.class,
RPC.getProtocolVersion(AliasMapProtocolPB.class), aliasMapAddr, null,
conf, NetUtils.getDefaultSocketFactory(conf), 0);
} catch (IOException e) {
throw new RuntimeException(
"Error in connecting to " + addr + " Got: " + e);
public static Collection<InMemoryAliasMapProtocol> init(Configuration conf) {
Collection<InMemoryAliasMapProtocol> aliasMaps = new ArrayList<>();
// Try to connect to all configured nameservices as it is not known which
// nameservice supports the AliasMap.
for (String nsId : getNameServiceIds(conf)) {
try {
URI namenodeURI = null;
Configuration newConf = new Configuration(conf);
if (HAUtil.isHAEnabled(conf, nsId)) {
// set the failover-proxy provider if HA is enabled.
newConf.setClass(
addKeySuffixes(PROXY_PROVIDER_KEY_PREFIX, nsId),
InMemoryAliasMapFailoverProxyProvider.class,
AbstractNNFailoverProxyProvider.class);
namenodeURI = new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId);
} else {
String key =
addKeySuffixes(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, nsId);
String addr = conf.get(key);
if (addr != null) {
namenodeURI = createUri(HdfsConstants.HDFS_URI_SCHEME,
NetUtils.createSocketAddr(addr));
}
}
if (namenodeURI != null) {
aliasMaps.add(NameNodeProxies
.createProxy(newConf, namenodeURI, InMemoryAliasMapProtocol.class)
.getProxy());
LOG.info("Connected to InMemoryAliasMap at {}", namenodeURI);
}
} catch (IOException | URISyntaxException e) {
LOG.warn("Exception in connecting to InMemoryAliasMap for nameservice "
+ "{}: {}", nsId, e);
}
}
// if a separate AliasMap is configured using
// DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, try to connect to it.
if (conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS) != null) {
URI uri = createUri("hdfs", NetUtils.createSocketAddr(
conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS)));
try {
aliasMaps.add(NameNodeProxies
.createProxy(conf, uri, InMemoryAliasMapProtocol.class).getProxy());
LOG.info("Connected to InMemoryAliasMap at {}", uri);
} catch (IOException e) {
LOG.warn("Exception in connecting to InMemoryAliasMap at {}: {}", uri,
e);
}
}
return aliasMaps;
}
@Override
@@ -168,7 +218,12 @@ public String getBlockPoolId() throws IOException {
}
}
public void stop() {
RPC.stopProxy(rpcProxy);
@Override
public void close() throws IOException {
LOG.info("Stopping rpcProxy in" +
"InMemoryAliasMapProtocolClientSideTranslatorPB");
if (rpcProxy != null) {
RPC.stopProxy(rpcProxy);
}
}
}
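Because the translator now implements Closeable, callers can tie the lifetime of the underlying RPC proxy to a try-with-resources block; a hedged sketch mirroring the removed constructor logic above:

RPC.setProtocolEngine(conf, AliasMapProtocolPB.class, ProtobufRpcEngine.class);
AliasMapProtocolPB proxy = RPC.getProxy(AliasMapProtocolPB.class,
    RPC.getProtocolVersion(AliasMapProtocolPB.class),
    NetUtils.createSocketAddr("host1:50200"), conf);
try (InMemoryAliasMapProtocolClientSideTranslatorPB client =
         new InMemoryAliasMapProtocolClientSideTranslatorPB(proxy)) {
  LOG.info("Connected to block pool {}", client.getBlockPoolId());
} // close() stops the proxy via RPC.stopProxy.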

View File

@@ -21,6 +21,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.io.retry.Idempotent;
import javax.annotation.Nonnull;
import java.io.IOException;
@@ -69,6 +70,7 @@ public Optional<Block> getNextBlock() {
* FileRegions and the next marker.
* @throws IOException
*/
@Idempotent
InMemoryAliasMap.IterationResult list(Optional<Block> marker)
throws IOException;
@@ -80,6 +82,7 @@ InMemoryAliasMap.IterationResult list(Optional<Block> marker)
* @throws IOException
*/
@Nonnull
@Idempotent
Optional<ProvidedStorageLocation> read(@Nonnull Block block)
throws IOException;
@@ -90,6 +93,7 @@ Optional<ProvidedStorageLocation> read(@Nonnull Block block)
* @param providedStorageLocation
* @throws IOException
*/
@Idempotent
void write(@Nonnull Block block,
@Nonnull ProvidedStorageLocation providedStorageLocation)
throws IOException;
@@ -99,5 +103,6 @@ void write(@Nonnull Block block,
* @return the block pool id associated with the Namenode running
* the in-memory alias map.
*/
@Idempotent
String getBlockPoolId() throws IOException;
}
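Marking these methods @Idempotent tells the retry framework they may be safely re-invoked when a call fails over to another namenode. For illustration only (not part of this change), a retrying wrapper over a plain instance could be assembled with the generic retry utilities in org.apache.hadoop.io.retry:

// aliasMap is any InMemoryAliasMapProtocol instance.
InMemoryAliasMapProtocol retrying =
    (InMemoryAliasMapProtocol) RetryProxy.create(
        InMemoryAliasMapProtocol.class, aliasMap,
        RetryPolicies.retryUpToMaximumCountWithFixedSleep(
            10, 1, TimeUnit.SECONDS));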

View File

@@ -25,7 +25,6 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolPB;
@@ -34,9 +33,13 @@
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Optional;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST;
import static org.apache.hadoop.hdfs.DFSUtil.getBindAddress;
import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.CheckedFunction2;
@@ -79,18 +82,16 @@ public void start() throws IOException {
AliasMapProtocolService
.newReflectiveBlockingService(aliasMapProtocolXlator);
String rpcAddress =
conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT);
String[] split = rpcAddress.split(":");
String bindHost = split[0];
Integer port = Integer.valueOf(split[1]);
InetSocketAddress rpcAddress = getBindAddress(conf,
DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT,
DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST);
aliasMapServer = new RPC.Builder(conf)
.setProtocol(AliasMapProtocolPB.class)
.setInstance(aliasMapProtocolService)
.setBindAddress(bindHost)
.setPort(port)
.setBindAddress(rpcAddress.getHostName())
.setPort(rpcAddress.getPort())
.setNumHandlers(1)
.setVerbose(true)
.build();
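With the address resolution delegated to getBindAddress, an operator can advertise a routable hostname while binding the server to all interfaces; a sketch of the relevant configuration (values illustrative):

conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, "host1.example.com:50200");
// Optional: listen on all interfaces instead of host1's address.
conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST, "0.0.0.0");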

View File

@@ -24,11 +24,17 @@
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -44,17 +50,28 @@
public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
implements Configurable {
private static final Logger LOG =
LoggerFactory.getLogger(InMemoryLevelDBAliasMapClient.class);
private Configuration conf;
private InMemoryAliasMapProtocolClientSideTranslatorPB aliasMap;
private String blockPoolID;
private Collection<InMemoryAliasMapProtocol> aliasMaps;
@Override
public void close() {
aliasMap.stop();
if (aliasMaps != null) {
for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
RPC.stopProxy(aliasMap);
}
}
}
class LevelDbReader extends BlockAliasMap.Reader<FileRegion> {
private InMemoryAliasMapProtocol aliasMap;
LevelDbReader(InMemoryAliasMapProtocol aliasMap) {
this.aliasMap = aliasMap;
}
@Override
public Optional<FileRegion> resolve(Block block) throws IOException {
Optional<ProvidedStorageLocation> read = aliasMap.read(block);
@@ -114,6 +131,13 @@ public Iterator<FileRegion> iterator() {
}
class LevelDbWriter extends BlockAliasMap.Writer<FileRegion> {
private InMemoryAliasMapProtocol aliasMap;
LevelDbWriter(InMemoryAliasMapProtocol aliasMap) {
this.aliasMap = aliasMap;
}
@Override
public void store(FileRegion fileRegion) throws IOException {
aliasMap.write(fileRegion.getBlock(),
@@ -130,40 +154,53 @@ public void close() throws IOException {
throw new UnsupportedOperationException("Unable to start "
+ "InMemoryLevelDBAliasMapClient as security is enabled");
}
aliasMaps = new ArrayList<>();
}
private InMemoryAliasMapProtocol getAliasMap(String blockPoolID)
throws IOException {
if (blockPoolID == null) {
throw new IOException("Block pool id required to get aliasmap reader");
}
// find the alias map whose block pool id matches the given block pool id;
// throw an IOException below if none of the configured alias maps match.
for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
try {
String aliasMapBlockPoolId = aliasMap.getBlockPoolId();
if (aliasMapBlockPoolId != null &&
aliasMapBlockPoolId.equals(blockPoolID)) {
return aliasMap;
}
} catch (IOException e) {
LOG.error("Exception in retrieving block pool id {}", e);
}
}
throw new IOException(
"Unable to retrive InMemoryAliasMap for block pool id " + blockPoolID);
}
@Override
public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
throws IOException {
if (this.blockPoolID == null) {
this.blockPoolID = aliasMap.getBlockPoolId();
}
// if a block pool id has been supplied, and doesn't match the associated
// block pool id, return null.
if (blockPoolID != null && this.blockPoolID != null
&& !this.blockPoolID.equals(blockPoolID)) {
return null;
}
return new LevelDbReader();
InMemoryAliasMapProtocol aliasMap = getAliasMap(blockPoolID);
LOG.info("Loading InMemoryAliasMapReader for block pool id {}",
blockPoolID);
return new LevelDbReader(aliasMap);
}
@Override
public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolID)
throws IOException {
if (this.blockPoolID == null) {
this.blockPoolID = aliasMap.getBlockPoolId();
}
if (blockPoolID != null && !this.blockPoolID.equals(blockPoolID)) {
return null;
}
return new LevelDbWriter();
InMemoryAliasMapProtocol aliasMap = getAliasMap(blockPoolID);
LOG.info("Loading InMemoryAliasMapWriter for block pool id {}",
blockPoolID);
return new LevelDbWriter(aliasMap);
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.aliasMap = new InMemoryAliasMapProtocolClientSideTranslatorPB(conf);
aliasMaps = InMemoryAliasMapProtocolClientSideTranslatorPB.init(conf);
}
@Override
@@ -174,5 +211,4 @@ public Configuration getConf() {
@Override
public void refresh() throws IOException {
}
}
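A sketch of how a caller uses the client once it is configured: setConf() connects to every configured alias map, and getReader() picks the one whose block pool id matches (the id below is the one used by the tests in this commit):

InMemoryLevelDBAliasMapClient client = new InMemoryLevelDBAliasMapClient();
client.setConf(conf);
BlockAliasMap.Reader<FileRegion> reader =
    client.getReader(null, "BP-1234-10.1.1.1-1224");
Optional<FileRegion> region = reader.resolve(new Block(0L));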

View File

@@ -4817,9 +4817,27 @@
<property>
<name>dfs.provided.aliasmap.inmemory.dnrpc-address</name>
<value>0.0.0.0:50200</value>
<value></value>
<description>
The address where the aliasmap server will be running
The address where the aliasmap server will be running. In the case of
HA/Federation where multiple namenodes exist, and if the Namenode is
configured to run the aliasmap server
(dfs.provided.aliasmap.inmemory.enabled is set to true),
the name service id is added to the name, e.g.,
dfs.provided.aliasmap.inmemory.dnrpc-address.EXAMPLENAMESERVICE.
The value of this property will take the form of host:rpc-port.
</description>
</property>
<property>
<name>dfs.provided.aliasmap.inmemory.rpc.bind-host</name>
<value></value>
<description>
The actual address the in-memory aliasmap server will bind to.
If this optional address is set, it overrides the hostname portion of
dfs.provided.aliasmap.inmemory.dnrpc-address.
This is useful for making the name node listen on all interfaces by
setting it to 0.0.0.0.
</description>
</property>

View File

@@ -1187,7 +1187,7 @@ public NameNodeInfo[] getNameNodeInfos(String nameservice) {
}
private void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
protected void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, int nnIndex)
throws IOException {
if (nameserviceId != null) {
@@ -1379,6 +1379,17 @@ private NameNodeInfo getNN(int nnIndex) {
return null;
}
public List<Integer> getNNIndexes(String nameserviceId) {
int count = 0;
List<Integer> nnIndexes = new ArrayList<>();
for (NameNodeInfo nn : namenodes.values()) {
if (nn.getNameserviceId().equals(nameserviceId)) {
nnIndexes.add(count);
}
count++;
}
return nnIndexes;
}
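The new helper maps a nameservice to the global namenode indexes that other MiniDFSCluster APIs expect; the HA test below uses it roughly as follows (nameservice id illustrative):

List<Integer> nnIndexes = cluster.getNNIndexes("ns1");
cluster.transitionToActive(nnIndexes.get(0));
FileSystem fs = cluster.getFileSystem(nnIndexes.get(0));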
/**
* wait for the given namenode to get out of safemode.

View File

@@ -227,7 +227,7 @@ public NNConf(String nnId) {
this.nnId = nnId;
}
String getNnId() {
public String getNnId() {
return nnId;
}

View File

@@ -32,6 +32,7 @@
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -341,4 +342,10 @@ public void multipleReads() throws IOException {
assertThat(actualFileRegions).containsExactlyInAnyOrder(
expectedFileRegions.toArray(new FileRegion[0]));
}
@Test
public void testServerBindHost() throws Exception {
conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY, "0.0.0.0");
writeRead();
}
}

View File

@@ -24,12 +24,18 @@
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -44,14 +50,14 @@
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -60,8 +66,10 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.LevelDBFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -71,6 +79,8 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
import org.junit.After;
import org.junit.Before;
@@ -80,6 +90,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LEVELDB_PATH;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import static org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.fileNameFromBlockPoolID;
import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR;
import static org.junit.Assert.*;
@@ -106,6 +122,7 @@ public class ITestProvidedImplementation {
private final int baseFileLen = 1024;
private long providedDataSize = 0;
private final String bpid = "BP-1234-10.1.1.1-1224";
private static final String clusterID = "CID-PROVIDED";
private Configuration conf;
private MiniDFSCluster cluster;
@@ -214,36 +231,78 @@ void startCluster(Path nspath, int numDatanodes,
StorageType[] storageTypes,
StorageType[][] storageTypesPerDatanode,
boolean doFormat, String[] racks) throws IOException {
startCluster(nspath, numDatanodes,
storageTypes, storageTypesPerDatanode,
doFormat, racks, null,
new MiniDFSCluster.Builder(conf));
}
void startCluster(Path nspath, int numDatanodes,
StorageType[] storageTypes,
StorageType[][] storageTypesPerDatanode,
boolean doFormat, String[] racks,
MiniDFSNNTopology topo,
MiniDFSCluster.Builder builder) throws IOException {
conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());
builder.format(doFormat)
.manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes)
.racks(racks);
if (storageTypesPerDatanode != null) {
cluster = new MiniDFSCluster.Builder(conf)
.format(doFormat)
.manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes)
.storageTypes(storageTypesPerDatanode)
.racks(racks)
.build();
builder.storageTypes(storageTypesPerDatanode);
} else if (storageTypes != null) {
cluster = new MiniDFSCluster.Builder(conf)
.format(doFormat)
.manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes)
.storagesPerDatanode(storageTypes.length)
.storageTypes(storageTypes)
.racks(racks)
.build();
} else {
cluster = new MiniDFSCluster.Builder(conf)
.format(doFormat)
.manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes)
.racks(racks)
.build();
builder.storagesPerDatanode(storageTypes.length)
.storageTypes(storageTypes);
}
if (topo != null) {
builder.nnTopology(topo);
// If HA or Federation is enabled and formatting is set to false,
// copy the FSImage to all Namenode directories.
if ((topo.isHA() || topo.isFederated()) && !doFormat) {
builder.manageNameDfsDirs(true);
builder.enableManagedDfsDirsRedundancy(false);
builder.manageNameDfsSharedDirs(false);
List<File> nnDirs =
getProvidedNamenodeDirs(MiniDFSCluster.getBaseDirectory(), topo);
for (File nnDir : nnDirs) {
MiniDFSCluster.copyNameDirs(
Collections.singletonList(nspath.toUri()),
Collections.singletonList(fileAsURI(nnDir)),
conf);
}
}
}
cluster = builder.build();
cluster.waitActive();
}
private static List<File> getProvidedNamenodeDirs(String baseDir,
MiniDFSNNTopology topo) {
List<File> nnDirs = new ArrayList<>();
int nsCounter = 0;
for (MiniDFSNNTopology.NSConf nsConf : topo.getNameservices()) {
int nnCounter = nsCounter;
for (MiniDFSNNTopology.NNConf nnConf : nsConf.getNNs()) {
if (providedNameservice.equals(nsConf.getId())) {
// only add the first of the directories returned for each Namenode
File[] nnFiles =
MiniDFSCluster.getNameNodeDirectory(
baseDir, nsCounter, nnCounter);
if (nnFiles == null || nnFiles.length == 0) {
throw new RuntimeException("Failed to get a location for the"
+ "Namenode directory for namespace: " + nsConf.getId()
+ " and namenodeId: " + nnConf.getNnId());
}
nnDirs.add(nnFiles[0]);
}
nnCounter++;
}
nsCounter = nnCounter;
}
return nnDirs;
}
@Test(timeout=20000)
public void testLoadImage() throws Exception {
final long seed = r.nextLong();
@@ -405,8 +464,8 @@ static Path removePrefix(Path base, Path walk) {
return ret;
}
private void verifyFileSystemContents() throws Exception {
FileSystem fs = cluster.getFileSystem();
private void verifyFileSystemContents(int nnIndex) throws Exception {
FileSystem fs = cluster.getFileSystem(nnIndex);
int count = 0;
// read NN metadata, verify contents match
for (TreePath e : new FSTreeWalk(providedPath, conf)) {
@@ -766,41 +825,255 @@ public void testNumberOfProvidedLocationsManyBlocks() throws Exception {
}
}
@Test
public void testInMemoryAliasMap() throws Exception {
conf.setClass(ImageWriter.Options.UGI_CLASS,
FsUGIResolver.class, UGIResolver.class);
private File createInMemoryAliasMapImage() throws Exception {
conf.setClass(ImageWriter.Options.UGI_CLASS, FsUGIResolver.class,
UGIResolver.class);
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
InMemoryLevelDBAliasMapClient.class, BlockAliasMap.class);
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
"localhost:32445");
conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, "localhost:32445");
File tempDirectory =
Files.createTempDirectory("in-memory-alias-map").toFile();
File leveDBPath = new File(tempDirectory, bpid);
leveDBPath.mkdirs();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
new File(new Path(nnDirPath, "in-memory-alias-map").toUri());
File levelDBDir = new File(tempDirectory, bpid);
levelDBDir.mkdirs();
conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
tempDirectory.getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
InMemoryLevelDBAliasMapServer levelDBAliasMapServer =
new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init, bpid);
levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start();
conf.set(DFS_PROVIDED_ALIASMAP_LEVELDB_PATH,
tempDirectory.getAbsolutePath());
createImage(new FSTreeWalk(providedPath, conf),
nnDirPath,
FixedBlockResolver.class, "",
InMemoryLevelDBAliasMapClient.class);
levelDBAliasMapServer.close();
FixedBlockResolver.class, clusterID,
LevelDBFileRegionAliasMap.class);
return tempDirectory;
}
@Test
public void testInMemoryAliasMap() throws Exception {
File aliasMapImage = createInMemoryAliasMapImage();
// start cluster with two datanodes,
// each with 1 PROVIDED volume and other DISK volume
conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
startCluster(nnDirPath, 2,
new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
null, false);
verifyFileSystemContents();
FileUtils.deleteDirectory(tempDirectory);
verifyFileSystemContents(0);
FileUtils.deleteDirectory(aliasMapImage);
}
/**
* Find a free port that hasn't been assigned yet.
*
* @param usedPorts set of ports that have already been assigned.
* @param maxTrials maximum number of random ports to try before failure.
* @return an unassigned port.
*/
private int getUnAssignedPort(Set<Integer> usedPorts, int maxTrials) {
int count = 0;
while (count < maxTrials) {
int port = NetUtils.getFreeSocketPort();
if (usedPorts.contains(port)) {
count++;
} else {
return port;
}
}
return -1;
}
private static String providedNameservice;
/**
* Extends the {@link MiniDFSCluster.Builder} to create instances of
* {@link MiniDFSClusterBuilderAliasMap}.
*/
private static class MiniDFSClusterBuilderAliasMap
extends MiniDFSCluster.Builder {
MiniDFSClusterBuilderAliasMap(Configuration conf) {
super(conf);
}
@Override
public MiniDFSCluster build() throws IOException {
return new MiniDFSClusterAliasMap(this);
}
}
/**
* Extends {@link MiniDFSCluster} to correctly configure the InMemoryAliasMap.
*/
private static class MiniDFSClusterAliasMap extends MiniDFSCluster {
private Map<String, Collection<URI>> formattedDirsByNamespaceId;
private Set<Integer> completedNNs;
MiniDFSClusterAliasMap(MiniDFSCluster.Builder builder) throws IOException {
super(builder);
}
@Override
protected void initNameNodeConf(Configuration conf, String nameserviceId,
int nsIndex, String nnId, boolean manageNameDfsDirs,
boolean enableManagedDfsDirsRedundancy, int nnIndex)
throws IOException {
if (formattedDirsByNamespaceId == null) {
formattedDirsByNamespaceId = new HashMap<>();
completedNNs = new HashSet<>();
}
super.initNameNodeConf(conf, nameserviceId, nsIndex, nnId,
manageNameDfsDirs, enableManagedDfsDirsRedundancy, nnIndex);
if (providedNameservice.equals(nameserviceId)) {
// configure the InMemoryAliasMap.
conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
String directory = conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR);
if (directory == null) {
throw new IllegalArgumentException("In-memory alias map configured"
+ "with the proper location; Set "
+ DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR);
}
// Get the name of the directory (final component in path) used for the map.
// Assume that the aliasmap is configured with the same final component
// name in all Namenodes, but is located under the path specified by
// DFS_NAMENODE_NAME_DIR_KEY.
String dirName = new Path(directory).getName();
String nnDir =
conf.getTrimmedStringCollection(DFS_NAMENODE_NAME_DIR_KEY)
.iterator().next();
conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
new File(new Path(nnDir, dirName).toUri()).getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
} else {
if (!completedNNs.contains(nnIndex)) {
// format the NN directories for non-provided namespaces
// if the directory for a namespace has been formatted, copy it over.
Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
if (formattedDirsByNamespaceId.containsKey(nameserviceId)) {
copyNameDirs(formattedDirsByNamespaceId.get(nameserviceId),
namespaceDirs, conf);
} else {
for (URI nameDirUri : namespaceDirs) {
File nameDir = new File(nameDirUri);
if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
throw new IOException("Could not fully delete " + nameDir);
}
}
HdfsServerConstants.StartupOption.FORMAT.setClusterId(clusterID);
DFSTestUtil.formatNameNode(conf);
formattedDirsByNamespaceId.put(nameserviceId, namespaceDirs);
}
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, false);
completedNNs.add(nnIndex);
}
}
}
}
/**
* Configures the addresses of the InMemoryAliasMap.
*
* @param topology the MiniDFS topology to use.
* @param providedNameservice the nameservice id that supports provided.
*/
private void configureAliasMapAddresses(MiniDFSNNTopology topology,
String providedNameservice) {
conf.unset(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS);
Set<Integer> assignedPorts = new HashSet<>();
for (MiniDFSNNTopology.NSConf nsConf : topology.getNameservices()) {
for (MiniDFSNNTopology.NNConf nnConf : nsConf.getNNs()) {
if (providedNameservice.equals(nsConf.getId())) {
String key =
DFSUtil.addKeySuffixes(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
nsConf.getId(), nnConf.getNnId());
int port = getUnAssignedPort(assignedPorts, 10);
if (port == -1) {
throw new RuntimeException("No free ports available");
}
assignedPorts.add(port);
conf.set(key, "127.0.0.1:" + port);
String bindHostKey =
DFSUtil.addKeySuffixes(
DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST,
nsConf.getId(), nnConf.getNnId());
conf.set(bindHostKey, "0.0.0.0");
}
}
}
}
/**
* Verify the mounted contents of the Filesystem.
*
* @param topology the topology of the cluster.
* @param providedNameservice the namespace id of the provided namenodes.
* @throws Exception
*/
private void verifyPathsWithHAFailoverIfNecessary(MiniDFSNNTopology topology,
String providedNameservice) throws Exception {
List<Integer> nnIndexes = cluster.getNNIndexes(providedNameservice);
if (topology.isHA()) {
int nn1 = nnIndexes.get(0);
int nn2 = nnIndexes.get(1);
try {
verifyFileSystemContents(nn1);
fail("Read operation should fail as no Namenode is active");
} catch (RemoteException e) {
LOG.info("verifyPaths failed!. Expected exception: {}" + e);
}
cluster.transitionToActive(nn1);
LOG.info("Verifying data from NN with index = {}", nn1);
verifyFileSystemContents(nn1);
// transition to the second namenode.
cluster.transitionToStandby(nn1);
cluster.transitionToActive(nn2);
LOG.info("Verifying data from NN with index = {}", nn2);
verifyFileSystemContents(nn2);
cluster.shutdownNameNodes();
try {
verifyFileSystemContents(nn2);
fail("Read operation should fail as no Namenode is active");
} catch (NullPointerException e) {
LOG.info("verifyPaths failed!. Expected exception: {}" + e);
}
} else {
verifyFileSystemContents(nnIndexes.get(0));
}
}
@Test
public void testInMemoryAliasMapMultiTopologies() throws Exception {
MiniDFSNNTopology[] topologies =
new MiniDFSNNTopology[] {
MiniDFSNNTopology.simpleHATopology(),
MiniDFSNNTopology.simpleFederatedTopology(3),
MiniDFSNNTopology.simpleHAFederatedTopology(3)
};
for (MiniDFSNNTopology topology : topologies) {
LOG.info("Starting test with topology with HA = {}, federation = {}",
topology.isHA(), topology.isFederated());
setSeed();
createInMemoryAliasMapImage();
conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
providedNameservice = topology.getNameservices().get(0).getId();
// configure the AliasMap addresses
configureAliasMapAddresses(topology, providedNameservice);
startCluster(nnDirPath, 2,
new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
null, false, null, topology,
new MiniDFSClusterBuilderAliasMap(conf));
verifyPathsWithHAFailoverIfNecessary(topology, providedNameservice);
shutdown();
}
}
private DatanodeDescriptor getDatanodeDescriptor(DatanodeManager dnm,
@@ -919,7 +1192,7 @@ public void testProvidedWithHierarchicalTopology() throws Exception {
startCluster(nnDirPath, racks.length,
new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
null, false, racks);
verifyFileSystemContents();
verifyFileSystemContents(0);
setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
cluster.shutdown();
}