HDFS-2481 Unknown protocol: org.apache.hadoop.hdfs.protocol.ClientProtocol (sanjay)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1188300 13f79535-47bb-0310-9956-ffa450edef68
Sanjay Radia 2011-10-24 19:08:03 +00:00
parent 8fe95d276d
commit 0920056f04
5 changed files with 48 additions and 30 deletions
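In short: NameNodeRpcServer's server and rpcAddress fields are renamed to clientRpcServer and clientRpcAddress; the optional service RPC server is now constructed with the R23 ClientNamenodeWireProtocol translator as its primary protocol (instead of NamenodeProtocols.class) and explicitly registers DatanodeProtocol, NamenodeProtocol and the refresh/user-mapping protocols, which appears to be what addresses the "Unknown protocol" failure named in the summary; BackupNode.stop() guards RPC.stopProxy() against a null proxy; and DFSAdmin obtains its ClientDatanodeProtocol proxy through a DFSUtil helper rather than calling RPC.getProxy() inline.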

View File

@@ -147,6 +147,8 @@ Trunk (unreleased changes)
     getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem
     and cleared content type in ExceptionHandler.  (szetszwo)
 
+    HDFS-2481 Unknown protocol: org.apache.hadoop.hdfs.protocol.ClientProtocol (sanjay)
+
   Release 0.23.0 - Unreleased
 
   INCOMPATIBLE CHANGES

View File

@@ -178,7 +178,9 @@ public void stop() {
       }
     }
     // Stop the RPC client
-    RPC.stopProxy(namenode);
+    if (namenode != null) {
+      RPC.stopProxy(namenode);
+    }
     namenode = null;
     // Stop the checkpoint manager
     if(checkpointManager != null) {
@@ -198,7 +200,7 @@ private BackupNodeRpcServer(Configuration conf, BackupNode nn)
     super(conf, nn);
     JournalProtocolServerSideTranslatorR23 journalProtocolTranslator =
         new JournalProtocolServerSideTranslatorR23(this);
-    this.server.addProtocol(JournalWireProtocol.class,
+    this.clientRpcServer.addProtocol(JournalWireProtocol.class,
         journalProtocolTranslator);
     nnRpcAddress = nn.nnRpcAddress;
   }
@@ -251,7 +253,7 @@ public void journal(NamenodeRegistration nnReg,
     verifyRequest(nnReg);
     if(!nnRpcAddress.equals(nnReg.getAddress()))
       throw new IOException("Journal request from unexpected name-node: "
-          + nnReg.getAddress() + " expecting " + rpcAddress);
+          + nnReg.getAddress() + " expecting " + clientRpcAddress);
     getBNImage().journal(firstTxId, numTxns, records);
   }
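The stop() change above is a small lifecycle hardening: if the BackupNode shuts down before its proxy to the active name-node was ever created, RPC.stopProxy would be handed a null reference. A minimal self-contained sketch of the same pattern, with java.io.Closeable standing in for the Hadoop RPC proxy and all names illustrative:

    import java.io.Closeable;
    import java.io.IOException;

    class ShutdownSketch {
      private Closeable namenode;     // remote proxy, created during startup

      synchronized void stop() throws IOException {
        // Guard: startup may have failed before the proxy existed.
        if (namenode != null) {
          namenode.close();           // stands in for RPC.stopProxy(namenode)
        }
        namenode = null;              // repeated stop() calls become no-ops
      }
    }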

View File

@@ -117,8 +117,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
   private final InetSocketAddress serviceRPCAddress;
 
   /** The RPC server that listens to requests from clients */
-  protected final RPC.Server server;
-  protected final InetSocketAddress rpcAddress;
+  protected final RPC.Server clientRpcServer;
+  protected final InetSocketAddress clientRpcAddress;
 
   public NameNodeRpcServer(Configuration conf, NameNode nn)
       throws IOException {
@@ -130,15 +130,30 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
         conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
                     DFS_DATANODE_HANDLER_COUNT_DEFAULT);
     InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
+    ClientNamenodeProtocolServerSideTranslatorR23
+        clientProtocolServerTranslator =
+            new ClientNamenodeProtocolServerSideTranslatorR23(this);
 
     InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
     if (dnSocketAddr != null) {
       int serviceHandlerCount =
         conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                     DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
-      this.serviceRpcServer = RPC.getServer(NamenodeProtocols.class, this,
-          dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,
+      // Add all the RPC protocols that the namenode implements
+      this.serviceRpcServer =
+          RPC.getServer(org.apache.hadoop.hdfs.protocolR23Compatible.
+              ClientNamenodeWireProtocol.class, clientProtocolServerTranslator,
+              dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
+              serviceHandlerCount,
           false, conf, namesystem.getDelegationTokenSecretManager());
+      this.serviceRpcServer.addProtocol(DatanodeProtocol.class, this);
+      this.serviceRpcServer.addProtocol(NamenodeProtocol.class, this);
+      this.serviceRpcServer.addProtocol(
+          RefreshAuthorizationPolicyProtocol.class, this);
+      this.serviceRpcServer.addProtocol(
+          RefreshUserMappingsProtocol.class, this);
+      this.serviceRpcServer.addProtocol(GetUserMappingsProtocol.class, this);
       this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
       nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
     } else {
@@ -146,40 +161,40 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
       serviceRPCAddress = null;
     }
 
     // Add all the RPC protocols that the namenode implements
-    ClientNamenodeProtocolServerSideTranslatorR23 clientProtocolServerTranslator =
-        new ClientNamenodeProtocolServerSideTranslatorR23(this);
-    this.server = RPC.getServer(
-        org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol.class,
+    this.clientRpcServer = RPC.getServer(
+        org.apache.hadoop.hdfs.protocolR23Compatible.
+        ClientNamenodeWireProtocol.class,
         clientProtocolServerTranslator, socAddr.getHostName(),
         socAddr.getPort(), handlerCount, false, conf,
         namesystem.getDelegationTokenSecretManager());
-    this.server.addProtocol(DatanodeProtocol.class, this);
-    this.server.addProtocol(NamenodeProtocol.class, this);
-    this.server.addProtocol(RefreshAuthorizationPolicyProtocol.class, this);
-    this.server.addProtocol(RefreshUserMappingsProtocol.class, this);
-    this.server.addProtocol(GetUserMappingsProtocol.class, this);
+    this.clientRpcServer.addProtocol(DatanodeProtocol.class, this);
+    this.clientRpcServer.addProtocol(NamenodeProtocol.class, this);
+    this.clientRpcServer.addProtocol(
+        RefreshAuthorizationPolicyProtocol.class, this);
+    this.clientRpcServer.addProtocol(RefreshUserMappingsProtocol.class, this);
+    this.clientRpcServer.addProtocol(GetUserMappingsProtocol.class, this);
 
     // set service-level authorization security policy
     if (serviceAuthEnabled =
           conf.getBoolean(
             CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
-      this.server.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
       if (this.serviceRpcServer != null) {
         this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
       }
     }
 
     // The rpc-server port can be ephemeral... ensure we have the correct info
-    this.rpcAddress = this.server.getListenerAddress();
-    nn.setRpcServerAddress(conf, rpcAddress);
+    this.clientRpcAddress = this.clientRpcServer.getListenerAddress();
+    nn.setRpcServerAddress(conf, clientRpcAddress);
   }
 
   /**
    * Actually start serving requests.
    */
   void start() {
-    server.start();  //start RPC server
+    clientRpcServer.start();  //start RPC server
     if (serviceRpcServer != null) {
       serviceRpcServer.start();
     }
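The "port can be ephemeral" comment above is the reason the constructor re-reads getListenerAddress() rather than trusting the configured address: binding to port 0 lets the OS pick any free port. A self-contained illustration using a plain ServerSocket (illustrative names, not Hadoop API):

    import java.net.InetSocketAddress;
    import java.net.ServerSocket;

    public class EphemeralPortSketch {
      public static void main(String[] args) throws Exception {
        try (ServerSocket ss = new ServerSocket(0)) {  // port 0 = OS-assigned
          // The configured port (0) and the bound port differ, so read back
          // the real address, as getListenerAddress() does above.
          InetSocketAddress bound =
              (InetSocketAddress) ss.getLocalSocketAddress();
          System.out.println("bound to port " + bound.getPort());
        }
      }
    }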
@@ -189,11 +204,11 @@ void start() {
    * Wait until the RPC server has shut down.
    */
   void join() throws InterruptedException {
-    this.server.join();
+    this.clientRpcServer.join();
   }
 
   void stop() {
-    if(server != null) server.stop();
+    if(clientRpcServer != null) clientRpcServer.stop();
     if(serviceRpcServer != null) serviceRpcServer.stop();
   }
 
@@ -202,7 +217,7 @@ InetSocketAddress getServiceRpcAddress() {
   }
 
   InetSocketAddress getRpcAddress() {
-    return rpcAddress;
+    return clientRpcAddress;
   }
 
   @Override // VersionedProtocol
@@ -877,7 +892,7 @@ public void refreshServiceAcl() throws IOException {
       throw new AuthorizationException("Service Level Authorization not enabled!");
     }
 
-    this.server.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
+    this.clientRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
     if (this.serviceRpcServer != null) {
       this.serviceRpcServer.refreshServiceAcl(new Configuration(), new HDFSPolicyProvider());
     }
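Both RPC servers above follow the same pattern: one listener is created with a primary protocol and then grows extra protocol-to-implementation bindings via addProtocol(). Before this patch the service server only knew NamenodeProtocols, so a request naming the client wire protocol had no binding to dispatch to, which matches the "Unknown protocol" error in the commit summary. A toy dispatcher showing the idea (illustrative only, not Hadoop's RPC engine):

    import java.util.HashMap;
    import java.util.Map;

    class MultiProtocolServerSketch {
      private final Map<String, Object> impls = new HashMap<>();

      MultiProtocolServerSketch(Class<?> primaryProtocol, Object primaryImpl) {
        impls.put(primaryProtocol.getName(), primaryImpl);
      }

      // Mirrors server.addProtocol(Protocol.class, impl) in the diff above.
      void addProtocol(Class<?> protocol, Object impl) {
        impls.put(protocol.getName(), impl);
      }

      // Each request names its protocol; no binding means "Unknown protocol".
      Object lookup(String protocolName) {
        Object impl = impls.get(protocolName);
        if (impl == null) {
          throw new IllegalArgumentException("Unknown protocol: " + protocolName);
        }
        return impl;
      }
    }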

View File

@@ -1146,10 +1146,9 @@ private ClientDatanodeProtocol getDataNodeProxy(String datanode)
         conf.get(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, ""));
 
     // Create the client
-    ClientDatanodeProtocol dnProtocol = RPC.getProxy(
-        ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID,
-        datanodeAddr, getUGI(), conf, NetUtils.getSocketFactory(conf,
-        ClientDatanodeProtocol.class));
+    ClientDatanodeProtocol dnProtocol =
+        DFSUtil.createClientDatanodeProtocolProxy(datanodeAddr, getUGI(), conf,
+            NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class));
     return dnProtocol;
   }
 
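Routing proxy construction through DFSUtil.createClientDatanodeProtocolProxy keeps version and wire-compatibility handling in one place instead of each caller invoking RPC.getProxy directly. A sketch of that factory shape using JDK dynamic proxies in place of Hadoop's RPC machinery (all names illustrative):

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Proxy;

    public class ProxyFactorySketch {
      interface ClientDatanodeProtocol {
        String echo(String msg);    // stand-in for the real RPC methods
      }

      // One factory method owns proxy creation; callers never see RPC details.
      static ClientDatanodeProtocol createClientDatanodeProtocolProxy() {
        InvocationHandler rpcStub = (proxy, method, args) ->
            "stubbed " + method.getName() + "(" + args[0] + ")";
        return (ClientDatanodeProtocol) Proxy.newProxyInstance(
            ClientDatanodeProtocol.class.getClassLoader(),
            new Class<?>[] { ClientDatanodeProtocol.class }, rpcStub);
      }

      public static void main(String[] args) {
        System.out.println(createClientDatanodeProtocolProxy().echo("ping"));
      }
    }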

View File

@@ -52,7 +52,7 @@ public static LocatedBlocks getBlockLocations(NameNode namenode,
    * @return rpc server
    */
   public static Server getRpcServer(NameNode namenode) {
-    return ((NameNodeRpcServer)namenode.getRpcServer()).server;
+    return ((NameNodeRpcServer)namenode.getRpcServer()).clientRpcServer;
   }
 
   public static DelegationTokenSecretManager getDtSecretManager(
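The last hunk is the matching test-side update: NameNodeAdapter exposes the NameNode's internal RPC server to tests, so the field rename has to be mirrored here for the test code to keep compiling.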