HDFS-3871. Change NameNodeProxies to use RetryUtils. Contributed by Arun C Murthy
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1379743 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4911d9b1ff
commit
50222ff529
@ -434,6 +434,9 @@ Branch-2 ( Unreleased changes )
|
||||
HDFS-3177. Update DFSClient and DataXceiver to handle different checkum
|
||||
types in file checksum computation. (Kihwal Lee via szetszwo)
|
||||
|
||||
HDFS-3871. Change NameNodeProxies to use RetryUtils. (Arun C Murthy
|
||||
via szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-2982. Startup performance suffers when there are many edit log
|
||||
|
@ -57,6 +57,7 @@
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryProxy;
|
||||
import org.apache.hadoop.io.retry.RetryUtils;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
@ -68,7 +69,6 @@
|
||||
import org.apache.hadoop.tools.GetUserMappingsProtocol;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
/**
|
||||
* Create proxy objects to communicate with a remote NN. All remote access to an
|
||||
@ -243,106 +243,20 @@ private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
|
||||
return new NamenodeProtocolTranslatorPB(proxy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the default retry policy used in RPC.
|
||||
*
|
||||
* If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
|
||||
*
|
||||
* Otherwise, first unwrap ServiceException if possible, and then
|
||||
* (1) use multipleLinearRandomRetry for
|
||||
* - SafeModeException, or
|
||||
* - IOException other than RemoteException, or
|
||||
* - ServiceException; and
|
||||
* (2) use TRY_ONCE_THEN_FAIL for
|
||||
* - non-SafeMode RemoteException, or
|
||||
* - non-IOException.
|
||||
*
|
||||
* Note that dfs.client.retry.max < 0 is not allowed.
|
||||
*/
|
||||
public static RetryPolicy getDefaultRetryPolicy(Configuration conf) {
|
||||
final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
|
||||
}
|
||||
if (multipleLinearRandomRetry == null) {
|
||||
//no retry
|
||||
return RetryPolicies.TRY_ONCE_THEN_FAIL;
|
||||
} else {
|
||||
return new RetryPolicy() {
|
||||
@Override
|
||||
public RetryAction shouldRetry(Exception e, int retries, int failovers,
|
||||
boolean isMethodIdempotent) throws Exception {
|
||||
if (e instanceof ServiceException) {
|
||||
//unwrap ServiceException
|
||||
final Throwable cause = e.getCause();
|
||||
if (cause != null && cause instanceof Exception) {
|
||||
e = (Exception)cause;
|
||||
}
|
||||
}
|
||||
|
||||
//see (1) and (2) in the javadoc of this method.
|
||||
final RetryPolicy p;
|
||||
if (e instanceof RemoteException) {
|
||||
final RemoteException re = (RemoteException)e;
|
||||
p = SafeModeException.class.getName().equals(re.getClassName())?
|
||||
multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
|
||||
} else if (e instanceof IOException || e instanceof ServiceException) {
|
||||
p = multipleLinearRandomRetry;
|
||||
} else { //non-IOException
|
||||
p = RetryPolicies.TRY_ONCE_THEN_FAIL;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("RETRY " + retries + ") policy="
|
||||
+ p.getClass().getSimpleName() + ", exception=" + e);
|
||||
}
|
||||
LOG.info("RETRY " + retries + ") policy="
|
||||
+ p.getClass().getSimpleName() + ", exception=" + e);
|
||||
return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RetryPolicy[" + multipleLinearRandomRetry + ", "
|
||||
+ RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
|
||||
+ "]";
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the MultipleLinearRandomRetry policy specified in the conf,
|
||||
* or null if the feature is disabled.
|
||||
* If the policy is specified in the conf but the policy cannot be parsed,
|
||||
* the default policy is returned.
|
||||
*
|
||||
* Conf property: N pairs of sleep-time and number-of-retries
|
||||
* dfs.client.retry.policy = "s1,n1,s2,n2,..."
|
||||
*/
|
||||
private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
|
||||
final boolean enabled = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
|
||||
if (!enabled) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final String policy = conf.get(
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
||||
|
||||
final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
|
||||
return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
||||
}
|
||||
|
||||
private static ClientProtocol createNNProxyWithClientProtocol(
|
||||
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
||||
boolean withRetries) throws IOException {
|
||||
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
|
||||
|
||||
final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf);
|
||||
final RetryPolicy defaultPolicy =
|
||||
RetryUtils.getDefaultRetryPolicy(
|
||||
conf,
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
|
||||
SafeModeException.class);
|
||||
|
||||
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
|
||||
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
|
||||
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
|
||||
|
@ -54,7 +54,6 @@
|
||||
import org.apache.hadoop.hdfs.ByteRangeInputStream;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||
@ -89,6 +88,7 @@
|
||||
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
@ -181,7 +181,14 @@ public synchronized void initialize(URI uri, Configuration conf
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
|
||||
this.retryPolicy = NameNodeProxies.getDefaultRetryPolicy(conf);
|
||||
this.retryPolicy =
|
||||
RetryUtils.getDefaultRetryPolicy(
|
||||
conf,
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
|
||||
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
|
||||
SafeModeException.class);
|
||||
this.workingDir = getHomeDirectory();
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
|
Loading…
Reference in New Issue
Block a user