HADOOP-15684. triggerActiveLogRoll stuck on dead name node, when ConnectTimeoutException happens. Contributed by Rong Tang.
parent 236d16e3a5
commit 090272d7de
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java

@@ -53,8 +53,6 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.SecurityUtil;

 import static org.apache.hadoop.util.Time.monotonicNow;

@@ -382,15 +380,6 @@ void triggerActiveLogRoll() {
       future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
       lastRollTriggerTxId = lastLoadedTxnId;
     } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof RemoteException) {
-        IOException ioe = ((RemoteException) cause).unwrapRemoteException();
-        if (ioe instanceof StandbyException) {
-          LOG.info("Skipping log roll. Remote node is not in Active state: " +
-              ioe.getMessage().split("\n")[0]);
-          return;
-        }
-      }
       LOG.warn("Unable to trigger a roll of the active NN", e);
     } catch (TimeoutException e) {
       if (future != null) {

@@ -497,7 +486,8 @@ private void doWork() {
    * This mechanism is <b>very bad</b> for cases where we care about being <i>fast</i>; it just
    * blindly goes and tries namenodes.
    */
-  private abstract class MultipleNameNodeProxy<T> implements Callable<T> {
+  @VisibleForTesting
+  abstract class MultipleNameNodeProxy<T> implements Callable<T> {

     /**
      * Do the actual work to the remote namenode via the {@link #cachedActiveProxy}.

@@ -513,21 +503,15 @@ public T call() throws IOException {
       try {
         T ret = doWork();
         return ret;
-      } catch (RemoteException e) {
-        Throwable cause = e.unwrapRemoteException(StandbyException.class);
-        // if its not a standby exception, then we need to re-throw it, something bad has happened
-        if (cause == e) {
-          throw e;
-        } else {
-          // it is a standby exception, so we try the other NN
-          LOG.warn("Failed to reach remote node: " + currentNN
-              + ", retrying with remaining remote NNs");
-          cachedActiveProxy = null;
-          // this NN isn't responding to requests, try the next one
-          nnLoopCount++;
-        }
+      } catch (IOException e) {
+        LOG.warn("Exception from remote name node " + currentNN
+            + ", try next.", e);
+
+        // Try next name node if exception happens.
+        cachedActiveProxy = null;
+        nnLoopCount++;
       }
     }
     throw new IOException("Cannot find any valid remote NN to service request!");
   }
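In effect, this change widens the failover condition in MultipleNameNodeProxy.call(): before the patch, only a RemoteException that unwrapped to a StandbyException sent the tailer on to the next namenode, so a ConnectTimeoutException from a dead namenode never triggered failover and triggerActiveLogRoll stayed stuck on that node. Now any IOException advances the round-robin until the retry budget runs out. The following is a minimal, self-contained sketch of that pattern, not the actual EditLogTailer code; NamenodeClient, proxies, maxRetries and rollOnAnyActive are hypothetical stand-ins for illustration only.

import java.io.IOException;
import java.util.List;

// Illustrative sketch of the retry pattern introduced by this change; the
// types and names here are hypothetical, not Hadoop APIs.
class LogRollFailoverSketch {

  interface NamenodeClient {
    // May throw StandbyException, ConnectTimeoutException, etc., all of
    // which are IOExceptions.
    void rollEditLog() throws IOException;
  }

  static void rollOnAnyActive(List<NamenodeClient> proxies, int maxRetries)
      throws IOException {
    // Round-robin over every candidate namenode, up to maxRetries full passes.
    for (int attempt = 0; attempt < maxRetries * proxies.size(); attempt++) {
      NamenodeClient candidate = proxies.get(attempt % proxies.size());
      try {
        candidate.rollEditLog();
        return; // reached the active namenode, done
      } catch (IOException e) {
        // Old behavior: only a StandbyException fell through to the next NN.
        // New behavior: any IOException (e.g. a connect timeout against a
        // dead NN) moves on to the next candidate.
      }
    }
    throw new IOException("Cannot find any valid remote NN to service request!");
  }
}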
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java

@@ -72,6 +72,23 @@ public static MiniDFSNNTopology simpleHATopology(int nnCount) {
     return topology;
   }

+  /**
+   * Set up an HA topology with a single HA nameservice.
+   * @param nnCount of namenodes to use with the nameservice
+   * @param basePort for IPC and Http ports of namenodes.
+   */
+  public static MiniDFSNNTopology simpleHATopology(int nnCount, int basePort) {
+    MiniDFSNNTopology.NSConf ns = new MiniDFSNNTopology.NSConf("minidfs-ns");
+    for (int i = 0; i < nnCount; i++) {
+      ns.addNN(new MiniDFSNNTopology.NNConf("nn" + i)
+          .setIpcPort(basePort++)
+          .setHttpPort(basePort++));
+    }
+    MiniDFSNNTopology topology = new MiniDFSNNTopology()
+        .addNameservice(ns);
+    return topology;
+  }
+
   /**
    * Set up federated cluster with the given number of nameservices, each
    * of which has only a single NameNode.
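For reference, a rough usage sketch of the new overload, mirroring the createMiniDFSCluster helper added to TestEditLogTailer below; the base port 10060 is only an example value and conf is assumed to be an existing Configuration:

// Build a 3-NN HA topology whose namenodes get consecutive IPC/HTTP ports
// starting at the given base port, then start a MiniDFSCluster on it.
MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology(3, 10060);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
    .nnTopology(topology)
    .numDataNodes(0)
    .build();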
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java

@@ -28,6 +28,7 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -46,7 +47,6 @@
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.slf4j.event.Level;
 import org.junit.Test;

@@ -177,21 +177,7 @@ private static void testStandbyTriggersLogRolls(int activeIndex)
     MiniDFSCluster cluster = null;
     for (int i = 0; i < 5; i++) {
       try {
-        // Have to specify IPC ports so the NNs can talk to each other.
-        int[] ports = ServerSocketUtil.getPorts(3);
-        MiniDFSNNTopology topology = new MiniDFSNNTopology()
-            .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
-                .addNN(new MiniDFSNNTopology.NNConf("nn1")
-                    .setIpcPort(ports[0]))
-                .addNN(new MiniDFSNNTopology.NNConf("nn2")
-                    .setIpcPort(ports[1]))
-                .addNN(new MiniDFSNNTopology.NNConf("nn3")
-                    .setIpcPort(ports[2])));
-
-        cluster = new MiniDFSCluster.Builder(conf)
-            .nnTopology(topology)
-            .numDataNodes(0)
-            .build();
+        cluster = createMiniDFSCluster(conf, 3);
         break;
       } catch (BindException e) {
         // retry if race on ports given by ServerSocketUtil#getPorts

@@ -222,21 +208,9 @@ public void testTriggersLogRollsForAllStandbyNN() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);

-    // Have to specify IPC ports so the NNs can talk to each other.
-    MiniDFSNNTopology topology = new MiniDFSNNTopology()
-        .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
-            .addNN(new MiniDFSNNTopology.NNConf("nn1")
-                .setIpcPort(ServerSocketUtil.getPort(0, 100)))
-            .addNN(new MiniDFSNNTopology.NNConf("nn2")
-                .setIpcPort(ServerSocketUtil.getPort(0, 100)))
-            .addNN(new MiniDFSNNTopology.NNConf("nn3")
-                .setIpcPort(ServerSocketUtil.getPort(0, 100))));
-
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(topology)
-        .numDataNodes(0)
-        .build();
+    MiniDFSCluster cluster = null;
     try {
+      cluster = createMiniDFSCluster(conf, 3);
       cluster.transitionToStandby(0);
       cluster.transitionToStandby(1);
       cluster.transitionToStandby(2);

@@ -249,9 +223,11 @@ public void testTriggersLogRollsForAllStandbyNN() throws Exception {
       cluster.transitionToActive(0);
       waitForLogRollInSharedDir(cluster, 3);
     } finally {
-      cluster.shutdown();
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }

   private static String getDirPath(int suffix) {
     return DIR_PREFIX + suffix;

@@ -316,4 +292,64 @@ public Void call() throws Exception {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testRollEditLogIOExceptionForRemoteNN() throws IOException {
+    Configuration conf = getConf();
+
+    // Roll every 1s
+    conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = createMiniDFSCluster(conf, 3);
+      cluster.transitionToActive(0);
+      EditLogTailer tailer = Mockito.spy(
+          cluster.getNamesystem(1).getEditLogTailer());
+
+      final AtomicInteger invokedTimes = new AtomicInteger(0);
+
+      // It should go on to next name node when IOException happens.
+      when(tailer.getNameNodeProxy()).thenReturn(
+          tailer.new MultipleNameNodeProxy<Void>() {
+            @Override
+            protected Void doWork() throws IOException {
+              invokedTimes.getAndIncrement();
+              throw new IOException("It is an IO Exception.");
+            }
+          }
+      );
+
+      tailer.triggerActiveLogRoll();
+
+      // MultipleNameNodeProxy uses Round-robin to look for active NN
+      // to do RollEditLog. If doWork() fails, then IOException throws,
+      // it continues to try next NN. triggerActiveLogRoll finishes
+      // either due to success, or using up retries.
+      // In this test case, there are 2 remote name nodes, default retry is 3.
+      // For test purpose, doWork() always returns IOException,
+      // so the total invoked times will be default retry 3 * remote NNs 2 = 6
+      assertEquals(6, invokedTimes.get());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
+      int nnCount) throws IOException {
+    int basePort = 10060 + new Random().nextInt(100) * 2;
+
+    // By passing in basePort, name node will have IPC port set,
+    // which is needed for enabling roll log.
+    MiniDFSNNTopology topology =
+        MiniDFSNNTopology.simpleHATopology(nnCount, basePort);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(topology)
+        .numDataNodes(0)
+        .build();
+    return cluster;
+  }
 }
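The asserted value of 6 in the new test is the retry budget: the tailer under test runs on one of the three namenodes, leaving two remote peers, and since every mocked doWork() call fails, the default of three retries gives 3 * 2 = 6 invocations. A minimal sketch of that arithmetic, assuming the default retry count of 3 (tests that need a larger budget raise it via DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, as testTriggersLogRollsForAllStandbyNN does above):

// Retry-budget arithmetic behind assertEquals(6, invokedTimes.get()).
// Assumes the default retry count of 3; adjust if the cluster size or the
// DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY setting differs.
int nnCount = 3;                       // namenodes in the mini cluster
int remotePeers = nnCount - 1;         // the tailer never rolls against itself
int retries = 3;                       // default retry count (assumed)
int expectedInvocations = retries * remotePeers;  // 3 * 2 = 6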