HDFS-4404. Create file failure when the machine of first attempted NameNode is down. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1442461 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5d679c4f43
commit
5a0b746639
@ -35,6 +35,7 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
@ -543,6 +544,7 @@ public RetryAction shouldRetry(Exception e, int retries,
|
||||
e instanceof NoRouteToHostException ||
|
||||
e instanceof UnknownHostException ||
|
||||
e instanceof StandbyException ||
|
||||
e instanceof ConnectTimeoutException ||
|
||||
isWrappedStandbyException(e)) {
|
||||
return new RetryAction(
|
||||
RetryAction.RetryDecision.FAILOVER_AND_RETRY,
|
||||
|
@ -67,6 +67,7 @@
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
|
||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
import org.apache.hadoop.security.SaslRpcClient;
|
||||
@ -511,14 +512,14 @@ private synchronized void setupConnection() throws IOException {
|
||||
}
|
||||
this.socket.setSoTimeout(pingInterval);
|
||||
return;
|
||||
} catch (SocketTimeoutException toe) {
|
||||
} catch (ConnectTimeoutException toe) {
|
||||
/* Check for an address change and update the local reference.
|
||||
* Reset the failure counter if the address was changed
|
||||
*/
|
||||
if (updateAddress()) {
|
||||
timeoutFailures = ioFailures = 0;
|
||||
}
|
||||
handleConnectionFailure(timeoutFailures++,
|
||||
handleConnectionTimeout(timeoutFailures++,
|
||||
maxRetriesOnSocketTimeouts, toe);
|
||||
} catch (IOException ie) {
|
||||
if (updateAddress()) {
|
||||
@ -680,7 +681,7 @@ private void closeConnection() {
|
||||
socket = null;
|
||||
}
|
||||
|
||||
/* Handle connection failures
|
||||
/* Handle connection failures due to timeout on connect
|
||||
*
|
||||
* If the current number of retries is equal to the max number of retries,
|
||||
* stop retrying and throw the exception; Otherwise backoff 1 second and
|
||||
@ -694,7 +695,7 @@ private void closeConnection() {
|
||||
* @param ioe failure reason
|
||||
* @throws IOException if max number of retries is reached
|
||||
*/
|
||||
private void handleConnectionFailure(
|
||||
private void handleConnectionTimeout(
|
||||
int curRetries, int maxRetries, IOException ioe) throws IOException {
|
||||
|
||||
closeConnection();
|
||||
|
@ -0,0 +1,37 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.net;
|
||||
|
||||
import java.net.SocketTimeoutException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Thrown by {@link NetUtils#connect(java.net.Socket, java.net.SocketAddress, int)}
|
||||
* if it times out while connecting to the remote host.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public class ConnectTimeoutException extends SocketTimeoutException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public ConnectTimeoutException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
@ -20,6 +20,7 @@
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.net.BindException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -517,11 +518,15 @@ public static void connect(Socket socket,
|
||||
socket.bind(localAddr);
|
||||
}
|
||||
|
||||
if (ch == null) {
|
||||
// let the default implementation handle it.
|
||||
socket.connect(endpoint, timeout);
|
||||
} else {
|
||||
SocketIOWithTimeout.connect(ch, endpoint, timeout);
|
||||
try {
|
||||
if (ch == null) {
|
||||
// let the default implementation handle it.
|
||||
socket.connect(endpoint, timeout);
|
||||
} else {
|
||||
SocketIOWithTimeout.connect(ch, endpoint, timeout);
|
||||
}
|
||||
} catch (SocketTimeoutException ste) {
|
||||
throw new ConnectTimeoutException(ste.getMessage());
|
||||
}
|
||||
|
||||
// There is a very rare case allowed by the TCP specification, such that
|
||||
@ -719,7 +724,7 @@ public static IOException wrapException(final String destHost,
|
||||
+ see("BindException"));
|
||||
} else if (exception instanceof ConnectException) {
|
||||
// connection refused; include the host:port in the error
|
||||
return (ConnectException) new ConnectException(
|
||||
return wrapWithMessage(exception,
|
||||
"Call From "
|
||||
+ localHost
|
||||
+ " to "
|
||||
@ -729,32 +734,28 @@ public static IOException wrapException(final String destHost,
|
||||
+ " failed on connection exception: "
|
||||
+ exception
|
||||
+ ";"
|
||||
+ see("ConnectionRefused"))
|
||||
.initCause(exception);
|
||||
+ see("ConnectionRefused"));
|
||||
} else if (exception instanceof UnknownHostException) {
|
||||
return (UnknownHostException) new UnknownHostException(
|
||||
return wrapWithMessage(exception,
|
||||
"Invalid host name: "
|
||||
+ getHostDetailsAsString(destHost, destPort, localHost)
|
||||
+ exception
|
||||
+ ";"
|
||||
+ see("UnknownHost"))
|
||||
.initCause(exception);
|
||||
+ see("UnknownHost"));
|
||||
} else if (exception instanceof SocketTimeoutException) {
|
||||
return (SocketTimeoutException) new SocketTimeoutException(
|
||||
return wrapWithMessage(exception,
|
||||
"Call From "
|
||||
+ localHost + " to " + destHost + ":" + destPort
|
||||
+ " failed on socket timeout exception: " + exception
|
||||
+ ";"
|
||||
+ see("SocketTimeout"))
|
||||
.initCause(exception);
|
||||
+ see("SocketTimeout"));
|
||||
} else if (exception instanceof NoRouteToHostException) {
|
||||
return (NoRouteToHostException) new NoRouteToHostException(
|
||||
return wrapWithMessage(exception,
|
||||
"No Route to Host from "
|
||||
+ localHost + " to " + destHost + ":" + destPort
|
||||
+ " failed on socket timeout exception: " + exception
|
||||
+ ";"
|
||||
+ see("NoRouteToHost"))
|
||||
.initCause(exception);
|
||||
+ see("NoRouteToHost"));
|
||||
}
|
||||
else {
|
||||
return (IOException) new IOException("Failed on local exception: "
|
||||
@ -769,6 +770,21 @@ public static IOException wrapException(final String destHost,
|
||||
private static String see(final String entry) {
|
||||
return FOR_MORE_DETAILS_SEE + HADOOP_WIKI + entry;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T extends IOException> T wrapWithMessage(
|
||||
T exception, String msg) {
|
||||
Class<? extends Throwable> clazz = exception.getClass();
|
||||
try {
|
||||
Constructor<? extends Throwable> ctor = clazz.getConstructor(String.class);
|
||||
Throwable t = ctor.newInstance(msg);
|
||||
return (T)(t.initCause(exception));
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Unable to wrap exception of type " +
|
||||
clazz + ": it has no (String) constructor", e);
|
||||
return exception;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the host details as a string
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
||||
import java.util.Random;
|
||||
@ -586,7 +587,7 @@ public void testConnectionRetriesOnSocketTimeoutExceptions() throws Exception {
|
||||
private void assertRetriesOnSocketTimeouts(Configuration conf,
|
||||
int maxTimeoutRetries) throws IOException, InterruptedException {
|
||||
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
|
||||
doThrow(new SocketTimeoutException()).when(mockFactory).createSocket();
|
||||
doThrow(new ConnectTimeoutException("fake")).when(mockFactory).createSocket();
|
||||
Client client = new Client(IntWritable.class, conf, mockFactory);
|
||||
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090);
|
||||
try {
|
||||
|
@ -756,6 +756,9 @@ Release 2.0.3-alpha - Unreleased
|
||||
HDFS-4462. 2NN will fail to checkpoint after an HDFS upgrade from a
|
||||
pre-federation version of HDFS. (atm)
|
||||
|
||||
HDFS-4404. Create file failure when the machine of first attempted NameNode
|
||||
is down. (Todd Lipcon via atm)
|
||||
|
||||
BREAKDOWN OF HDFS-3077 SUBTASKS
|
||||
|
||||
HDFS-3077. Quorum-based protocol for reading and writing edit logs.
|
||||
|
@ -23,22 +23,34 @@
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||
import org.apache.hadoop.net.StandardSocketFactory;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.hamcrest.BaseMatcher;
|
||||
import org.hamcrest.Description;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestDFSClientFailover {
|
||||
|
||||
@ -91,6 +103,63 @@ public void testDfsClientFailover() throws IOException, URISyntaxException {
|
||||
fs.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that even a non-idempotent method will properly fail-over if the
|
||||
* first IPC attempt times out trying to connect. Regression test for
|
||||
* HDFS-4404.
|
||||
*/
|
||||
@Test
|
||||
public void testFailoverOnConnectTimeout() throws Exception {
|
||||
conf.setClass(CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
|
||||
InjectingSocketFactory.class, SocketFactory.class);
|
||||
// Set up the InjectingSocketFactory to throw a ConnectTimeoutException
|
||||
// when connecting to the first NN.
|
||||
InjectingSocketFactory.portToInjectOn = cluster.getNameNodePort(0);
|
||||
|
||||
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||
|
||||
// Make the second NN the active one.
|
||||
cluster.shutdownNameNode(0);
|
||||
cluster.transitionToActive(1);
|
||||
|
||||
// Call a non-idempotent method, and ensure the failover of the call proceeds
|
||||
// successfully.
|
||||
IOUtils.closeStream(fs.create(TEST_FILE));
|
||||
}
|
||||
|
||||
private static class InjectingSocketFactory extends StandardSocketFactory {
|
||||
|
||||
static SocketFactory defaultFactory = SocketFactory.getDefault();
|
||||
|
||||
static int portToInjectOn;
|
||||
|
||||
@Override
|
||||
public Socket createSocket() throws IOException {
|
||||
Socket spy = Mockito.spy(defaultFactory.createSocket());
|
||||
// Simplify our spying job by not having to also spy on the channel
|
||||
Mockito.doReturn(null).when(spy).getChannel();
|
||||
// Throw a ConnectTimeoutException when connecting to our target "bad"
|
||||
// host.
|
||||
Mockito.doThrow(new ConnectTimeoutException("injected"))
|
||||
.when(spy).connect(
|
||||
Mockito.argThat(new MatchesPort()),
|
||||
Mockito.anyInt());
|
||||
return spy;
|
||||
}
|
||||
|
||||
private class MatchesPort extends BaseMatcher<SocketAddress> {
|
||||
@Override
|
||||
public boolean matches(Object arg0) {
|
||||
return ((InetSocketAddress)arg0).getPort() == portToInjectOn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description desc) {
|
||||
desc.appendText("matches port " + portToInjectOn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Regression test for HDFS-2683.
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user