HDFS-2647. Used protobuf based RPC for InterDatanodeProtocol, ClientDatanodeProtocol, JournalProtocol, NamenodeProtocol. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1213040 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2011-12-11 18:53:21 +00:00
parent 604f99e367
commit 2740112bb6
19 changed files with 282 additions and 112 deletions

View File

@ -29,6 +29,9 @@ Trunk (unreleased changes)
HDFS-2642. Protobuf translators for DatanodeProtocol. (jitendra)
HDFS-2647. Used protobuf based RPC for InterDatanodeProtocol,
ClientDatanodeProtocol, JournalProtocol, NamenodeProtocol. (suresh)
IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multuple

View File

@ -50,12 +50,17 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.protobuf.BlockingService;
@InterfaceAudience.Private
public class DFSUtil {
private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
@ -573,12 +578,12 @@ public static void setGenericConf(Configuration conf,
/** Return used as percentage of capacity */
public static float getPercentUsed(long used, long capacity) {
return capacity <= 0 ? 100 : ((float)used * 100.0f)/(float)capacity;
return capacity <= 0 ? 100 : (used * 100.0f)/capacity;
}
/** Return remaining as percentage of capacity */
public static float getPercentRemaining(long remaining, long capacity) {
return capacity <= 0 ? 0 : ((float)remaining * 100.0f)/(float)capacity;
return capacity <= 0 ? 0 : (remaining * 100.0f)/capacity;
}
/**
@ -634,8 +639,7 @@ public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
LocatedBlock locatedBlock) throws IOException {
return new org.apache.hadoop.hdfs.protocolR23Compatible.
ClientDatanodeProtocolTranslatorR23(datanodeid, conf, socketTimeout,
return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
locatedBlock);
}
@ -643,7 +647,7 @@ public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout)
throws IOException {
return new org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeProtocolTranslatorR23(
return new ClientDatanodeProtocolTranslatorPB(
datanodeid, conf, socketTimeout);
}
@ -651,8 +655,7 @@ static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory) throws IOException {
return new org.apache.hadoop.hdfs.protocolR23Compatible.
ClientDatanodeProtocolTranslatorR23(addr, ticket, conf, factory);
return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
}
/**
@ -737,4 +740,18 @@ public static URI createUri(String scheme, InetSocketAddress address) {
throw new IllegalArgumentException(ue);
}
}
}
/**
* Add protobuf based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}
* @param conf configuration
* @param protocol Protocol interface
* @param service service that implements the protocol
* @param server RPC server to which the protocol & implementation is added to
* @throws IOException
*/
public static void addPBProtocol(Configuration conf, Class<?> protocol,
BlockingService service, RPC.Server server) throws IOException {
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
}
}

View File

@ -21,12 +21,19 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
@ -38,6 +45,8 @@
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.RpcController;
@ -52,20 +61,79 @@
@InterfaceStability.Stable
public class ClientDatanodeProtocolTranslatorPB implements
ClientDatanodeProtocol, Closeable {
public static final Log LOG = LogFactory
.getLog(ClientDatanodeProtocolTranslatorPB.class);
/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
private final ClientDatanodeProtocolPB rpcProxy;
private final static RefreshNamenodesRequestProto REFRESH_NAMENODES =
RefreshNamenodesRequestProto.newBuilder().build();
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
Configuration conf, int socketTimeout, LocatedBlock locatedBlock)
throws IOException {
rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf,
socketTimeout, locatedBlock);
}
public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory)
throws IOException {
rpcProxy = createClientDatanodeProtocolProxy(addr, ticket, conf, factory, 0);
}
/**
* Constructor.
* @param datanodeid Datanode to connect to.
* @param conf Configuration.
* @param socketTimeout Socket timeout to use.
* @throws IOException
*/
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
Configuration conf, int socketTimeout) throws IOException {
InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getHost()
+ ":" + datanodeid.getIpcPort());
rpcProxy = createClientDatanodeProtocolProxy(addr,
UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
}
public ClientDatanodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
LocatedBlock locatedBlock) throws IOException {
InetSocketAddress addr = NetUtils.createSocketAddr(
datanodeid.getHost() + ":" + datanodeid.getIpcPort());
if (LOG.isDebugEnabled()) {
LOG.debug("ClientDatanodeProtocol addr=" + addr);
}
// Since we're creating a new UserGroupInformation here, we know that no
// future RPC proxies will be able to re-use the same connection. And
// usages of this proxy tend to be one-off calls.
//
// This is a temporary fix: callers should really achieve this by using
// RPC.stopProxy() on the resulting object, but this is currently not
// working in trunk. See the discussion on HDFS-1965.
Configuration confWithNoIpcIdle = new Configuration(conf);
confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic
.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
UserGroupInformation ticket = UserGroupInformation
.createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString());
ticket.addToken(locatedBlock.getBlockToken());
return createClientDatanodeProtocolProxy(addr, ticket, confWithNoIpcIdle,
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
}
static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int socketTimeout) throws IOException {
RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
rpcProxy = RPC.getProxy(ClientDatanodeProtocolPB.class,
RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), nameNodeAddr,
conf);
return RPC.getProxy(ClientDatanodeProtocolPB.class,
RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), addr, ticket,
conf, factory, socketTimeout);
}
@Override

View File

@ -63,7 +63,8 @@ public InitReplicaRecoveryResponseProto initReplicaRecovery(
throw new ServiceException(e);
}
return InitReplicaRecoveryResponseProto.newBuilder()
.setBlock(PBHelper.convert(r)).build();
.setBlock(PBHelper.convert(r))
.setState(PBHelper.convert(r.getOriginalReplicaState())).build();
}
@Override

View File

@ -21,6 +21,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import javax.net.SocketFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -37,6 +39,7 @@
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -54,13 +57,15 @@ public class InterDatanodeProtocolTranslatorPB implements
private final static RpcController NULL_CONTROLLER = null;
final private InterDatanodeProtocolPB rpcProxy;
public InterDatanodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
public InterDatanodeProtocolTranslatorPB(InetSocketAddress addr,
UserGroupInformation ugi, Configuration conf, SocketFactory factory,
int socketTimeout)
throws IOException {
RPC.setProtocolEngine(conf, InterDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
rpcProxy = RPC.getProxy(InterDatanodeProtocolPB.class,
RPC.getProtocolVersion(InterDatanodeProtocolPB.class), nameNodeAddr,
conf);
RPC.getProtocolVersion(InterDatanodeProtocolPB.class), addr, ugi, conf,
factory, socketTimeout);
}
@Override

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@ -91,11 +92,11 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
final private NamenodeProtocolPB rpcProxy;
private static NamenodeProtocolPB createNamenode(
InetSocketAddress nameNodeAddr, Configuration conf,
UserGroupInformation ugi) throws IOException {
RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
ProtobufRpcEngine.class);
return RPC.getProxy(NamenodeProtocolPB.class,
RPC.getProtocolVersion(NamenodeProtocolPB.class), nameNodeAddr, ugi,
conf, NetUtils.getSocketFactory(conf, NamenodeProtocolPB.class));
@ -107,17 +108,20 @@ static NamenodeProtocolPB createNamenodeWithRetry(
RetryPolicy createPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(5,
HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>();
Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
createPolicy);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>();
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
.retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
Map<String, RetryPolicy> methodNameToPolicyMap =
new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
@ -129,6 +133,10 @@ public NamenodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
Configuration conf, UserGroupInformation ugi) throws IOException {
rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi));
}
public NamenodeProtocolTranslatorPB(NamenodeProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
public void close() {
RPC.stopProxy(rpcProxy);

View File

@ -96,7 +96,7 @@
/**
* Utilities for converting protobuf classes to and from implementation classes.
*/
class PBHelper {
public class PBHelper {
private PBHelper() {
/** Hidden constructor */
}
@ -170,7 +170,7 @@ public static BlockProto convert(Block b) {
}
public static Block convert(BlockProto b) {
return new Block(b.getBlockId(), b.getGenStamp(), b.getNumBytes());
return new Block(b.getBlockId(), b.getNumBytes(), b.getGenStamp());
}
public static BlockWithLocationsProto convert(BlockWithLocations blk) {
@ -330,6 +330,9 @@ public static ExtendedBlock convert(ExtendedBlockProto b) {
}
public static RecoveringBlockProto convert(RecoveringBlock b) {
if (b == null) {
return null;
}
LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b);
return RecoveringBlockProto.newBuilder().setBlock(lb)
.setNewGenStamp(b.getNewGenerationStamp()).build();
@ -399,6 +402,9 @@ public static AdminState convert(AdminStates adminState) {
}
public static LocatedBlockProto convert(LocatedBlock b) {
if (b == null) {
return null;
}
Builder builder = LocatedBlockProto.newBuilder();
DatanodeInfo[] locs = b.getLocations();
for (int i = 0; i < locs.length; i++) {
@ -452,6 +458,22 @@ public static ReplicaState convert(ReplicaStateProto state) {
return ReplicaState.FINALIZED;
}
}
public static ReplicaStateProto convert(ReplicaState state) {
switch (state) {
case RBW:
return ReplicaStateProto.RBW;
case RUR:
return ReplicaStateProto.RUR;
case RWR:
return ReplicaStateProto.RWR;
case TEMPORARY:
return ReplicaStateProto.TEMPORARY;
case FINALIZED:
default:
return ReplicaStateProto.FINALIZED;
}
}
public static DatanodeRegistrationProto convert(
DatanodeRegistration registration) {

View File

@ -36,6 +36,8 @@
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@ -46,6 +48,7 @@
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
@ -202,14 +205,15 @@ private static NamenodeProtocol createNamenode(InetSocketAddress address,
methodNameToPolicyMap.put("getBlocks", methodPolicy);
methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
return (NamenodeProtocol) RetryProxy.create(NamenodeProtocol.class,
RPC.getProxy(NamenodeProtocol.class,
NamenodeProtocol.versionID,
address,
UserGroupInformation.getCurrentUser(),
conf,
NetUtils.getDefaultSocketFactory(conf)),
methodNameToPolicyMap);
RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
ProtobufRpcEngine.class);
NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class,
RPC.getProtocolVersion(NamenodeProtocolPB.class), address,
UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf));
NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
return new NamenodeProtocolTranslatorPB(retryProxy);
}
/**

View File

@ -105,9 +105,15 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeProtocolServerSideTranslatorR23;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@ -132,18 +138,15 @@
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolServerSideTranslatorR23;
import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolTranslatorR23;
import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeWireProtocol;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.DNS;
@ -167,6 +170,7 @@
import org.mortbay.util.ajax.JSON;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
/**********************************************************
@ -519,21 +523,23 @@ private void initIpcServer(Configuration conf) throws IOException {
conf.get(DFS_DATANODE_IPC_ADDRESS_KEY));
// Add all the RPC protocols that the Datanode implements
ClientDatanodeProtocolServerSideTranslatorR23
clientDatanodeProtocolServerTranslator =
new ClientDatanodeProtocolServerSideTranslatorR23(this);
ipcServer = RPC.getServer(
org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class,
clientDatanodeProtocolServerTranslator, ipcAddr.getHostName(),
ipcAddr.getPort(),
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
DFS_DATANODE_HANDLER_COUNT_DEFAULT),
false, conf, blockPoolTokenSecretManager);
InterDatanodeProtocolServerSideTranslatorR23
interDatanodeProtocolServerTranslator =
new InterDatanodeProtocolServerSideTranslatorR23(this);
ipcServer.addProtocol(RpcKind.RPC_WRITABLE, InterDatanodeWireProtocol.class,
interDatanodeProtocolServerTranslator);
RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator =
new ClientDatanodeProtocolServerSideTranslatorPB(this);
BlockingService service = ClientDatanodeProtocolService
.newReflectiveBlockingService(clientDatanodeProtocolXlator);
ipcServer = RPC.getServer(ClientDatanodeProtocolPB.class, service, ipcAddr
.getHostName(), ipcAddr.getPort(), conf.getInt(
DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT),
false, conf, blockPoolTokenSecretManager);
InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
new InterDatanodeProtocolServerSideTranslatorPB(this);
service = InterDatanodeProtocolService
.newReflectiveBlockingService(interDatanodeProtocolXlator);
DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
ipcServer);
// set service-level authorization security policy
if (conf.getBoolean(
@ -1023,7 +1029,7 @@ public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
return loginUgi
.doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
public InterDatanodeProtocol run() throws IOException {
return new InterDatanodeProtocolTranslatorR23(addr, loginUgi,
return new InterDatanodeProtocolTranslatorPB(addr, loginUgi,
conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout);
}
});

View File

@ -29,8 +29,11 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocolR23Compatible.JournalProtocolServerSideTranslatorR23;
import org.apache.hadoop.hdfs.protocolR23Compatible.JournalWireProtocol;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@ -40,9 +43,10 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.net.NetUtils;
import com.google.protobuf.BlockingService;
/**
* BackupNode.
* <p>
@ -209,10 +213,12 @@ static class BackupNodeRpcServer extends NameNodeRpcServer implements
private BackupNodeRpcServer(Configuration conf, BackupNode nn)
throws IOException {
super(conf, nn);
JournalProtocolServerSideTranslatorR23 journalProtocolTranslator =
new JournalProtocolServerSideTranslatorR23(this);
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, JournalWireProtocol.class,
journalProtocolTranslator);
JournalProtocolServerSideTranslatorPB journalProtocolTranslator =
new JournalProtocolServerSideTranslatorPB(this);
BlockingService service = JournalProtocolService
.newReflectiveBlockingService(journalProtocolTranslator);
DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service,
this.clientRpcServer);
nnRpcAddress = nn.nnRpcAddress;
}
@ -298,9 +304,11 @@ boolean shouldCheckpointAtStartup() {
private NamespaceInfo handshake(Configuration conf) throws IOException {
// connect to name node
InetSocketAddress nnAddress = NameNode.getServiceAddress(conf, true);
this.namenode =
RPC.waitForProxy(NamenodeProtocol.class,
NamenodeProtocol.versionID, nnAddress, conf);
NamenodeProtocolPB proxy =
RPC.waitForProxy(NamenodeProtocolPB.class,
RPC.getProtocolVersion(NamenodeProtocolPB.class),
nnAddress, conf);
this.namenode = new NamenodeProtocolTranslatorPB(proxy);
this.nnRpcAddress = getHostPortString(nnAddress);
this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf));
// get version and id info from the name-node

View File

@ -22,7 +22,7 @@
import java.util.Arrays;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocolR23Compatible.JournalProtocolTranslatorR23;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@ -57,7 +57,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
NetUtils.createSocketAddr(bnRegistration.getAddress());
try {
this.backupNode =
new JournalProtocolTranslatorR23(bnAddress, new HdfsConfiguration());
new JournalProtocolTranslatorPB(bnAddress, new HdfsConfiguration());
} catch(IOException e) {
Storage.LOG.error("Error connecting to: " + bnAddress, e);
throw e;

View File

@ -199,7 +199,9 @@ private Map<String, String> getAuthFilterParams(Configuration conf)
}
public void stop() throws Exception {
httpServer.stop();
if (httpServer != null) {
httpServer.stop();
}
}
public InetSocketAddress getHttpAddress() {

View File

@ -38,6 +38,8 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -56,6 +58,9 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeProtocolServerSideTranslatorR23;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@ -97,6 +102,8 @@
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import com.google.protobuf.BlockingService;
/**
* This class is responsible for handling all of the RPC calls to the NameNode.
* It is created, started, and stopped by {@link NameNode}.
@ -135,6 +142,11 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
clientProtocolServerTranslator =
new ClientNamenodeProtocolServerSideTranslatorR23(this);
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
new NamenodeProtocolServerSideTranslatorPB(this);
BlockingService service = NamenodeProtocolService
.newReflectiveBlockingService(namenodeProtocolXlator);
InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
int serviceHandlerCount =
@ -149,14 +161,14 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
DatanodeProtocol.class, this);
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
NamenodeProtocol.class, this);
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
RefreshAuthorizationPolicyProtocol.class, this);
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
RefreshUserMappingsProtocol.class, this);
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
GetUserMappingsProtocol.class, this);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
serviceRpcServer);
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
@ -173,14 +185,14 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
namesystem.getDelegationTokenSecretManager());
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
DatanodeProtocol.class, this);
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
NamenodeProtocol.class, this);
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
RefreshAuthorizationPolicyProtocol.class, this);
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
RefreshUserMappingsProtocol.class, this);
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
GetUserMappingsProtocol.class, this);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
clientRpcServer);
// set service-level authorization security policy

View File

@ -48,6 +48,8 @@
import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
@ -217,9 +219,10 @@ private void initialize(final Configuration conf,
nameNodeAddr = NameNode.getServiceAddress(conf, true);
this.conf = conf;
this.namenode =
(NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class,
NamenodeProtocol.versionID, nameNodeAddr, conf);
NamenodeProtocolPB proxy =
RPC.waitForProxy(NamenodeProtocolPB.class,
RPC.getProtocolVersion(NamenodeProtocolPB.class), nameNodeAddr, conf);
this.namenode = new NamenodeProtocolTranslatorPB(proxy);
// initialize checkpoint directories
fsName = getInfoServer();

View File

@ -48,7 +48,8 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@ -62,7 +63,6 @@
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -515,8 +515,8 @@ private void initMiniDFSCluster(int nameNodePort, int nameNodeHttpPort,
Class<?> rpcEngine = conf.getClassByName(rpcEngineName);
setRpcEngine(conf, NamenodeProtocols.class, rpcEngine);
setRpcEngine(conf, ClientNamenodeWireProtocol.class, rpcEngine);
setRpcEngine(conf, ClientDatanodeWireProtocol.class, rpcEngine);
setRpcEngine(conf, NamenodeProtocol.class, rpcEngine);
setRpcEngine(conf, ClientDatanodeProtocolPB.class, rpcEngine);
setRpcEngine(conf, NamenodeProtocolPB.class, rpcEngine);
setRpcEngine(conf, ClientProtocol.class, rpcEngine);
setRpcEngine(conf, DatanodeProtocol.class, rpcEngine);
setRpcEngine(conf, RefreshAuthorizationPolicyProtocol.class, rpcEngine);

View File

@ -638,7 +638,7 @@ public void testClientDNProtocolTimeout() throws IOException {
proxy = DFSUtil.createClientDatanodeProtocolProxy(
fakeDnId, conf, 500, fakeBlock);
proxy.getReplicaVisibleLength(null);
proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1));
fail ("Did not get expected exception: SocketTimeoutException");
} catch (SocketTimeoutException e) {
LOG.info("Got the expected Exception: SocketTimeoutException");

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@ -32,9 +33,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import junit.framework.TestCase;
@ -98,10 +97,8 @@ public void testGetBlocks() throws Exception {
// get RPC client to namenode
InetSocketAddress addr = new InetSocketAddress("localhost",
cluster.getNameNodePort());
NamenodeProtocol namenode = (NamenodeProtocol) RPC.getProxy(
NamenodeProtocol.class, NamenodeProtocol.versionID, addr,
UserGroupInformation.getCurrentUser(), CONF,
NetUtils.getDefaultSocketFactory(CONF));
NamenodeProtocol namenode = new NamenodeProtocolTranslatorPB(addr, CONF,
UserGroupInformation.getCurrentUser());
// get blocks of size fileLen from dataNodes[0]
BlockWithLocations[] locs;

View File

@ -57,8 +57,14 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.io.TestWritable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
@ -76,6 +82,10 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/** Unit tests for block tokens */
public class TestBlockToken {
public static final Log LOG = LogFactory.getLog(TestBlockToken.class);
@ -106,22 +116,24 @@ public class TestBlockToken {
ExtendedBlock block2 = new ExtendedBlock("10", 10L);
ExtendedBlock block3 = new ExtendedBlock("-10", -108L);
private static class getLengthAnswer implements Answer<Long> {
private static class GetLengthAnswer implements
Answer<GetReplicaVisibleLengthResponseProto> {
BlockTokenSecretManager sm;
BlockTokenIdentifier ident;
public getLengthAnswer(BlockTokenSecretManager sm,
public GetLengthAnswer(BlockTokenSecretManager sm,
BlockTokenIdentifier ident) {
this.sm = sm;
this.ident = ident;
}
@Override
public Long answer(InvocationOnMock invocation) throws IOException {
public GetReplicaVisibleLengthResponseProto answer(
InvocationOnMock invocation) throws IOException {
Object args[] = invocation.getArguments();
assertEquals(1, args.length);
org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable block =
(org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable) args[0];
assertEquals(2, args.length);
GetReplicaVisibleLengthRequestProto req =
(GetReplicaVisibleLengthRequestProto) args[1];
Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
.getTokenIdentifiers();
assertEquals("Only one BlockTokenIdentifier expected", 1, tokenIds.size());
@ -130,12 +142,12 @@ public Long answer(InvocationOnMock invocation) throws IOException {
BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
LOG.info("Got: " + id.toString());
assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
sm.checkAccess(id, null, org.apache.hadoop.hdfs.protocolR23Compatible.
ExtendedBlockWritable.convertExtendedBlock(block),
sm.checkAccess(id, null, PBHelper.convert(req.getBlock()),
BlockTokenSecretManager.AccessMode.WRITE);
result = id.getBlockId();
}
return result;
return GetReplicaVisibleLengthResponseProto.newBuilder()
.setLength(result).build();
}
}
@ -208,25 +220,29 @@ public void testBlockTokenSecretManager() throws Exception {
}
private Server createMockDatanode(BlockTokenSecretManager sm,
Token<BlockTokenIdentifier> token) throws IOException {
org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol mockDN =
mock(org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class);
Token<BlockTokenIdentifier> token) throws IOException, ServiceException {
ClientDatanodeProtocolPB mockDN = mock(ClientDatanodeProtocolPB.class);
when(mockDN.getProtocolVersion(anyString(), anyLong())).thenReturn(
org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.versionID);
RPC.getProtocolVersion(ClientDatanodeProtocolPB.class));
doReturn(
ProtocolSignature.getProtocolSignature(mockDN,
org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class.getName(),
org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.versionID, 0)).when(mockDN)
.getProtocolSignature(anyString(), anyLong(), anyInt());
ClientDatanodeProtocolPB.class.getName(),
RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), 0)).when(
mockDN).getProtocolSignature(anyString(), anyLong(), anyInt());
BlockTokenIdentifier id = sm.createIdentifier();
id.readFields(new DataInputStream(new ByteArrayInputStream(token
.getIdentifier())));
doAnswer(new getLengthAnswer(sm, id)).when(mockDN).getReplicaVisibleLength(
any(org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable.class));
doAnswer(new GetLengthAnswer(sm, id)).when(mockDN)
.getReplicaVisibleLength(any(RpcController.class),
any(GetReplicaVisibleLengthRequestProto.class));
return RPC.getServer(org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class,
mockDN, ADDRESS, 0, 5,
RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
BlockingService service = ClientDatanodeProtocolService
.newReflectiveBlockingService(mockDN);
return RPC.getServer(ClientDatanodeProtocolPB.class, service, ADDRESS, 0, 5,
true, conf, sm);
}
@ -323,7 +339,7 @@ public void testBlockTokenRpcLeak() throws Exception {
/**
* @return the current number of file descriptors open by this process.
*/
private static int countOpenFileDescriptors() throws IOException {
private static int countOpenFileDescriptors() {
return FD_DIR.list().length;
}

View File

@ -150,7 +150,6 @@ public void testBlockMetaDataInfo() throws Exception {
DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
datanodeinfo[0], conf, datanode.getDnConf().socketTimeout);
assertTrue(datanode != null);
//stop block scanner, so we could compare lastScanTime
if (datanode.blockScanner != null) {
@ -347,8 +346,8 @@ public void testUpdateReplicaUnderRecovery() throws IOException {
/** Test to verify that InterDatanode RPC timesout as expected when
* the server DN does not respond.
*/
@Test
public void testInterDNProtocolTimeout() throws Exception {
@Test(expected=SocketTimeoutException.class)
public void testInterDNProtocolTimeout() throws Throwable {
final Server server = new TestServer(1, true);
server.start();
@ -361,10 +360,9 @@ public void testInterDNProtocolTimeout() throws Exception {
try {
proxy = DataNode.createInterDataNodeProtocolProxy(
dInfo, conf, 500);
proxy.initReplicaRecovery(null);
proxy.initReplicaRecovery(new RecoveringBlock(
new ExtendedBlock("bpid", 1), null, 100));
fail ("Expected SocketTimeoutException exception, but did not get.");
} catch (SocketTimeoutException e) {
DataNode.LOG.info("Got expected Exception: SocketTimeoutException" + e);
} finally {
if (proxy != null) {
RPC.stopProxy(proxy);