HDFS-5399. Revisit SafeModeException and corresponding retry policies. Contributed by Haohui Mai.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1564629 13f79535-47bb-0310-9956-ffa450edef68
commit 0aa09f6d5a
parent 96578f0e01
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -151,6 +151,13 @@ public static final RetryPolicy failoverOnNetworkException(
         delayMillis, maxDelayBase);
   }
 
+  public static final RetryPolicy failoverOnNetworkException(
+      RetryPolicy fallbackPolicy, int maxFailovers, int maxRetries,
+      long delayMillis, long maxDelayBase) {
+    return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers,
+        maxRetries, delayMillis, maxDelayBase);
+  }
+
   static class TryOnceThenFail implements RetryPolicy {
     @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
@@ -516,18 +523,25 @@ static class FailoverOnNetworkExceptionRetry implements RetryPolicy {
 
     private RetryPolicy fallbackPolicy;
     private int maxFailovers;
+    private int maxRetries;
     private long delayMillis;
     private long maxDelayBase;
 
     public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
         int maxFailovers) {
-      this(fallbackPolicy, maxFailovers, 0, 0);
+      this(fallbackPolicy, maxFailovers, 0, 0, 0);
     }
 
     public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
         int maxFailovers, long delayMillis, long maxDelayBase) {
+      this(fallbackPolicy, maxFailovers, 0, delayMillis, maxDelayBase);
+    }
+
+    public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
+        int maxFailovers, int maxRetries, long delayMillis, long maxDelayBase) {
       this.fallbackPolicy = fallbackPolicy;
       this.maxFailovers = maxFailovers;
+      this.maxRetries = maxRetries;
       this.delayMillis = delayMillis;
       this.maxDelayBase = maxDelayBase;
     }
@@ -549,6 +563,10 @@ public RetryAction shouldRetry(Exception e, int retries,
             "failovers (" + failovers + ") exceeded maximum allowed ("
             + maxFailovers + ")");
       }
+      if (retries - failovers > maxRetries) {
+        return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "retries ("
+            + retries + ") exceeded maximum allowed (" + maxRetries + ")");
+      }
 
       if (e instanceof ConnectException ||
           e instanceof NoRouteToHostException ||
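For orientation (not part of the patch): in this policy, `retries` counts every re-attempt and `failovers` counts the subset that switched NameNodes, so the new check reads `retries - failovers` as the number of plain same-node retries. A minimal standalone sketch of that accounting, with invented names and the default caps this patch defines (15 failovers, 10 retries):

// RetryBudgetSketch.java -- illustrative only, not part of the patch.
// Mirrors the accounting in FailoverOnNetworkExceptionRetry.shouldRetry:
// failovers and same-node retries are capped separately.
public class RetryBudgetSketch {
  static final int MAX_FAILOVERS = 15; // dfs.http.client.failover.max.attempts default
  static final int MAX_RETRIES = 10;   // dfs.client.retry.max.attempts default

  static boolean mayAttempt(int retries, int failovers) {
    if (failovers >= MAX_FAILOVERS) {
      return false;              // failover budget exhausted
    }
    // 'retries' counts every re-attempt; subtracting 'failovers'
    // leaves the retries that stayed on the same NameNode.
    return retries - failovers <= MAX_RETRIES;
  }

  public static void main(String[] args) {
    System.out.println(mayAttempt(5, 5));  // true: every attempt failed over
    System.out.println(mayAttempt(16, 5)); // false: 11 same-node retries > 10
  }
}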
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -863,6 +863,9 @@ Release 2.3.0 - UNRELEASED
     HDFS-5842. Cannot create hftp filesystem when using a proxy user ugi and a doAs
     on a secure cluster. (jing9)
 
+    HDFS-5399. Revisit SafeModeException and corresponding retry policies.
+    (Haohui Mai via todd)
+
   BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
 
     HDFS-4985. Add storage type to the protocol and expose it in block report
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -36,6 +36,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
@@ -258,6 +260,7 @@ synchronized void unref(ClientMmapManager mmapManager) {
   public static class Conf {
     final int hdfsTimeout;    // timeout value for a DFS operation.
     final int maxFailoverAttempts;
+    final int maxRetryAttempts;
     final int failoverSleepBaseMillis;
     final int failoverSleepMaxMillis;
     final int maxBlockAcquireFailures;
@@ -303,6 +306,9 @@ public Conf(Configuration conf) {
       maxFailoverAttempts = conf.getInt(
           DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
           DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+      maxRetryAttempts = conf.getInt(
+          DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
+          DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
       failoverSleepBaseMillis = conf.getInt(
           DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
           DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -82,6 +82,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_DEFAULT = 0;
   public static final String DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "dfs.client.failover.connection.retries.on.timeouts";
   public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
+  public static final String DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY = "dfs.client.retry.max.attempts";
+  public static final int DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT = 10;
 
   public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
   public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
@@ -574,6 +576,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,...
   public static final String DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY = "dfs.http.client.failover.max.attempts";
   public static final int DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT = 15;
+  public static final String DFS_HTTP_CLIENT_RETRY_MAX_ATTEMPTS_KEY = "dfs.http.client.retry.max.attempts";
+  public static final int DFS_HTTP_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT = 10;
   public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY = "dfs.http.client.failover.sleep.base.millis";
   public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500;
   public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis";
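A usage sketch for the two new keys (the key strings and their defaults of 10 are the ones defined above; the override values are illustrative):

// RetryConfSketch.java -- illustrative only. Tightens the new retry caps
// below their defaults for a latency-sensitive client.
import org.apache.hadoop.conf.Configuration;

public class RetryConfSketch {
  public static Configuration withTighterRetryCaps() {
    Configuration conf = new Configuration();
    conf.setInt("dfs.client.retry.max.attempts", 3);      // RPC client
    conf.setInt("dfs.http.client.retry.max.attempts", 3); // WebHDFS client
    return conf;
  }
}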
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -24,6 +24,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
|
||||
.createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
|
||||
nameNodeUri);
|
||||
Conf config = new Conf(conf);
|
||||
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
|
||||
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
|
||||
config.maxFailoverAttempts, config.failoverSleepBaseMillis,
|
||||
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
|
||||
RetryPolicies.failoverOnNetworkException(
|
||||
RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
|
||||
config.maxRetryAttempts, config.failoverSleepBaseMillis,
|
||||
config.failoverSleepMaxMillis));
|
||||
|
||||
Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
|
||||
@@ -192,11 +195,14 @@ public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
     int maxFailoverAttempts = config.getInt(
         DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
         DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+    int maxRetryAttempts = config.getInt(
+        DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
+        DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
     InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
             numResponseToDrop, failoverProxyProvider,
             RetryPolicies.failoverOnNetworkException(
-                RetryPolicies.TRY_ONCE_THEN_FAIL,
-                Math.max(numResponseToDrop + 1, maxFailoverAttempts), delay,
+                RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
+                Math.max(numResponseToDrop + 1, maxRetryAttempts), delay,
                 maxCap));
 
     T proxy = (T) Proxy.newProxyInstance(
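One reading of this hunk: the `Math.max(numResponseToDrop + 1, ...)` floor moves from the failover cap to the new retry cap, so the responses the lossy handler deliberately drops are absorbed by same-node retries instead of inflating the failover budget. A hedged sketch of that sizing (helper name invented):

// BudgetSizingSketch.java -- illustrative only. Guarantees the retry cap
// covers every response the lossy handler intends to drop, plus one real
// attempt, matching the Math.max expression above.
public class BudgetSizingSketch {
  static int retryCap(int numResponseToDrop, int maxRetryAttempts) {
    return Math.max(numResponseToDrop + 1, maxRetryAttempts);
  }

  public static void main(String[] args) {
    System.out.println(retryCap(2, 10));  // 10: default cap already suffices
    System.out.println(retryCap(20, 10)); // 21: widened to cover 20 drops
  }
}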
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1161,7 +1161,8 @@ private void checkNameNodeSafeMode(String errorMsg)
     if (isInSafeMode()) {
       SafeModeException se = new SafeModeException(errorMsg, safeMode);
       if (haEnabled && haContext != null
-          && haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
+          && haContext.getState().getServiceState() == HAServiceState.ACTIVE
+          && shouldRetrySafeMode(this.safeMode)) {
         throw new RetriableException(se);
       } else {
         throw se;
@@ -1169,6 +1170,18 @@ private void checkNameNodeSafeMode(String errorMsg)
     }
   }
+
+  /**
+   * We already know that the safemode is on. We will throw a RetriableException
+   * if the safemode is not manual or caused by low resource.
+   */
+  private boolean shouldRetrySafeMode(SafeModeInfo safeMode) {
+    if (safeMode == null) {
+      return false;
+    } else {
+      return !safeMode.isManual() && !safeMode.areResourcesLow();
+    }
+  }
 
   public static Collection<URI> getNamespaceDirs(Configuration conf) {
     return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
   }
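Taken together with the `checkNameNodeSafeMode` hunk above, the client-visible behavior is: safe mode on an active HA NameNode is retried only when it is automatic startup safe mode, while manual and low-resource safe mode still fail immediately. A standalone truth-table sketch (method and flag names invented; the real logic is the two methods above, where a null SafeModeInfo also means "do not retry"):

// SafeModeRetrySketch.java -- illustrative only; mirrors the combined
// decision in checkNameNodeSafeMode and shouldRetrySafeMode.
public class SafeModeRetrySketch {
  static boolean retriable(boolean haActive, boolean manual, boolean resourcesLow) {
    return haActive && !manual && !resourcesLow;
  }

  public static void main(String[] args) {
    System.out.println(retriable(true, false, false));  // true: startup safe mode
    System.out.println(retriable(true, true, false));   // false: admin-requested
    System.out.println(retriable(true, false, true));   // false: low resources
    System.out.println(retriable(false, false, false)); // false: not active HA
  }
}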
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -188,6 +188,9 @@ public synchronized void initialize(URI uri, Configuration conf
     int maxFailoverAttempts = conf.getInt(
         DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
         DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+    int maxRetryAttempts = conf.getInt(
+        DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
+        DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
     int failoverSleepBaseMillis = conf.getInt(
         DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
         DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
@@ -197,7 +200,7 @@ public synchronized void initialize(URI uri, Configuration conf
 
     this.retryPolicy = RetryPolicies
         .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
-            maxFailoverAttempts, failoverSleepBaseMillis,
+            maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis,
             failoverSleepMaxMillis);
   }
 
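Since both the RPC and WebHDFS paths now call the same five-argument overload, the policy can also be constructed directly. A sketch using the http-client defaults from DFSConfigKeys above; the max-delay value and the class name are illustrative, not defined by this patch:

// PolicySketch.java -- illustrative only; exercises the new overload
// added to RetryPolicies in this patch.
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;

public class PolicySketch {
  public static RetryPolicy defaultHttpPolicy() {
    return RetryPolicies.failoverOnNetworkException(
        RetryPolicies.TRY_ONCE_THEN_FAIL,
        15,     // dfs.http.client.failover.max.attempts default
        10,     // dfs.http.client.retry.max.attempts default
        500,    // dfs.http.client.failover.sleep.base.millis default
        15000); // max delay base -- illustrative value, not a default here
  }
}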
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IOUtils;
@@ -65,6 +66,7 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
@@ -124,6 +126,9 @@ public void testClientRetrySafeMode() throws Exception {
     final Path test = new Path("/test");
     // let nn0 enter safemode
     NameNodeAdapter.enterSafeMode(nn0, false);
+    SafeModeInfo safeMode = (SafeModeInfo) Whitebox.getInternalState(
+        nn0.getNamesystem(), "safeMode");
+    Whitebox.setInternalState(safeMode, "extension", Integer.valueOf(30000));
     LOG.info("enter safemode");
     new Thread() {
       @Override