diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6dcd943ac0..98b931702d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -40,6 +40,8 @@ Trunk (unreleased changes) HDFS-2663. Optional protobuf parameters are not handled correctly. (suresh) + HDFS-2661. Enable protobuf RPC for DatanodeProtocol. (jitendra) + IMPROVEMENTS HADOOP-7524 Change RPC to allow multiple protocols including multuple diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 66db4c39cc..05cd5d3d31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -168,7 +168,7 @@ public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, throws IOException { HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)).setCapacity(capacity) - .setCapacity(dfsUsed).setRemaining(remaining) + .setDfsUsed(dfsUsed).setRemaining(remaining) .setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress) .setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build(); HeartbeatResponseProto resp; @@ -194,7 +194,7 @@ public DatanodeCommand blockReport(DatanodeRegistration registration, .setBlockPoolId(poolId); if (blocks != null) { for (int i = 0; i < blocks.length; i++) { - builder.setBlocks(i, blocks[i]); + builder.addBlocks(blocks[i]); } } BlockReportRequestProto req = builder.build(); @@ -217,7 +217,7 @@ public void blockReceivedAndDeleted(DatanodeRegistration registration, .setBlockPoolId(poolId); if (receivedAndDeletedBlocks != null) { for (int i = 0; i < receivedAndDeletedBlocks.length; i++) { - builder.setBlocks(i, PBHelper.convert(receivedAndDeletedBlocks[i])); + builder.addBlocks(PBHelper.convert(receivedAndDeletedBlocks[i])); } } BlockReceivedAndDeletedRequestProto req = builder.build(); @@ -290,7 +290,7 @@ public void commitBlockSynchronization(ExtendedBlock block, .setNewLength(newlength).setCloseFile(closeFile) .setDeleteBlock(deleteblock); for (int i = 0; i < newtargets.length; i++) { - builder.setNewTaragets(i, PBHelper.convert(newtargets[i])); + builder.addNewTaragets(PBHelper.convert(newtargets[i])); } CommitBlockSynchronizationRequestProto req = builder.build(); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 2dbf9150e9..f0526e4bb7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -96,7 +96,7 @@ public RegisterDatanodeResponseProto registerDatanode( @Override public HeartbeatResponseProto sendHeartbeat(RpcController controller, HeartbeatRequestProto request) throws ServiceException { - DatanodeCommand[] cmds; + DatanodeCommand[] cmds = null; try { cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()), request.getCapacity(), request.getDfsUsed(), request.getRemaining(), @@ -120,7 +120,7 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller, @Override public BlockReportResponseProto blockReport(RpcController controller, BlockReportRequestProto request) throws ServiceException { - DatanodeCommand cmd; + DatanodeCommand cmd = null; List blockIds = request.getBlocksList(); long[] blocks = new long[blockIds.size()]; for (int i = 0; i < blockIds.size(); i++) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index c684ee2688..232b820758 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -665,6 +665,9 @@ public static BlockCommandProto convert(BlockCommand cmd) { case DatanodeProtocol.DNA_INVALIDATE: builder.setAction(BlockCommandProto.Action.INVALIDATE); break; + case DatanodeProtocol.DNA_SHUTDOWN: + builder.setAction(BlockCommandProto.Action.SHUTDOWN); + break; } Block[] blocks = cmd.getBlocks(); for (int i = 0; i < blocks.length; i++) { @@ -685,6 +688,10 @@ private static List convert(DatanodeInfo[][] targets) { public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) { DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder(); + if (datanodeCommand == null) { + return builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand) + .build(); + } switch (datanodeCommand.getAction()) { case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE: builder.setCmdType(DatanodeCommandProto.Type.BalancerBandwidthCommand) @@ -711,11 +718,18 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) { break; case DatanodeProtocol.DNA_TRANSFER: case DatanodeProtocol.DNA_INVALIDATE: + case DatanodeProtocol.DNA_SHUTDOWN: builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd( PBHelper.convert((BlockCommand) datanodeCommand)); break; - case DatanodeProtocol.DNA_SHUTDOWN: //Not expected + case DatanodeProtocol.DNA_UC_ACTION_REPORT_STATUS: + case DatanodeProtocol.DNA_UC_ACTION_START_UPGRADE: + builder.setCmdType(DatanodeCommandProto.Type.UpgradeCommand) + .setUpgradeCmd(PBHelper.convert((UpgradeCommand) datanodeCommand)); + break; case DatanodeProtocol.DNA_UNKNOWN: //Not expected + default: + builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand); } return builder.build(); } @@ -754,13 +768,15 @@ public static BlockRecoveryCommand convert( public static BlockCommand convert(BlockCommandProto blkCmd) { List blockProtoList = blkCmd.getBlocksList(); - List targetList = blkCmd.getTargetsList(); - DatanodeInfo[][] targets = new DatanodeInfo[blockProtoList.size()][]; Block[] blocks = new Block[blockProtoList.size()]; for (int i = 0; i < blockProtoList.size(); i++) { - targets[i] = PBHelper.convert(targetList.get(i)); blocks[i] = PBHelper.convert(blockProtoList.get(i)); } + List targetList = blkCmd.getTargetsList(); + DatanodeInfo[][] targets = new DatanodeInfo[targetList.size()][]; + for (int i = 0; i < targetList.size(); i++) { + targets[i] = PBHelper.convert(targetList.get(i)); + } int action = DatanodeProtocol.DNA_UNKNOWN; switch (blkCmd.getAction()) { case TRANSFER: @@ -769,6 +785,9 @@ public static BlockCommand convert(BlockCommandProto blkCmd) { case INVALIDATE: action = DatanodeProtocol.DNA_INVALIDATE; break; + case SHUTDOWN: + action = DatanodeProtocol.DNA_SHUTDOWN; + break; } return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets); } @@ -800,9 +819,13 @@ public static ReceivedDeletedBlockInfoProto convert( } public static UpgradeCommandProto convert(UpgradeCommand comm) { - UpgradeCommandProto.Builder builder = UpgradeCommandProto.newBuilder() - .setVersion(comm.getVersion()) - .setUpgradeStatus(comm.getCurrentStatus()); + UpgradeCommandProto.Builder builder = UpgradeCommandProto.newBuilder(); + if (comm == null) { + return builder.setAction(UpgradeCommandProto.Action.UNKNOWN) + .setVersion(0).setUpgradeStatus(0).build(); + } + builder.setVersion(comm.getVersion()).setUpgradeStatus( + comm.getCurrentStatus()); switch (comm.getAction()) { case UpgradeCommand.UC_ACTION_REPORT_STATUS: builder.setAction(UpgradeCommandProto.Action.REPORT_STATUS); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 4d098ebec2..635714a332 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; @@ -93,7 +95,7 @@ class BPOfferService implements Runnable { boolean resetBlockReportTime = true; Thread bpThread; - DatanodeProtocol bpNamenode; + DatanodeProtocolClientSideTranslatorPB bpNamenode; private long lastHeartbeat = 0; private volatile boolean initialized = false; private final LinkedList receivedAndDeletedBlockList @@ -164,7 +166,7 @@ InetSocketAddress getNNSocketAddress() { * Used to inject a spy NN in the unit tests. */ @VisibleForTesting - void setNameNode(DatanodeProtocol dnProtocol) { + void setNameNode(DatanodeProtocolClientSideTranslatorPB dnProtocol) { bpNamenode = dnProtocol; } @@ -224,8 +226,8 @@ private void checkNNVersion(NamespaceInfo nsInfo) private void connectToNNAndHandshake() throws IOException { // get NN proxy - bpNamenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class, - DatanodeProtocol.versionID, nnAddr, dn.getConf()); + bpNamenode = new DatanodeProtocolClientSideTranslatorPB(nnAddr, + dn.getConf()); // First phase of the handshake with NN - get the namespace // info. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 4cb1d79a38..dba65dad46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -111,6 +111,7 @@ import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; @@ -1277,7 +1278,7 @@ private void handleDiskError(String errMsgr) { //inform NameNodes for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) { - DatanodeProtocol nn = bpos.bpNamenode; + DatanodeProtocolClientSideTranslatorPB nn = bpos.bpNamenode; try { nn.errorReport(bpos.bpRegistration, dpError, errMsgr); } catch(IOException e) { @@ -1310,7 +1311,8 @@ UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) { private void transferBlock( ExtendedBlock block, DatanodeInfo xferTargets[] ) throws IOException { - DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId()); + DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block + .getBlockPoolId()); DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId()); if (!data.isValidBlock(block)) { @@ -1978,7 +1980,8 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException { * @return Namenode corresponding to the bpid * @throws IOException */ - public DatanodeProtocol getBPNamenode(String bpid) throws IOException { + public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid) + throws IOException { BPOfferService bpos = blockPoolManager.get(bpid); if(bpos == null || bpos.bpNamenode == null) { throw new IOException("cannot find a namnode proxy for bpid=" + bpid); @@ -1990,7 +1993,8 @@ public DatanodeProtocol getBPNamenode(String bpid) throws IOException { void syncBlock(RecoveringBlock rBlock, List syncList) throws IOException { ExtendedBlock block = rBlock.getBlock(); - DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId()); + DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block + .getBlockPoolId()); long recoveryId = rBlock.getNewGenerationStamp(); if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index e4f5de0505..924e63192c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -59,6 +59,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; @@ -142,6 +145,11 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) clientProtocolServerTranslator = new ClientNamenodeProtocolServerSideTranslatorR23(this); + DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator = + new DatanodeProtocolServerSideTranslatorPB(this); + BlockingService dnProtoPbService = DatanodeProtocolService + .newReflectiveBlockingService(dnProtoPbTranslator); + NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = new NamenodeProtocolServerSideTranslatorPB(this); BlockingService service = NamenodeProtocolService @@ -159,8 +167,6 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); - this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, - DatanodeProtocol.class, this); this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, RefreshAuthorizationPolicyProtocol.class, this); this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, @@ -169,6 +175,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) GetUserMappingsProtocol.class, this); DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service, serviceRpcServer); + DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, + serviceRpcServer); this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress(); nn.setRpcServiceServerAddress(conf, serviceRPCAddress); @@ -183,8 +191,6 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) clientProtocolServerTranslator, socAddr.getHostName(), socAddr.getPort(), handlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); - this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, - DatanodeProtocol.class, this); this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, RefreshAuthorizationPolicyProtocol.class, this); this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, @@ -193,7 +199,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) GetUserMappingsProtocol.class, this); DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service, clientRpcServer); - + DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, + clientRpcServer); // set service-level authorization security policy if (serviceAuthEnabled = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 28f54e86ee..ee93a8fcd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -76,6 +76,8 @@ public interface DatanodeProtocol extends VersionedProtocol { final static int DNA_RECOVERBLOCK = 6; // request a block recovery final static int DNA_ACCESSKEYUPDATE = 7; // update access key final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth + final static int DNA_UC_ACTION_REPORT_STATUS = 100; // Report upgrade status + final static int DNA_UC_ACTION_START_UPGRADE = 101; // start upgrade /** * Register Datanode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/UpgradeCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/UpgradeCommand.java index fd9263fbc6..a6de55d843 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/UpgradeCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/UpgradeCommand.java @@ -41,8 +41,10 @@ @InterfaceStability.Evolving public class UpgradeCommand extends DatanodeCommand { public final static int UC_ACTION_UNKNOWN = DatanodeProtocol.DNA_UNKNOWN; - public final static int UC_ACTION_REPORT_STATUS = 100; // report upgrade status - public final static int UC_ACTION_START_UPGRADE = 101; // start upgrade + public final static int UC_ACTION_REPORT_STATUS = + DatanodeProtocol.DNA_UC_ACTION_REPORT_STATUS; + public final static int UC_ACTION_START_UPGRADE = + DatanodeProtocol.DNA_UC_ACTION_START_UPGRADE; private int version; private short upgradeStatus; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 71f609e40a..dff8d5ab1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -47,6 +47,7 @@ message DatanodeCommandProto { KeyUpdateCommand = 4; RegisterCommand = 5; UpgradeCommand = 6; + NullDatanodeCommand = 7; } required Type cmdType = 1; // Type of the command @@ -80,6 +81,7 @@ message BlockCommandProto { enum Action { TRANSFER = 1; // Transfer blocks to another datanode INVALIDATE = 2; // Invalidate blocks + SHUTDOWN = 3; // Shutdown the datanode } required Action action = 1; required string blockPoolId = 2; @@ -190,7 +192,7 @@ message BlockReportRequestProto { * cmd - Command from namenode to the datanode */ message BlockReportResponseProto { - required DatanodeCommandProto cmd = 1; + optional DatanodeCommandProto cmd = 1; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index ee3e46998b..e2ef348760 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; import org.apache.hadoop.hdfs.server.common.Storage; @@ -518,7 +519,7 @@ private void initMiniDFSCluster(int nameNodePort, int nameNodeHttpPort, setRpcEngine(conf, ClientDatanodeProtocolPB.class, rpcEngine); setRpcEngine(conf, NamenodeProtocolPB.class, rpcEngine); setRpcEngine(conf, ClientProtocol.class, rpcEngine); - setRpcEngine(conf, DatanodeProtocol.class, rpcEngine); + setRpcEngine(conf, DatanodeProtocolPB.class, rpcEngine); setRpcEngine(conf, RefreshAuthorizationPolicyProtocol.class, rpcEngine); setRpcEngine(conf, RefreshUserMappingsProtocol.class, rpcEngine); setRpcEngine(conf, GetUserMappingsProtocol.class, rpcEngine); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java index 97554e7a80..ca9b3dcfb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java @@ -23,8 +23,8 @@ import static org.mockito.Mockito.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; -import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.junit.Test; import org.mockito.Mockito; @@ -46,7 +46,8 @@ public void testDataNodeRegister() throws Exception { NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class); when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion"); - DatanodeProtocol fakeDNProt = mock(DatanodeProtocol.class); + DatanodeProtocolClientSideTranslatorPB fakeDNProt = + mock(DatanodeProtocolClientSideTranslatorPB.class); when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo); bpos.setNameNode( fakeDNProt );