HDFS-2499. RPC client is created incorrectly introduced in HDFS-2459. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1190620 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2011-10-28 22:06:04 +00:00
parent 55d3dc50d1
commit c0c938f242

View File

@ -20,26 +20,14 @@
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
/** /**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
@ -50,54 +38,17 @@
@InterfaceStability.Stable @InterfaceStability.Stable
public class JournalProtocolTranslatorR23 implements public class JournalProtocolTranslatorR23 implements
JournalProtocol, Closeable { JournalProtocol, Closeable {
final private JournalWireProtocol rpcProxyWithoutRetry; private final JournalWireProtocol rpcProxy;
final private JournalWireProtocol rpcProxy;
private static JournalWireProtocol createNamenode(
InetSocketAddress nameNodeAddr, Configuration conf,
UserGroupInformation ugi) throws IOException {
return RPC.getProxy(JournalWireProtocol.class,
JournalWireProtocol.versionID, nameNodeAddr, ugi, conf,
NetUtils.getSocketFactory(conf, JournalWireProtocol.class));
}
/** Create a {@link NameNode} proxy */
static JournalWireProtocol createNamenodeWithRetry(
JournalWireProtocol rpcNamenode) {
RetryPolicy createPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(5,
HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
createPolicy);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
.retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
return (JournalWireProtocol) RetryProxy.create(
JournalWireProtocol.class, rpcNamenode, methodNameToPolicyMap);
}
public JournalProtocolTranslatorR23(InetSocketAddress nameNodeAddr, public JournalProtocolTranslatorR23(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException { Configuration conf) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); rpcProxy = RPC.getProxy(JournalWireProtocol.class,
rpcProxyWithoutRetry = createNamenode(nameNodeAddr, conf, ugi); JournalWireProtocol.versionID, nameNodeAddr, conf);
rpcProxy = createNamenodeWithRetry(rpcProxyWithoutRetry);
} }
@Override @Override
public void close() { public void close() {
RPC.stopProxy(rpcProxyWithoutRetry); RPC.stopProxy(rpcProxy);
} }
@Override @Override