From 2740112bb64e1cc8132a1dc450d9e461c2e4729e Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Sun, 11 Dec 2011 18:53:21 +0000 Subject: [PATCH] HDFS-2647. Used protobuf based RPC for InterDatanodeProtocol, ClientDatanodeProtocol, JournalProtocol, NamenodeProtocol. Contributed by Suresh Srinivas. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1213040 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../java/org/apache/hadoop/hdfs/DFSUtil.java | 33 ++++++-- .../ClientDatanodeProtocolTranslatorPB.java | 78 +++++++++++++++++-- ...atanodeProtocolServerSideTranslatorPB.java | 3 +- .../InterDatanodeProtocolTranslatorPB.java | 13 +++- .../NamenodeProtocolTranslatorPB.java | 18 +++-- .../hadoop/hdfs/protocolPB/PBHelper.java | 26 ++++++- .../server/balancer/NameNodeConnector.java | 20 +++-- .../hadoop/hdfs/server/datanode/DataNode.java | 48 +++++++----- .../hdfs/server/namenode/BackupNode.java | 28 ++++--- .../namenode/EditLogBackupOutputStream.java | 4 +- .../server/namenode/NameNodeHttpServer.java | 4 +- .../server/namenode/NameNodeRpcServer.java | 20 ++++- .../server/namenode/SecondaryNameNode.java | 9 ++- .../apache/hadoop/hdfs/MiniDFSCluster.java | 8 +- .../hadoop/hdfs/TestDFSClientRetries.java | 2 +- .../org/apache/hadoop/hdfs/TestGetBlocks.java | 9 +-- .../security/token/block/TestBlockToken.java | 58 +++++++++----- .../datanode/TestInterDatanodeProtocol.java | 10 +-- 19 files changed, 282 insertions(+), 112 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 85543c12ac..05919c9767 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -29,6 +29,9 @@ Trunk (unreleased changes) HDFS-2642. Protobuf translators for DatanodeProtocol. (jitendra) + HDFS-2647. Used protobuf based RPC for InterDatanodeProtocol, + ClientDatanodeProtocol, JournalProtocol, NamenodeProtocol. (suresh) + IMPROVEMENTS HADOOP-7524 Change RPC to allow multiple protocols including multuple diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 1870369df8..343f4f36b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -50,12 +50,17 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.UserGroupInformation; +import com.google.protobuf.BlockingService; + @InterfaceAudience.Private public class DFSUtil { private static final ThreadLocal RANDOM = new ThreadLocal() { @@ -573,12 +578,12 @@ public static void setGenericConf(Configuration conf, /** Return used as percentage of capacity */ public static float getPercentUsed(long used, long capacity) { - return capacity <= 0 ? 100 : ((float)used * 100.0f)/(float)capacity; + return capacity <= 0 ? 
100 : (used * 100.0f)/capacity;
   }
 
   /** Return remaining as percentage of capacity */
   public static float getPercentRemaining(long remaining, long capacity) {
-    return capacity <= 0 ? 0 : ((float)remaining * 100.0f)/(float)capacity;
+    return capacity <= 0 ? 0 : (remaining * 100.0f)/capacity;
   }
 
   /**
@@ -634,8 +639,7 @@ public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
   public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
       DatanodeID datanodeid, Configuration conf, int socketTimeout,
       LocatedBlock locatedBlock) throws IOException {
-    return new org.apache.hadoop.hdfs.protocolR23Compatible.
-        ClientDatanodeProtocolTranslatorR23(datanodeid, conf, socketTimeout,
+    return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
         locatedBlock);
   }
 
@@ -643,7 +647,7 @@ public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
   static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
       DatanodeID datanodeid, Configuration conf, int socketTimeout)
       throws IOException {
-    return new org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeProtocolTranslatorR23(
+    return new ClientDatanodeProtocolTranslatorPB(
         datanodeid, conf, socketTimeout);
   }
 
@@ -651,8 +655,7 @@ static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
   public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory) throws IOException {
-    return new org.apache.hadoop.hdfs.protocolR23Compatible.
-        ClientDatanodeProtocolTranslatorR23(addr, ticket, conf, factory);
+    return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
   }
 
   /**
@@ -737,4 +740,18 @@ public static URI createUri(String scheme, InetSocketAddress address) {
       throw new IllegalArgumentException(ue);
     }
   }
-}
+
+  /**
+   * Add a protobuf-based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}
+   * @param conf configuration
+   * @param protocol Protocol interface
+   * @param service service that implements the protocol
+   * @param server RPC server to which the protocol and its implementation are added
+   * @throws IOException
+   */
+  public static void addPBProtocol(Configuration conf, Class<?> protocol,
+      BlockingService service, RPC.Server server) throws IOException {
+    RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
+    server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 8c345a480e..7cbdc41144 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -21,12 +21,19 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import
org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; @@ -38,6 +45,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import com.google.protobuf.RpcController; @@ -52,20 +61,79 @@ @InterfaceStability.Stable public class ClientDatanodeProtocolTranslatorPB implements ClientDatanodeProtocol, Closeable { + public static final Log LOG = LogFactory + .getLog(ClientDatanodeProtocolTranslatorPB.class); + /** RpcController is not used and hence is set to null */ private final static RpcController NULL_CONTROLLER = null; private final ClientDatanodeProtocolPB rpcProxy; private final static RefreshNamenodesRequestProto REFRESH_NAMENODES = RefreshNamenodesRequestProto.newBuilder().build(); + public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, + Configuration conf, int socketTimeout, LocatedBlock locatedBlock) + throws IOException { + rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf, + socketTimeout, locatedBlock); + } + + public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr, + UserGroupInformation ticket, Configuration conf, SocketFactory factory) + throws IOException { + rpcProxy = createClientDatanodeProtocolProxy(addr, ticket, conf, factory, 0); + } + + /** + * Constructor. + * @param datanodeid Datanode to connect to. + * @param conf Configuration. + * @param socketTimeout Socket timeout to use. + * @throws IOException + */ + public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, + Configuration conf, int socketTimeout) throws IOException { + InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getHost() + + ":" + datanodeid.getIpcPort()); + rpcProxy = createClientDatanodeProtocolProxy(addr, + UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), socketTimeout); + } - public ClientDatanodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr, - Configuration conf) throws IOException { + static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy( + DatanodeID datanodeid, Configuration conf, int socketTimeout, + LocatedBlock locatedBlock) throws IOException { + InetSocketAddress addr = NetUtils.createSocketAddr( + datanodeid.getHost() + ":" + datanodeid.getIpcPort()); + if (LOG.isDebugEnabled()) { + LOG.debug("ClientDatanodeProtocol addr=" + addr); + } + + // Since we're creating a new UserGroupInformation here, we know that no + // future RPC proxies will be able to re-use the same connection. And + // usages of this proxy tend to be one-off calls. + // + // This is a temporary fix: callers should really achieve this by using + // RPC.stopProxy() on the resulting object, but this is currently not + // working in trunk. See the discussion on HDFS-1965. 
+ Configuration confWithNoIpcIdle = new Configuration(conf); + confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic + .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); + + UserGroupInformation ticket = UserGroupInformation + .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString()); + ticket.addToken(locatedBlock.getBlockToken()); + return createClientDatanodeProtocolProxy(addr, ticket, confWithNoIpcIdle, + NetUtils.getDefaultSocketFactory(conf), socketTimeout); + } + + static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy( + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int socketTimeout) throws IOException { RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class, ProtobufRpcEngine.class); - rpcProxy = RPC.getProxy(ClientDatanodeProtocolPB.class, - RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), nameNodeAddr, - conf); + return RPC.getProxy(ClientDatanodeProtocolPB.class, + RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), addr, ticket, + conf, factory, socketTimeout); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java index d21a4c4cdc..495a0b67e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java @@ -63,7 +63,8 @@ public InitReplicaRecoveryResponseProto initReplicaRecovery( throw new ServiceException(e); } return InitReplicaRecoveryResponseProto.newBuilder() - .setBlock(PBHelper.convert(r)).build(); + .setBlock(PBHelper.convert(r)) + .setState(PBHelper.convert(r.getOriginalReplicaState())).build(); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java index 5f39043302..5bcde8fb16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.net.InetSocketAddress; +import javax.net.SocketFactory; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -37,6 +39,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -54,13 +57,15 @@ public class InterDatanodeProtocolTranslatorPB implements private final static RpcController NULL_CONTROLLER = null; final private InterDatanodeProtocolPB rpcProxy; - public InterDatanodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr, - Configuration conf) throws IOException { + public InterDatanodeProtocolTranslatorPB(InetSocketAddress addr, + UserGroupInformation ugi, Configuration conf, SocketFactory factory, + int 
socketTimeout)
+      throws IOException {
     RPC.setProtocolEngine(conf, InterDatanodeProtocolPB.class,
         ProtobufRpcEngine.class);
     rpcProxy = RPC.getProxy(InterDatanodeProtocolPB.class,
-        RPC.getProtocolVersion(InterDatanodeProtocolPB.class), nameNodeAddr,
-        conf);
+        RPC.getProtocolVersion(InterDatanodeProtocolPB.class), addr, ugi, conf,
+        factory, socketTimeout);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index d0e5c50efa..99bd3e687f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -91,11 +92,11 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
 
   final private NamenodeProtocolPB rpcProxy;
 
-
-
   private static NamenodeProtocolPB createNamenode(
       InetSocketAddress nameNodeAddr, Configuration conf,
       UserGroupInformation ugi) throws IOException {
+    RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
+        ProtobufRpcEngine.class);
     return RPC.getProxy(NamenodeProtocolPB.class,
         RPC.getProtocolVersion(NamenodeProtocolPB.class), nameNodeAddr, ugi,
         conf, NetUtils.getSocketFactory(conf, NamenodeProtocolPB.class));
@@ -107,17 +108,20 @@ static NamenodeProtocolPB createNamenodeWithRetry(
     RetryPolicy createPolicy = RetryPolicies
         .retryUpToMaximumCountWithFixedSleep(5,
             HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
-    Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>();
+    Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap =
+        new HashMap<Class<? extends Exception>, RetryPolicy>();
     remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
         createPolicy);
-    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>();
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+        new HashMap<Class<? extends Exception>, RetryPolicy>();
     exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
         .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
             remoteExceptionToPolicyMap));
     RetryPolicy methodPolicy = RetryPolicies.retryByException(
         RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
-    Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
+    Map<String, RetryPolicy> methodNameToPolicyMap =
+        new HashMap<String, RetryPolicy>();
 
     methodNameToPolicyMap.put("create", methodPolicy);
 
@@ -129,6 +133,10 @@ public NamenodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
       Configuration conf, UserGroupInformation ugi) throws IOException {
     rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi));
   }
+
+  public NamenodeProtocolTranslatorPB(NamenodeProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
 
   public void close() {
     RPC.stopProxy(rpcProxy);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 7061d35b4c..15aa1ee721 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -96,7 +96,7 @@ /** * Utilities for converting protobuf classes to and from implementation classes. */ -class PBHelper { +public class PBHelper { private PBHelper() { /** Hidden constructor */ } @@ -170,7 +170,7 @@ public static BlockProto convert(Block b) { } public static Block convert(BlockProto b) { - return new Block(b.getBlockId(), b.getGenStamp(), b.getNumBytes()); + return new Block(b.getBlockId(), b.getNumBytes(), b.getGenStamp()); } public static BlockWithLocationsProto convert(BlockWithLocations blk) { @@ -330,6 +330,9 @@ public static ExtendedBlock convert(ExtendedBlockProto b) { } public static RecoveringBlockProto convert(RecoveringBlock b) { + if (b == null) { + return null; + } LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b); return RecoveringBlockProto.newBuilder().setBlock(lb) .setNewGenStamp(b.getNewGenerationStamp()).build(); @@ -399,6 +402,9 @@ public static AdminState convert(AdminStates adminState) { } public static LocatedBlockProto convert(LocatedBlock b) { + if (b == null) { + return null; + } Builder builder = LocatedBlockProto.newBuilder(); DatanodeInfo[] locs = b.getLocations(); for (int i = 0; i < locs.length; i++) { @@ -452,6 +458,22 @@ public static ReplicaState convert(ReplicaStateProto state) { return ReplicaState.FINALIZED; } } + + public static ReplicaStateProto convert(ReplicaState state) { + switch (state) { + case RBW: + return ReplicaStateProto.RBW; + case RUR: + return ReplicaStateProto.RUR; + case RWR: + return ReplicaStateProto.RWR; + case TEMPORARY: + return ReplicaStateProto.TEMPORARY; + case FINALIZED: + default: + return ReplicaStateProto.FINALIZED; + } + } public static DatanodeRegistrationProto convert( DatanodeRegistration registration) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 634efdf5b3..ebd9783fc1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -46,6 +48,7 @@ import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; @@ -202,14 +205,15 @@ private static NamenodeProtocol createNamenode(InetSocketAddress address, methodNameToPolicyMap.put("getBlocks", methodPolicy); methodNameToPolicyMap.put("getAccessKeys", methodPolicy); - return (NamenodeProtocol) RetryProxy.create(NamenodeProtocol.class, - RPC.getProxy(NamenodeProtocol.class, - NamenodeProtocol.versionID, - address, 
- UserGroupInformation.getCurrentUser(), - conf, - NetUtils.getDefaultSocketFactory(conf)), - methodNameToPolicyMap); + RPC.setProtocolEngine(conf, NamenodeProtocolPB.class, + ProtobufRpcEngine.class); + NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class, + RPC.getProtocolVersion(NamenodeProtocolPB.class), address, + UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf)); + NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create( + NamenodeProtocolPB.class, proxy, methodNameToPolicyMap); + return new NamenodeProtocolTranslatorPB(retryProxy); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 35f4441116..96e6944cb5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -105,9 +105,15 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeProtocolServerSideTranslatorR23; +import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; @@ -132,18 +138,15 @@ import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; -import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolServerSideTranslatorR23; -import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolTranslatorR23; -import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeWireProtocol; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.Param; import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.DNS; @@ -167,6 +170,7 @@ import org.mortbay.util.ajax.JSON; import 
com.google.common.base.Preconditions; +import com.google.protobuf.BlockingService; /********************************************************** @@ -519,21 +523,23 @@ private void initIpcServer(Configuration conf) throws IOException { conf.get(DFS_DATANODE_IPC_ADDRESS_KEY)); // Add all the RPC protocols that the Datanode implements - ClientDatanodeProtocolServerSideTranslatorR23 - clientDatanodeProtocolServerTranslator = - new ClientDatanodeProtocolServerSideTranslatorR23(this); - ipcServer = RPC.getServer( - org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class, - clientDatanodeProtocolServerTranslator, ipcAddr.getHostName(), - ipcAddr.getPort(), - conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, - DFS_DATANODE_HANDLER_COUNT_DEFAULT), - false, conf, blockPoolTokenSecretManager); - InterDatanodeProtocolServerSideTranslatorR23 - interDatanodeProtocolServerTranslator = - new InterDatanodeProtocolServerSideTranslatorR23(this); - ipcServer.addProtocol(RpcKind.RPC_WRITABLE, InterDatanodeWireProtocol.class, - interDatanodeProtocolServerTranslator); + RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator = + new ClientDatanodeProtocolServerSideTranslatorPB(this); + BlockingService service = ClientDatanodeProtocolService + .newReflectiveBlockingService(clientDatanodeProtocolXlator); + ipcServer = RPC.getServer(ClientDatanodeProtocolPB.class, service, ipcAddr + .getHostName(), ipcAddr.getPort(), conf.getInt( + DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT), + false, conf, blockPoolTokenSecretManager); + + InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = + new InterDatanodeProtocolServerSideTranslatorPB(this); + service = InterDatanodeProtocolService + .newReflectiveBlockingService(interDatanodeProtocolXlator); + DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service, + ipcServer); // set service-level authorization security policy if (conf.getBoolean( @@ -1023,7 +1029,7 @@ public static InterDatanodeProtocol createInterDataNodeProtocolProxy( return loginUgi .doAs(new PrivilegedExceptionAction() { public InterDatanodeProtocol run() throws IOException { - return new InterDatanodeProtocolTranslatorR23(addr, loginUgi, + return new InterDatanodeProtocolTranslatorPB(addr, loginUgi, conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout); } }); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java index 421b34457d..658d3fd90f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java @@ -29,8 +29,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; -import org.apache.hadoop.hdfs.protocolR23Compatible.JournalProtocolServerSideTranslatorR23; -import org.apache.hadoop.hdfs.protocolR23Compatible.JournalWireProtocol; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService; +import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB; +import 
org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; @@ -40,9 +43,10 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.net.NetUtils; +import com.google.protobuf.BlockingService; + /** * BackupNode. *

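Every protocol this patch migrates gets the same server-side wiring, shown next in the BackupNodeRpcServer constructor: wrap the implementation in a *ServerSideTranslatorPB, build a reflective BlockingService from the generated protobuf service, and register it through the new DFSUtil.addPBProtocol() helper. A condensed sketch of that pattern, where impl, conf, and rpcServer are placeholders for the caller's objects rather than names from the patch:

// Server side: implementation -> PB translator -> BlockingService -> RPC server.
JournalProtocolServerSideTranslatorPB translator =
    new JournalProtocolServerSideTranslatorPB(impl);   // impl implements JournalProtocol
BlockingService service =
    JournalProtocolService.newReflectiveBlockingService(translator);
// addPBProtocol (added to DFSUtil earlier in this patch) sets ProtobufRpcEngine
// for the protocol class and registers the service under RPC_PROTOCOL_BUFFER.
DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service, rpcServer);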
@@ -209,10 +213,12 @@ static class BackupNodeRpcServer extends NameNodeRpcServer implements private BackupNodeRpcServer(Configuration conf, BackupNode nn) throws IOException { super(conf, nn); - JournalProtocolServerSideTranslatorR23 journalProtocolTranslator = - new JournalProtocolServerSideTranslatorR23(this); - this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, JournalWireProtocol.class, - journalProtocolTranslator); + JournalProtocolServerSideTranslatorPB journalProtocolTranslator = + new JournalProtocolServerSideTranslatorPB(this); + BlockingService service = JournalProtocolService + .newReflectiveBlockingService(journalProtocolTranslator); + DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service, + this.clientRpcServer); nnRpcAddress = nn.nnRpcAddress; } @@ -298,9 +304,11 @@ boolean shouldCheckpointAtStartup() { private NamespaceInfo handshake(Configuration conf) throws IOException { // connect to name node InetSocketAddress nnAddress = NameNode.getServiceAddress(conf, true); - this.namenode = - RPC.waitForProxy(NamenodeProtocol.class, - NamenodeProtocol.versionID, nnAddress, conf); + NamenodeProtocolPB proxy = + RPC.waitForProxy(NamenodeProtocolPB.class, + RPC.getProtocolVersion(NamenodeProtocolPB.class), + nnAddress, conf); + this.namenode = new NamenodeProtocolTranslatorPB(proxy); this.nnRpcAddress = getHostPortString(nnAddress); this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf)); // get version and id info from the name-node diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java index 711fcce48e..867a93d058 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java @@ -22,7 +22,7 @@ import java.util.Arrays; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.protocolR23Compatible.JournalProtocolTranslatorR23; +import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; @@ -57,7 +57,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream { NetUtils.createSocketAddr(bnRegistration.getAddress()); try { this.backupNode = - new JournalProtocolTranslatorR23(bnAddress, new HdfsConfiguration()); + new JournalProtocolTranslatorPB(bnAddress, new HdfsConfiguration()); } catch(IOException e) { Storage.LOG.error("Error connecting to: " + bnAddress, e); throw e; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java index d8b265913e..8d35469791 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java @@ -199,7 +199,9 @@ private Map getAuthFilterParams(Configuration conf) } public void stop() throws Exception { - httpServer.stop(); + if (httpServer != null) { + httpServer.stop(); + } } public 
InetSocketAddress getHttpAddress() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index e47066ac82..e4f5de0505 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -38,6 +38,8 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import static org.apache.hadoop.hdfs.DFSConfigKeys.*; + +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -56,6 +58,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeProtocolServerSideTranslatorR23; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -97,6 +102,8 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.tools.GetUserMappingsProtocol; +import com.google.protobuf.BlockingService; + /** * This class is responsible for handling all of the RPC calls to the NameNode. * It is created, started, and stopped by {@link NameNode}. 
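The client side mirrors this wiring: callers obtain a raw *ProtocolPB proxy over ProtobufRpcEngine, then wrap it in a translator that still implements the legacy Java interface, so call sites such as the balancer, the BackupNode handshake, and the SecondaryNameNode stay otherwise unchanged. A condensed sketch of what those hunks do, with nnAddress and conf standing in for the caller's values (in the patch the engine is normally set once, inside createNamenode):

// Client side: PB proxy -> translator implementing the legacy interface.
RPC.setProtocolEngine(conf, NamenodeProtocolPB.class, ProtobufRpcEngine.class);
NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class,
    RPC.getProtocolVersion(NamenodeProtocolPB.class), nnAddress, conf);
// The translator converts each NamenodeProtocol call into a protobuf
// request/response pair, so existing call sites keep compiling.
NamenodeProtocol namenode = new NamenodeProtocolTranslatorPB(proxy);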
@@ -135,6 +142,11 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) clientProtocolServerTranslator = new ClientNamenodeProtocolServerSideTranslatorR23(this); + NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = + new NamenodeProtocolServerSideTranslatorPB(this); + BlockingService service = NamenodeProtocolService + .newReflectiveBlockingService(namenodeProtocolXlator); + InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf); if (dnSocketAddr != null) { int serviceHandlerCount = @@ -149,14 +161,14 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) false, conf, namesystem.getDelegationTokenSecretManager()); this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, DatanodeProtocol.class, this); - this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, - NamenodeProtocol.class, this); this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, RefreshAuthorizationPolicyProtocol.class, this); this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, RefreshUserMappingsProtocol.class, this); this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, GetUserMappingsProtocol.class, this); + DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service, + serviceRpcServer); this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress(); nn.setRpcServiceServerAddress(conf, serviceRPCAddress); @@ -173,14 +185,14 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) namesystem.getDelegationTokenSecretManager()); this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, DatanodeProtocol.class, this); - this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, - NamenodeProtocol.class, this); this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, RefreshAuthorizationPolicyProtocol.class, this); this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, RefreshUserMappingsProtocol.class, this); this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, GetUserMappingsProtocol.class, this); + DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service, + clientRpcServer); // set service-level authorization security policy diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java index d403629146..6c711b9d37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java @@ -48,6 +48,8 @@ import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; import org.apache.hadoop.hdfs.server.common.JspHelper; @@ -217,9 +219,10 @@ private void initialize(final Configuration conf, nameNodeAddr = NameNode.getServiceAddress(conf, true); this.conf = conf; - this.namenode = - (NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class, - NamenodeProtocol.versionID, nameNodeAddr, conf); + NamenodeProtocolPB proxy = + RPC.waitForProxy(NamenodeProtocolPB.class, + RPC.getProtocolVersion(NamenodeProtocolPB.class), nameNodeAddr, conf); + this.namenode = new 
NamenodeProtocolTranslatorPB(proxy); // initialize checkpoint directories fsName = getInfoServer(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index c66950af49..ee3e46998b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -48,7 +48,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; @@ -62,7 +63,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -515,8 +515,8 @@ private void initMiniDFSCluster(int nameNodePort, int nameNodeHttpPort, Class rpcEngine = conf.getClassByName(rpcEngineName); setRpcEngine(conf, NamenodeProtocols.class, rpcEngine); setRpcEngine(conf, ClientNamenodeWireProtocol.class, rpcEngine); - setRpcEngine(conf, ClientDatanodeWireProtocol.class, rpcEngine); - setRpcEngine(conf, NamenodeProtocol.class, rpcEngine); + setRpcEngine(conf, ClientDatanodeProtocolPB.class, rpcEngine); + setRpcEngine(conf, NamenodeProtocolPB.class, rpcEngine); setRpcEngine(conf, ClientProtocol.class, rpcEngine); setRpcEngine(conf, DatanodeProtocol.class, rpcEngine); setRpcEngine(conf, RefreshAuthorizationPolicyProtocol.class, rpcEngine); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index be57bd4e39..937b28cd90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -638,7 +638,7 @@ public void testClientDNProtocolTimeout() throws IOException { proxy = DFSUtil.createClientDatanodeProtocolProxy( fakeDnId, conf, 500, fakeBlock); - proxy.getReplicaVisibleLength(null); + proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1)); fail ("Did not get expected exception: SocketTimeoutException"); } catch (SocketTimeoutException e) { LOG.info("Got the expected Exception: SocketTimeoutException"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java index 53a433bbbd..8693885ec6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -32,9 +33,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import junit.framework.TestCase;
@@ -98,10 +97,8 @@ public void testGetBlocks() throws Exception {
 
       // get RPC client to namenode
       InetSocketAddress addr = new InetSocketAddress("localhost",
          cluster.getNameNodePort());
-      NamenodeProtocol namenode = (NamenodeProtocol) RPC.getProxy(
-          NamenodeProtocol.class, NamenodeProtocol.versionID, addr,
-          UserGroupInformation.getCurrentUser(), CONF,
-          NetUtils.getDefaultSocketFactory(CONF));
+      NamenodeProtocol namenode = new NamenodeProtocolTranslatorPB(addr, CONF,
+          UserGroupInformation.getCurrentUser());
 
       // get blocks of size fileLen from dataNodes[0]
       BlockWithLocations[] locs;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index fd9c91d88c..a58394acfd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -57,8 +57,14 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.io.TestWritable;
 import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
@@ -76,6 +82,10 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
 /** Unit tests for block tokens */
 public class TestBlockToken {
   public static final Log LOG = LogFactory.getLog(TestBlockToken.class);
@@ -106,22 +116,24 @@ public class TestBlockToken {
   ExtendedBlock block2 = new ExtendedBlock("10", 10L);
   ExtendedBlock block3 = new ExtendedBlock("-10", -108L);
 
-  private static class getLengthAnswer implements Answer<Long> {
+  private static class GetLengthAnswer implements
+      Answer<GetReplicaVisibleLengthResponseProto> {
     BlockTokenSecretManager sm;
     BlockTokenIdentifier ident;
- public getLengthAnswer(BlockTokenSecretManager sm, + public GetLengthAnswer(BlockTokenSecretManager sm, BlockTokenIdentifier ident) { this.sm = sm; this.ident = ident; } @Override - public Long answer(InvocationOnMock invocation) throws IOException { + public GetReplicaVisibleLengthResponseProto answer( + InvocationOnMock invocation) throws IOException { Object args[] = invocation.getArguments(); - assertEquals(1, args.length); - org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable block = - (org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable) args[0]; + assertEquals(2, args.length); + GetReplicaVisibleLengthRequestProto req = + (GetReplicaVisibleLengthRequestProto) args[1]; Set tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); assertEquals("Only one BlockTokenIdentifier expected", 1, tokenIds.size()); @@ -130,12 +142,12 @@ public Long answer(InvocationOnMock invocation) throws IOException { BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId; LOG.info("Got: " + id.toString()); assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id)); - sm.checkAccess(id, null, org.apache.hadoop.hdfs.protocolR23Compatible. - ExtendedBlockWritable.convertExtendedBlock(block), + sm.checkAccess(id, null, PBHelper.convert(req.getBlock()), BlockTokenSecretManager.AccessMode.WRITE); result = id.getBlockId(); } - return result; + return GetReplicaVisibleLengthResponseProto.newBuilder() + .setLength(result).build(); } } @@ -208,25 +220,29 @@ public void testBlockTokenSecretManager() throws Exception { } private Server createMockDatanode(BlockTokenSecretManager sm, - Token token) throws IOException { - org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol mockDN = - mock(org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class); + Token token) throws IOException, ServiceException { + ClientDatanodeProtocolPB mockDN = mock(ClientDatanodeProtocolPB.class); when(mockDN.getProtocolVersion(anyString(), anyLong())).thenReturn( - org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.versionID); + RPC.getProtocolVersion(ClientDatanodeProtocolPB.class)); doReturn( ProtocolSignature.getProtocolSignature(mockDN, - org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class.getName(), - org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.versionID, 0)).when(mockDN) - .getProtocolSignature(anyString(), anyLong(), anyInt()); + ClientDatanodeProtocolPB.class.getName(), + RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), 0)).when( + mockDN).getProtocolSignature(anyString(), anyLong(), anyInt()); BlockTokenIdentifier id = sm.createIdentifier(); id.readFields(new DataInputStream(new ByteArrayInputStream(token .getIdentifier()))); - doAnswer(new getLengthAnswer(sm, id)).when(mockDN).getReplicaVisibleLength( - any(org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable.class)); + + doAnswer(new GetLengthAnswer(sm, id)).when(mockDN) + .getReplicaVisibleLength(any(RpcController.class), + any(GetReplicaVisibleLengthRequestProto.class)); - return RPC.getServer(org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class, - mockDN, ADDRESS, 0, 5, + RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + BlockingService service = ClientDatanodeProtocolService + .newReflectiveBlockingService(mockDN); + return RPC.getServer(ClientDatanodeProtocolPB.class, service, ADDRESS, 0, 5, true, conf, sm); } @@ -323,7 
+339,7 @@ public void testBlockTokenRpcLeak() throws Exception { /** * @return the current number of file descriptors open by this process. */ - private static int countOpenFileDescriptors() throws IOException { + private static int countOpenFileDescriptors() { return FD_DIR.list().length; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java index 5f48651c89..26fc102b4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java @@ -150,7 +150,6 @@ public void testBlockMetaDataInfo() throws Exception { DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort()); InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy( datanodeinfo[0], conf, datanode.getDnConf().socketTimeout); - assertTrue(datanode != null); //stop block scanner, so we could compare lastScanTime if (datanode.blockScanner != null) { @@ -347,8 +346,8 @@ public void testUpdateReplicaUnderRecovery() throws IOException { /** Test to verify that InterDatanode RPC timesout as expected when * the server DN does not respond. */ - @Test - public void testInterDNProtocolTimeout() throws Exception { + @Test(expected=SocketTimeoutException.class) + public void testInterDNProtocolTimeout() throws Throwable { final Server server = new TestServer(1, true); server.start(); @@ -361,10 +360,9 @@ public void testInterDNProtocolTimeout() throws Exception { try { proxy = DataNode.createInterDataNodeProtocolProxy( dInfo, conf, 500); - proxy.initReplicaRecovery(null); + proxy.initReplicaRecovery(new RecoveringBlock( + new ExtendedBlock("bpid", 1), null, 100)); fail ("Expected SocketTimeoutException exception, but did not get."); - } catch (SocketTimeoutException e) { - DataNode.LOG.info("Got expected Exception: SocketTimeoutException" + e); } finally { if (proxy != null) { RPC.stopProxy(proxy);
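One change in this patch deserves a closing note: PBHelper.convert(BlockProto) previously passed getGenStamp() as the second constructor argument, but Block's three-long constructor takes (blockId, numBytes, genStamp), so every deserialized block silently swapped its length and generation stamp. A hedged round-trip check, assuming Block's usual getNumBytes()/getGenerationStamp() accessors, illustrates the invariant the fix restores:

// Round-trip a Block through its protobuf form; with the argument-order fix,
// numBytes and genStamp come back where they started.
Block original = new Block(1L, 1024L, 2011L);   // (blockId, numBytes, genStamp)
BlockProto proto = PBHelper.convert(original);
Block copy = PBHelper.convert(proto);
assert copy.getNumBytes() == original.getNumBytes();
assert copy.getGenerationStamp() == original.getGenerationStamp();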