HDFS-13369. Fix for FSCK Report broken with RequestHedgingProxyProvider (#4917)

Contributed-by: navinko <nakumr@cloudera.com>
This commit is contained in:
Navink 2022-09-30 20:58:12 +05:30 committed by GitHub
parent e22f5e75ae
commit 4891bf5049
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 171 additions and 64 deletions

View File

@ -124,12 +124,28 @@ public static void setCallIdAndRetryCount(int cid, int rc,
Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID); Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
Preconditions.checkState(callId.get() == null); Preconditions.checkState(callId.get() == null);
Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT); Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
setCallIdAndRetryCountUnprotected(cid, rc, externalHandler);
}
public static void setCallIdAndRetryCountUnprotected(Integer cid, int rc,
Object externalHandler) {
callId.set(cid); callId.set(cid);
retryCount.set(rc); retryCount.set(rc);
EXTERNAL_CALL_HANDLER.set(externalHandler); EXTERNAL_CALL_HANDLER.set(externalHandler);
} }
public static int getCallId() {
return callId.get() != null ? callId.get() : nextCallId();
}
public static int getRetryCount() {
return retryCount.get() != null ? retryCount.get() : 0;
}
public static Object getExternalHandler() {
return EXTERNAL_CALL_HANDLER.get();
}
private final ConcurrentMap<ConnectionId, Connection> connections = private final ConcurrentMap<ConnectionId, Connection> connections =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private final Object putLock = new Object(); private final Object putLock = new Object();

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode.ha; package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
@ -27,20 +26,24 @@
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.io.retry.MultiException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.ipc.StandbyException;
/** /**
* A FailoverProxyProvider implementation that technically does not "failover" * A FailoverProxyProvider implementation that technically does not "failover"
* per-se. It constructs a wrapper proxy that sends the request to ALL * per-se. It constructs a wrapper proxy that sends the request to ALL
@ -55,7 +58,7 @@ public class RequestHedgingProxyProvider<T> extends
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(RequestHedgingProxyProvider.class); LoggerFactory.getLogger(RequestHedgingProxyProvider.class);
class RequestHedgingInvocationHandler implements InvocationHandler { class RequestHedgingInvocationHandler implements RpcInvocationHandler {
final Map<String, ProxyInfo<T>> targetProxies; final Map<String, ProxyInfo<T>> targetProxies;
// Proxy of the active nn // Proxy of the active nn
@ -123,11 +126,18 @@ public RequestHedgingInvocationHandler(
} }
executor = Executors.newFixedThreadPool(proxies.size()); executor = Executors.newFixedThreadPool(proxies.size());
completionService = new ExecutorCompletionService<>(executor); completionService = new ExecutorCompletionService<>(executor);
// Set the callId and other informations from current thread.
final int callId = Client.getCallId();
final int retryCount = Client.getRetryCount();
final Object externalHandler = Client.getExternalHandler();
for (final Map.Entry<String, ProxyInfo<T>> pEntry : targetProxies for (final Map.Entry<String, ProxyInfo<T>> pEntry : targetProxies
.entrySet()) { .entrySet()) {
Callable<Object> c = new Callable<Object>() { Callable<Object> c = new Callable<Object>() {
@Override @Override
public Object call() throws Exception { public Object call() throws Exception {
// Call Id and other informations from parent thread.
Client.setCallIdAndRetryCount(callId, retryCount,
externalHandler);
LOG.trace("Invoking method {} on proxy {}", method, LOG.trace("Invoking method {} on proxy {}", method,
pEntry.getValue().proxyInfo); pEntry.getValue().proxyInfo);
return method.invoke(pEntry.getValue().proxy, args); return method.invoke(pEntry.getValue().proxy, args);
@ -136,7 +146,9 @@ public Object call() throws Exception {
proxyMap.put(completionService.submit(c), pEntry.getValue()); proxyMap.put(completionService.submit(c), pEntry.getValue());
numAttempts++; numAttempts++;
} }
// Current thread's callId will not be cleared as RPC happens in
// separate threads. Reset the CallId information Forcefully.
Client.setCallIdAndRetryCountUnprotected(null, 0, null);
Map<String, Exception> badResults = new HashMap<>(); Map<String, Exception> badResults = new HashMap<>();
while (numAttempts > 0) { while (numAttempts > 0) {
Future<Object> callResultFuture = completionService.take(); Future<Object> callResultFuture = completionService.take();
@ -189,6 +201,18 @@ public Object call() throws Exception {
throw unwrappedException; throw unwrappedException;
} }
} }
@Override
public void close() throws IOException {
}
@Override
public ConnectionId getConnectionId() {
if (currentUsedProxy == null) {
return null;
}
return RPC.getConnectionIdForProxy(currentUsedProxy.proxy);
}
} }
/** A proxy wrapping {@link RequestHedgingInvocationHandler}. */ /** A proxy wrapping {@link RequestHedgingInvocationHandler}. */

View File

@ -20,6 +20,7 @@
import java.io.EOFException; import java.io.EOFException;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Proxy;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
@ -34,6 +35,7 @@
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.io.retry.MultiException; import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
@ -101,6 +103,8 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
RequestHedgingProxyProvider<ClientProtocol> provider = RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(badMock, goodMock)); createFactory(badMock, goodMock));
Assert.assertTrue(Proxy.getInvocationHandler(
provider.getProxy().proxy) instanceof RpcInvocationHandler);
long[] stats = provider.getProxy().proxy.getStats(); long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1); Assert.assertTrue(stats.length == 1);
Mockito.verify(badMock).getStats(); Mockito.verify(badMock).getStats();

View File

@ -165,7 +165,8 @@ public void testFormatShouldBeIgnoredForNonFileBasedDirs() throws Exception {
String localhost = "127.0.0.1"; String localhost = "127.0.0.1";
InetSocketAddress nnAddr1 = new InetSocketAddress(localhost, 8020); InetSocketAddress nnAddr1 = new InetSocketAddress(localhost, 8020);
InetSocketAddress nnAddr2 = new InetSocketAddress(localhost, 8020); InetSocketAddress nnAddr2 = new InetSocketAddress(localhost, 8020);
HATestUtil.setFailoverConfigurations(conf, logicalName, nnAddr1, nnAddr2); HATestUtil.setFailoverConfigurations(conf, logicalName, null,
nnAddr1, nnAddr2);
conf.set(DFS_NAMENODE_NAME_DIR_KEY, conf.set(DFS_NAMENODE_NAME_DIR_KEY,
new File(DFS_BASE_DIR, "name").getAbsolutePath()); new File(DFS_BASE_DIR, "name").getAbsolutePath());

View File

@ -17,13 +17,6 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode.ha; package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSUtil.createUri;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
@ -36,11 +29,12 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.Supplier;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.ClientGSIContext; import org.apache.hadoop.hdfs.ClientGSIContext;
@ -49,6 +43,7 @@
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
@ -59,47 +54,52 @@
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import java.util.function.Supplier; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSUtil.createUri;
/** /**
* Static utility functions useful for testing HA. * Static utility functions useful for testing HA.
*/ */
public abstract class HATestUtil { public abstract class HATestUtil {
private static final Logger LOG = LoggerFactory.getLogger(HATestUtil.class); private static final Logger LOG = LoggerFactory.getLogger(HATestUtil.class);
private static final String LOGICAL_HOSTNAME = "ha-nn-uri-%d"; private static final String LOGICAL_HOSTNAME = "ha-nn-uri-%d";
/** /**
* Trigger an edits log roll on the active and then wait for the standby to * Trigger an edits log roll on the active and then wait for the standby to
* catch up to all the edits done by the active. This method will check * catch up to all the edits done by the active. This method will check
* repeatedly for up to NN_LAG_TIMEOUT milliseconds, and then fail throwing * repeatedly for up to NN_LAG_TIMEOUT milliseconds, and then fail throwing
* {@link CouldNotCatchUpException} * {@link CouldNotCatchUpException}
* *
* @param active active NN * @param active active NN
* @param standby standby NN which should catch up to active * @param standby standby NN which should catch up to active
* @throws IOException if an error occurs rolling the edit log * @throws IOException if an error occurs rolling the edit log
* @throws CouldNotCatchUpException if the standby doesn't catch up to the * @throws CouldNotCatchUpException if the standby doesn't catch up to the
* active in NN_LAG_TIMEOUT milliseconds * active in NN_LAG_TIMEOUT milliseconds
*/ */
public static void waitForStandbyToCatchUp(NameNode active, public static void waitForStandbyToCatchUp(NameNode active, NameNode standby)
NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException { throws InterruptedException, IOException, CouldNotCatchUpException {
long activeTxId = active.getNamesystem().getFSImage().getEditLog() long activeTxId =
.getLastWrittenTxId(); active.getNamesystem().getFSImage().getEditLog().getLastWrittenTxId();
active.getRpcServer().rollEditLog(); active.getRpcServer().rollEditLog();
long start = Time.now(); long start = Time.now();
while (Time.now() - start < TestEditLogTailer.NN_LAG_TIMEOUT) { while (Time.now() - start < TestEditLogTailer.NN_LAG_TIMEOUT) {
long nn2HighestTxId = standby.getNamesystem().getFSImage() long nn2HighestTxId =
.getLastAppliedTxId(); standby.getNamesystem().getFSImage().getLastAppliedTxId();
if (nn2HighestTxId >= activeTxId) { if (nn2HighestTxId >= activeTxId) {
return; return;
} }
Thread.sleep(TestEditLogTailer.SLEEP_TIME); Thread.sleep(TestEditLogTailer.SLEEP_TIME);
} }
throw new CouldNotCatchUpException("Standby did not catch up to txid " + throw new CouldNotCatchUpException(
activeTxId + " (currently at " + "Standby did not catch up to txid " + activeTxId + " (currently at "
standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")"); + standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")");
} }
/** /**
@ -119,7 +119,7 @@ public Boolean get() {
return true; return true;
} }
}, 1000, 10000); }, 1000, 10000);
} }
/** /**
@ -144,16 +144,18 @@ public CouldNotCatchUpException(String message) {
super(message); super(message);
} }
} }
/** Gets the filesystem instance by setting the failover configurations */ /**
* Gets the filesystem instance by setting the failover configurations.
*/
public static DistributedFileSystem configureFailoverFs( public static DistributedFileSystem configureFailoverFs(
MiniDFSCluster cluster, Configuration conf) MiniDFSCluster cluster, Configuration conf)
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
return configureFailoverFs(cluster, conf, 0); return configureFailoverFs(cluster, conf, 0);
} }
/** /**
* Gets the filesystem instance by setting the failover configurations * Gets the filesystem instance by setting the failover configurations.
* @param cluster the single process DFS cluster * @param cluster the single process DFS cluster
* @param conf cluster configuration * @param conf cluster configuration
* @param nsIndex namespace index starting with zero * @param nsIndex namespace index starting with zero
@ -164,13 +166,13 @@ public static DistributedFileSystem configureFailoverFs(
int nsIndex) throws IOException, URISyntaxException { int nsIndex) throws IOException, URISyntaxException {
conf = new Configuration(conf); conf = new Configuration(conf);
String logicalName = getLogicalHostname(cluster); String logicalName = getLogicalHostname(cluster);
setFailoverConfigurations(cluster, conf, logicalName, nsIndex); setFailoverConfigurations(cluster, conf, logicalName, null, nsIndex);
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
return (DistributedFileSystem)fs; return (DistributedFileSystem)fs;
} }
public static <P extends ObserverReadProxyProvider<?>> public static <P extends ObserverReadProxyProvider<?>>
DistributedFileSystem configureObserverReadFs( DistributedFileSystem configureObserverReadFs(
MiniDFSCluster cluster, Configuration conf, MiniDFSCluster cluster, Configuration conf,
Class<P> classFPP, boolean isObserverReadEnabled) Class<P> classFPP, boolean isObserverReadEnabled)
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
@ -246,8 +248,8 @@ public static MiniQJMHACluster setUpObserverCluster(
return qjmhaCluster; return qjmhaCluster;
} }
public static <P extends FailoverProxyProvider<?>> public static <P extends FailoverProxyProvider<?>> void
void setupHAConfiguration(MiniDFSCluster cluster, setupHAConfiguration(MiniDFSCluster cluster,
Configuration conf, int nsIndex, Class<P> classFPP) { Configuration conf, int nsIndex, Class<P> classFPP) {
MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex); MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
List<String> nnAddresses = new ArrayList<String>(); List<String> nnAddresses = new ArrayList<String>();
@ -264,18 +266,23 @@ public static void setFailoverConfigurations(MiniDFSCluster cluster,
Configuration conf) { Configuration conf) {
setFailoverConfigurations(cluster, conf, getLogicalHostname(cluster)); setFailoverConfigurations(cluster, conf, getLogicalHostname(cluster));
} }
/** Sets the required configurations for performing failover of default namespace. */ /** Sets the required configurations for performing failover of default namespace. */
public static void setFailoverConfigurations(MiniDFSCluster cluster, public static void setFailoverConfigurations(MiniDFSCluster cluster,
Configuration conf, String logicalName) { Configuration conf, String logicalName) {
setFailoverConfigurations(cluster, conf, logicalName, 0); setFailoverConfigurations(cluster, conf, logicalName, null, 0);
} }
/** Sets the required configurations for performing failover. */ /** Sets the required configurations for performing failover. */
public static void setFailoverConfigurations(MiniDFSCluster cluster, public static void setFailoverConfigurations(MiniDFSCluster cluster,
Configuration conf, String logicalName, int nsIndex) { Configuration conf, String logicalName, String proxyProvider,
setFailoverConfigurations(cluster, conf, logicalName, nsIndex, int nsIndex) {
ConfiguredFailoverProxyProvider.class); MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
List<InetSocketAddress> nnAddresses = new ArrayList<InetSocketAddress>(3);
for (MiniDFSCluster.NameNodeInfo nn : nns) {
nnAddresses.add(nn.nameNode.getNameNodeAddress());
}
setFailoverConfigurations(conf, logicalName, proxyProvider, nnAddresses);
} }
/** Sets the required configurations for performing failover. */ /** Sets the required configurations for performing failover. */
@ -290,19 +297,56 @@ public static void setFailoverConfigurations(MiniDFSCluster cluster,
setFailoverConfigurations(conf, logicalName, nnAddresses, classFPP); setFailoverConfigurations(conf, logicalName, nnAddresses, classFPP);
} }
public static void setFailoverConfigurations(Configuration conf, String logicalName, public static void setFailoverConfigurations(Configuration conf,
InetSocketAddress ... nnAddresses){ String logicalName, String proxyProvider,
setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses), InetSocketAddress... nnAddresses) {
ConfiguredFailoverProxyProvider.class); setFailoverConfigurations(conf, logicalName, proxyProvider,
Arrays.asList(nnAddresses));
} }
/** /**
* Sets the required configurations for performing failover * Sets the required configurations for performing failover.
*/
public static void setFailoverConfigurations(
Configuration conf, String logicalName,
String proxyProvider, List<InetSocketAddress> nnAddresses) {
final List<String> addresses = new ArrayList<>();
nnAddresses.forEach(addr ->
addresses.add("hdfs://" + addr.getHostName() + ":" + addr.getPort()));
setFailoverConfigurations(conf, logicalName, proxyProvider, addresses);
}
public static void setFailoverConfigurations(
Configuration conf, String logicalName,
String proxyProvider, Iterable<String> nnAddresses) {
List<String> nnids = new ArrayList<String>();
int i = 0;
for (String address : nnAddresses) {
String nnId = "nn" + (i + 1);
nnids.add(nnId);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, nnId), address);
i++;
}
conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName),
Joiner.on(',').join(nnids));
if (proxyProvider == null) {
conf.set(Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
ConfiguredFailoverProxyProvider.class.getName());
} else {
conf.set(Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
proxyProvider);
}
conf.set("fs.defaultFS", "hdfs://" + logicalName);
}
/**
* Sets the required configurations for performing failover.
*/ */
public static <P extends FailoverProxyProvider<?>> void public static <P extends FailoverProxyProvider<?>> void
setFailoverConfigurations(Configuration conf, String logicalName, setFailoverConfigurations(Configuration conf, String logicalName,
List<InetSocketAddress> nnAddresses, Class<P> classFPP) { List<InetSocketAddress> nnAddresses, Class<P> classFPP) {
final List<String> addresses = new ArrayList(); final List<String> addresses = new ArrayList<>();
nnAddresses.forEach( nnAddresses.forEach(
addr -> addresses.add( addr -> addresses.add(
"hdfs://" + addr.getHostName() + ":" + addr.getPort())); "hdfs://" + addr.getHostName() + ":" + addr.getPort()));
@ -310,7 +354,7 @@ public static void setFailoverConfigurations(Configuration conf, String logicalN
} }
public static <P extends FailoverProxyProvider<?>> public static <P extends FailoverProxyProvider<?>>
void setFailoverConfigurations( void setFailoverConfigurations(
Configuration conf, String logicalName, Configuration conf, String logicalName,
Iterable<String> nnAddresses, Class<P> classFPP) { Iterable<String> nnAddresses, Class<P> classFPP) {
List<String> nnids = new ArrayList<String>(); List<String> nnids = new ArrayList<String>();
@ -332,13 +376,13 @@ void setFailoverConfigurations(
public static String getLogicalHostname(MiniDFSCluster cluster) { public static String getLogicalHostname(MiniDFSCluster cluster) {
return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId()); return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId());
} }
public static URI getLogicalUri(MiniDFSCluster cluster) public static URI getLogicalUri(MiniDFSCluster cluster)
throws URISyntaxException { throws URISyntaxException {
return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" +
getLogicalHostname(cluster)); getLogicalHostname(cluster));
} }
public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx, public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx,
List<Integer> txids) throws InterruptedException { List<Integer> txids) throws InterruptedException {
long start = Time.now(); long start = Time.now();

View File

@ -95,7 +95,7 @@ public void setupCluster() throws Exception {
cluster.waitActive(); cluster.waitActive();
String logicalName = HATestUtil.getLogicalHostname(cluster); String logicalName = HATestUtil.getLogicalHostname(cluster);
HATestUtil.setFailoverConfigurations(cluster, conf, logicalName, 0); HATestUtil.setFailoverConfigurations(cluster, conf, logicalName, null, 0);
nn0 = cluster.getNameNode(0); nn0 = cluster.getNameNode(0);
nn1 = cluster.getNameNode(1); nn1 = cluster.getNameNode(1);

View File

@ -17,11 +17,14 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode.ha; package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.slf4j.event.Level;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -32,15 +35,30 @@
import org.apache.hadoop.hdfs.tools.DFSck; import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.slf4j.event.Level;
import org.junit.Test; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestHAFsck { public class TestHAFsck {
static { static {
GenericTestUtils.setLogLevel(DFSUtil.LOG, Level.TRACE); GenericTestUtils.setLogLevel(DFSUtil.LOG, Level.TRACE);
} }
@Parameter
private String proxyProvider;
public String getProxyProvider() {
return proxyProvider;
}
@Parameterized.Parameters(name = "ProxyProvider: {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][]
{{ConfiguredFailoverProxyProvider.class.getName()},
{RequestHedgingProxyProvider.class.getName()}});
}
/** /**
* Test that fsck still works with HA enabled. * Test that fsck still works with HA enabled.
*/ */
@ -65,9 +83,9 @@ public void testHaFsck() throws Exception {
cluster.transitionToActive(0); cluster.transitionToActive(0);
// Make sure conf has the relevant HA configs. // Make sure conf has the relevant HA configs.
HATestUtil.setFailoverConfigurations(cluster, conf, "ha-nn-uri-0", 0); HATestUtil.setFailoverConfigurations(cluster, conf, "ha-nn-uri-0", getProxyProvider(), 0);
fs = HATestUtil.configureFailoverFs(cluster, conf); fs = FileSystem.get(conf);
fs.mkdirs(new Path("/test1")); fs.mkdirs(new Path("/test1"));
fs.mkdirs(new Path("/test2")); fs.mkdirs(new Path("/test2"));