HDFS-3880. Use Builder to build RPC server in HDFS. Contributed by Brandon Li.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1379917 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-09-02 06:20:41 +00:00
parent 54e612bfb9
commit da3bd67138
6 changed files with 39 additions and 22 deletions

View File

@ -133,6 +133,9 @@ Trunk (unreleased changes)
HDFS-2580. NameNode#main(...) can make use of GenericOptionsParser. (harsh) HDFS-2580. NameNode#main(...) can make use of GenericOptionsParser. (harsh)
HDFS-3880. Use Builder to build RPC server in HDFS.
(Brandon Li vias suresh)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -417,10 +417,15 @@ private void initIpcServer(Configuration conf) throws IOException {
new ClientDatanodeProtocolServerSideTranslatorPB(this); new ClientDatanodeProtocolServerSideTranslatorPB(this);
BlockingService service = ClientDatanodeProtocolService BlockingService service = ClientDatanodeProtocolService
.newReflectiveBlockingService(clientDatanodeProtocolXlator); .newReflectiveBlockingService(clientDatanodeProtocolXlator);
ipcServer = RPC.getServer(ClientDatanodeProtocolPB.class, service, ipcAddr ipcServer = new RPC.Builder(conf)
.getHostName(), ipcAddr.getPort(), conf.getInt( .setProtocol(ClientDatanodeProtocolPB.class)
DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT), .setInstance(service)
false, conf, blockPoolTokenSecretManager); .setBindAddress(ipcAddr.getHostName())
.setPort(ipcAddr.getPort())
.setNumHandlers(
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
.setSecretManager(blockPoolTokenSecretManager).build();
InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
new InterDatanodeProtocolServerSideTranslatorPB(this); new InterDatanodeProtocolServerSideTranslatorPB(this);

View File

@ -283,8 +283,9 @@ private static RPC.Server createRpcServer(Configuration conf,
new JournalProtocolServerSideTranslatorPB(impl); new JournalProtocolServerSideTranslatorPB(impl);
BlockingService service = BlockingService service =
JournalProtocolService.newReflectiveBlockingService(xlator); JournalProtocolService.newReflectiveBlockingService(xlator);
return RPC.getServer(JournalProtocolPB.class, service, return new RPC.Builder(conf).setProtocol(JournalProtocolPB.class)
address.getHostName(), address.getPort(), 1, false, conf, null); .setInstance(service).setBindAddress(address.getHostName())
.setPort(address.getPort()).setNumHandlers(1).setVerbose(false).build();
} }
private void verifyEpoch(long e) throws FencedException { private void verifyEpoch(long e) throws FencedException {

View File

@ -206,12 +206,15 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
// Add all the RPC protocols that the namenode implements // Add all the RPC protocols that the namenode implements
this.serviceRpcServer = this.serviceRpcServer = new RPC.Builder(conf)
RPC.getServer(org.apache.hadoop.hdfs.protocolPB. .setProtocol(
ClientNamenodeProtocolPB.class, clientNNPbService, org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
dnSocketAddr.getHostName(), dnSocketAddr.getPort(), .setInstance(clientNNPbService)
serviceHandlerCount, .setBindAddress(dnSocketAddr.getHostName())
false, conf, namesystem.getDelegationTokenSecretManager()); .setPort(dnSocketAddr.getPort()).setNumHandlers(serviceHandlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
serviceRpcServer); serviceRpcServer);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
@ -232,11 +235,13 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
serviceRPCAddress = null; serviceRPCAddress = null;
} }
// Add all the RPC protocols that the namenode implements // Add all the RPC protocols that the namenode implements
this.clientRpcServer = RPC.getServer( this.clientRpcServer = new RPC.Builder(conf)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class, .setProtocol(
clientNNPbService, socAddr.getHostName(), org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
socAddr.getPort(), handlerCount, false, conf, .setInstance(clientNNPbService).setBindAddress(socAddr.getHostName())
namesystem.getDelegationTokenSecretManager()); .setPort(socAddr.getPort()).setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager()).build();
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
clientRpcServer); clientRpcServer);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,

View File

@ -80,9 +80,11 @@ public void testDelegationTokenRpc() throws Exception {
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT, DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
3600000, mockNameSys); 3600000, mockNameSys);
sm.startThreads(); sm.startThreads();
final Server server = RPC.getServer(ClientProtocol.class, mockNN, ADDRESS, final Server server = new RPC.Builder(conf)
0, 5, true, conf, sm); .setProtocol(ClientProtocol.class).setInstance(mockNN)
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.setSecretManager(sm).build();
server.start(); server.start();
final UserGroupInformation current = UserGroupInformation.getCurrentUser(); final UserGroupInformation current = UserGroupInformation.getCurrentUser();

View File

@ -231,8 +231,9 @@ private Server createMockDatanode(BlockTokenSecretManager sm,
ProtobufRpcEngine.class); ProtobufRpcEngine.class);
BlockingService service = ClientDatanodeProtocolService BlockingService service = ClientDatanodeProtocolService
.newReflectiveBlockingService(mockDN); .newReflectiveBlockingService(mockDN);
return RPC.getServer(ClientDatanodeProtocolPB.class, service, ADDRESS, 0, 5, return new RPC.Builder(conf).setProtocol(ClientDatanodeProtocolPB.class)
true, conf, sm); .setInstance(service).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
} }
@Test @Test