HDFS-2661. Enable protobuf RPC for DatanodeProtocol.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1214033 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jitendra Nath Pandey 2011-12-14 02:15:37 +00:00
parent 433a6e78f6
commit 3cffe34177
12 changed files with 78 additions and 32 deletions

View File

@ -40,6 +40,8 @@ Trunk (unreleased changes)
HDFS-2663. Optional protobuf parameters are not handled correctly. HDFS-2663. Optional protobuf parameters are not handled correctly.
(suresh) (suresh)
HDFS-2661. Enable protobuf RPC for DatanodeProtocol. (jitendra)
IMPROVEMENTS IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multuple HADOOP-7524 Change RPC to allow multiple protocols including multuple

View File

@ -168,7 +168,7 @@ public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
throws IOException { throws IOException {
HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder() HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration)).setCapacity(capacity) .setRegistration(PBHelper.convert(registration)).setCapacity(capacity)
.setCapacity(dfsUsed).setRemaining(remaining) .setDfsUsed(dfsUsed).setRemaining(remaining)
.setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress) .setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress)
.setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build(); .setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build();
HeartbeatResponseProto resp; HeartbeatResponseProto resp;
@ -194,7 +194,7 @@ public DatanodeCommand blockReport(DatanodeRegistration registration,
.setBlockPoolId(poolId); .setBlockPoolId(poolId);
if (blocks != null) { if (blocks != null) {
for (int i = 0; i < blocks.length; i++) { for (int i = 0; i < blocks.length; i++) {
builder.setBlocks(i, blocks[i]); builder.addBlocks(blocks[i]);
} }
} }
BlockReportRequestProto req = builder.build(); BlockReportRequestProto req = builder.build();
@ -217,7 +217,7 @@ public void blockReceivedAndDeleted(DatanodeRegistration registration,
.setBlockPoolId(poolId); .setBlockPoolId(poolId);
if (receivedAndDeletedBlocks != null) { if (receivedAndDeletedBlocks != null) {
for (int i = 0; i < receivedAndDeletedBlocks.length; i++) { for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
builder.setBlocks(i, PBHelper.convert(receivedAndDeletedBlocks[i])); builder.addBlocks(PBHelper.convert(receivedAndDeletedBlocks[i]));
} }
} }
BlockReceivedAndDeletedRequestProto req = builder.build(); BlockReceivedAndDeletedRequestProto req = builder.build();
@ -290,7 +290,7 @@ public void commitBlockSynchronization(ExtendedBlock block,
.setNewLength(newlength).setCloseFile(closeFile) .setNewLength(newlength).setCloseFile(closeFile)
.setDeleteBlock(deleteblock); .setDeleteBlock(deleteblock);
for (int i = 0; i < newtargets.length; i++) { for (int i = 0; i < newtargets.length; i++) {
builder.setNewTaragets(i, PBHelper.convert(newtargets[i])); builder.addNewTaragets(PBHelper.convert(newtargets[i]));
} }
CommitBlockSynchronizationRequestProto req = builder.build(); CommitBlockSynchronizationRequestProto req = builder.build();
try { try {

View File

@ -96,7 +96,7 @@ public RegisterDatanodeResponseProto registerDatanode(
@Override @Override
public HeartbeatResponseProto sendHeartbeat(RpcController controller, public HeartbeatResponseProto sendHeartbeat(RpcController controller,
HeartbeatRequestProto request) throws ServiceException { HeartbeatRequestProto request) throws ServiceException {
DatanodeCommand[] cmds; DatanodeCommand[] cmds = null;
try { try {
cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()), cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
request.getCapacity(), request.getDfsUsed(), request.getRemaining(), request.getCapacity(), request.getDfsUsed(), request.getRemaining(),
@ -120,7 +120,7 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
@Override @Override
public BlockReportResponseProto blockReport(RpcController controller, public BlockReportResponseProto blockReport(RpcController controller,
BlockReportRequestProto request) throws ServiceException { BlockReportRequestProto request) throws ServiceException {
DatanodeCommand cmd; DatanodeCommand cmd = null;
List<Long> blockIds = request.getBlocksList(); List<Long> blockIds = request.getBlocksList();
long[] blocks = new long[blockIds.size()]; long[] blocks = new long[blockIds.size()];
for (int i = 0; i < blockIds.size(); i++) { for (int i = 0; i < blockIds.size(); i++) {

View File

@ -665,6 +665,9 @@ public static BlockCommandProto convert(BlockCommand cmd) {
case DatanodeProtocol.DNA_INVALIDATE: case DatanodeProtocol.DNA_INVALIDATE:
builder.setAction(BlockCommandProto.Action.INVALIDATE); builder.setAction(BlockCommandProto.Action.INVALIDATE);
break; break;
case DatanodeProtocol.DNA_SHUTDOWN:
builder.setAction(BlockCommandProto.Action.SHUTDOWN);
break;
} }
Block[] blocks = cmd.getBlocks(); Block[] blocks = cmd.getBlocks();
for (int i = 0; i < blocks.length; i++) { for (int i = 0; i < blocks.length; i++) {
@ -685,6 +688,10 @@ private static List<DatanodeInfosProto> convert(DatanodeInfo[][] targets) {
public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) { public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder(); DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder();
if (datanodeCommand == null) {
return builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand)
.build();
}
switch (datanodeCommand.getAction()) { switch (datanodeCommand.getAction()) {
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE: case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
builder.setCmdType(DatanodeCommandProto.Type.BalancerBandwidthCommand) builder.setCmdType(DatanodeCommandProto.Type.BalancerBandwidthCommand)
@ -711,11 +718,18 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
break; break;
case DatanodeProtocol.DNA_TRANSFER: case DatanodeProtocol.DNA_TRANSFER:
case DatanodeProtocol.DNA_INVALIDATE: case DatanodeProtocol.DNA_INVALIDATE:
case DatanodeProtocol.DNA_SHUTDOWN:
builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd( builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
PBHelper.convert((BlockCommand) datanodeCommand)); PBHelper.convert((BlockCommand) datanodeCommand));
break; break;
case DatanodeProtocol.DNA_SHUTDOWN: //Not expected case DatanodeProtocol.DNA_UC_ACTION_REPORT_STATUS:
case DatanodeProtocol.DNA_UC_ACTION_START_UPGRADE:
builder.setCmdType(DatanodeCommandProto.Type.UpgradeCommand)
.setUpgradeCmd(PBHelper.convert((UpgradeCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default:
builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
} }
return builder.build(); return builder.build();
} }
@ -754,13 +768,15 @@ public static BlockRecoveryCommand convert(
public static BlockCommand convert(BlockCommandProto blkCmd) { public static BlockCommand convert(BlockCommandProto blkCmd) {
List<BlockProto> blockProtoList = blkCmd.getBlocksList(); List<BlockProto> blockProtoList = blkCmd.getBlocksList();
List<DatanodeInfosProto> targetList = blkCmd.getTargetsList();
DatanodeInfo[][] targets = new DatanodeInfo[blockProtoList.size()][];
Block[] blocks = new Block[blockProtoList.size()]; Block[] blocks = new Block[blockProtoList.size()];
for (int i = 0; i < blockProtoList.size(); i++) { for (int i = 0; i < blockProtoList.size(); i++) {
targets[i] = PBHelper.convert(targetList.get(i));
blocks[i] = PBHelper.convert(blockProtoList.get(i)); blocks[i] = PBHelper.convert(blockProtoList.get(i));
} }
List<DatanodeInfosProto> targetList = blkCmd.getTargetsList();
DatanodeInfo[][] targets = new DatanodeInfo[targetList.size()][];
for (int i = 0; i < targetList.size(); i++) {
targets[i] = PBHelper.convert(targetList.get(i));
}
int action = DatanodeProtocol.DNA_UNKNOWN; int action = DatanodeProtocol.DNA_UNKNOWN;
switch (blkCmd.getAction()) { switch (blkCmd.getAction()) {
case TRANSFER: case TRANSFER:
@ -769,6 +785,9 @@ public static BlockCommand convert(BlockCommandProto blkCmd) {
case INVALIDATE: case INVALIDATE:
action = DatanodeProtocol.DNA_INVALIDATE; action = DatanodeProtocol.DNA_INVALIDATE;
break; break;
case SHUTDOWN:
action = DatanodeProtocol.DNA_SHUTDOWN;
break;
} }
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets); return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
} }
@ -800,9 +819,13 @@ public static ReceivedDeletedBlockInfoProto convert(
} }
public static UpgradeCommandProto convert(UpgradeCommand comm) { public static UpgradeCommandProto convert(UpgradeCommand comm) {
UpgradeCommandProto.Builder builder = UpgradeCommandProto.newBuilder() UpgradeCommandProto.Builder builder = UpgradeCommandProto.newBuilder();
.setVersion(comm.getVersion()) if (comm == null) {
.setUpgradeStatus(comm.getCurrentStatus()); return builder.setAction(UpgradeCommandProto.Action.UNKNOWN)
.setVersion(0).setUpgradeStatus(0).build();
}
builder.setVersion(comm.getVersion()).setUpgradeStatus(
comm.getCurrentStatus());
switch (comm.getAction()) { switch (comm.getAction()) {
case UpgradeCommand.UC_ACTION_REPORT_STATUS: case UpgradeCommand.UC_ACTION_REPORT_STATUS:
builder.setAction(UpgradeCommandProto.Action.REPORT_STATUS); builder.setAction(UpgradeCommandProto.Action.REPORT_STATUS);

View File

@ -36,6 +36,8 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -93,7 +95,7 @@ class BPOfferService implements Runnable {
boolean resetBlockReportTime = true; boolean resetBlockReportTime = true;
Thread bpThread; Thread bpThread;
DatanodeProtocol bpNamenode; DatanodeProtocolClientSideTranslatorPB bpNamenode;
private long lastHeartbeat = 0; private long lastHeartbeat = 0;
private volatile boolean initialized = false; private volatile boolean initialized = false;
private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
@ -164,7 +166,7 @@ InetSocketAddress getNNSocketAddress() {
* Used to inject a spy NN in the unit tests. * Used to inject a spy NN in the unit tests.
*/ */
@VisibleForTesting @VisibleForTesting
void setNameNode(DatanodeProtocol dnProtocol) { void setNameNode(DatanodeProtocolClientSideTranslatorPB dnProtocol) {
bpNamenode = dnProtocol; bpNamenode = dnProtocol;
} }
@ -224,8 +226,8 @@ private void checkNNVersion(NamespaceInfo nsInfo)
private void connectToNNAndHandshake() throws IOException { private void connectToNNAndHandshake() throws IOException {
// get NN proxy // get NN proxy
bpNamenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class, bpNamenode = new DatanodeProtocolClientSideTranslatorPB(nnAddr,
DatanodeProtocol.versionID, nnAddr, dn.getConf()); dn.getConf());
// First phase of the handshake with NN - get the namespace // First phase of the handshake with NN - get the namespace
// info. // info.

View File

@ -111,6 +111,7 @@
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
@ -1277,7 +1278,7 @@ private void handleDiskError(String errMsgr) {
//inform NameNodes //inform NameNodes
for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) { for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
DatanodeProtocol nn = bpos.bpNamenode; DatanodeProtocolClientSideTranslatorPB nn = bpos.bpNamenode;
try { try {
nn.errorReport(bpos.bpRegistration, dpError, errMsgr); nn.errorReport(bpos.bpRegistration, dpError, errMsgr);
} catch(IOException e) { } catch(IOException e) {
@ -1310,7 +1311,8 @@ UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
private void transferBlock( ExtendedBlock block, private void transferBlock( ExtendedBlock block,
DatanodeInfo xferTargets[] DatanodeInfo xferTargets[]
) throws IOException { ) throws IOException {
DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId()); DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
.getBlockPoolId());
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId()); DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
if (!data.isValidBlock(block)) { if (!data.isValidBlock(block)) {
@ -1978,7 +1980,8 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException {
* @return Namenode corresponding to the bpid * @return Namenode corresponding to the bpid
* @throws IOException * @throws IOException
*/ */
public DatanodeProtocol getBPNamenode(String bpid) throws IOException { public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
throws IOException {
BPOfferService bpos = blockPoolManager.get(bpid); BPOfferService bpos = blockPoolManager.get(bpid);
if(bpos == null || bpos.bpNamenode == null) { if(bpos == null || bpos.bpNamenode == null) {
throw new IOException("cannot find a namnode proxy for bpid=" + bpid); throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
@ -1990,7 +1993,8 @@ public DatanodeProtocol getBPNamenode(String bpid) throws IOException {
void syncBlock(RecoveringBlock rBlock, void syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException { List<BlockRecord> syncList) throws IOException {
ExtendedBlock block = rBlock.getBlock(); ExtendedBlock block = rBlock.getBlock();
DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId()); DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
.getBlockPoolId());
long recoveryId = rBlock.getNewGenerationStamp(); long recoveryId = rBlock.getNewGenerationStamp();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View File

@ -59,6 +59,9 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
@ -142,6 +145,11 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
clientProtocolServerTranslator = clientProtocolServerTranslator =
new ClientNamenodeProtocolServerSideTranslatorR23(this); new ClientNamenodeProtocolServerSideTranslatorR23(this);
DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
new DatanodeProtocolServerSideTranslatorPB(this);
BlockingService dnProtoPbService = DatanodeProtocolService
.newReflectiveBlockingService(dnProtoPbTranslator);
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
new NamenodeProtocolServerSideTranslatorPB(this); new NamenodeProtocolServerSideTranslatorPB(this);
BlockingService service = NamenodeProtocolService BlockingService service = NamenodeProtocolService
@ -159,8 +167,6 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
dnSocketAddr.getHostName(), dnSocketAddr.getPort(), dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
serviceHandlerCount, serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager()); false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
DatanodeProtocol.class, this);
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
RefreshAuthorizationPolicyProtocol.class, this); RefreshAuthorizationPolicyProtocol.class, this);
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE, this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
@ -169,6 +175,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
GetUserMappingsProtocol.class, this); GetUserMappingsProtocol.class, this);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service, DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
serviceRpcServer); serviceRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
serviceRpcServer);
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress(); this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
nn.setRpcServiceServerAddress(conf, serviceRPCAddress); nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
@ -183,8 +191,6 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
clientProtocolServerTranslator, socAddr.getHostName(), clientProtocolServerTranslator, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf, socAddr.getPort(), handlerCount, false, conf,
namesystem.getDelegationTokenSecretManager()); namesystem.getDelegationTokenSecretManager());
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
DatanodeProtocol.class, this);
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
RefreshAuthorizationPolicyProtocol.class, this); RefreshAuthorizationPolicyProtocol.class, this);
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE, this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
@ -193,7 +199,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
GetUserMappingsProtocol.class, this); GetUserMappingsProtocol.class, this);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service, DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
clientRpcServer); clientRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
clientRpcServer);
// set service-level authorization security policy // set service-level authorization security policy
if (serviceAuthEnabled = if (serviceAuthEnabled =

View File

@ -76,6 +76,8 @@ public interface DatanodeProtocol extends VersionedProtocol {
final static int DNA_RECOVERBLOCK = 6; // request a block recovery final static int DNA_RECOVERBLOCK = 6; // request a block recovery
final static int DNA_ACCESSKEYUPDATE = 7; // update access key final static int DNA_ACCESSKEYUPDATE = 7; // update access key
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
final static int DNA_UC_ACTION_REPORT_STATUS = 100; // Report upgrade status
final static int DNA_UC_ACTION_START_UPGRADE = 101; // start upgrade
/** /**
* Register Datanode. * Register Datanode.

View File

@ -41,8 +41,10 @@
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class UpgradeCommand extends DatanodeCommand { public class UpgradeCommand extends DatanodeCommand {
public final static int UC_ACTION_UNKNOWN = DatanodeProtocol.DNA_UNKNOWN; public final static int UC_ACTION_UNKNOWN = DatanodeProtocol.DNA_UNKNOWN;
public final static int UC_ACTION_REPORT_STATUS = 100; // report upgrade status public final static int UC_ACTION_REPORT_STATUS =
public final static int UC_ACTION_START_UPGRADE = 101; // start upgrade DatanodeProtocol.DNA_UC_ACTION_REPORT_STATUS;
public final static int UC_ACTION_START_UPGRADE =
DatanodeProtocol.DNA_UC_ACTION_START_UPGRADE;
private int version; private int version;
private short upgradeStatus; private short upgradeStatus;

View File

@ -47,6 +47,7 @@ message DatanodeCommandProto {
KeyUpdateCommand = 4; KeyUpdateCommand = 4;
RegisterCommand = 5; RegisterCommand = 5;
UpgradeCommand = 6; UpgradeCommand = 6;
NullDatanodeCommand = 7;
} }
required Type cmdType = 1; // Type of the command required Type cmdType = 1; // Type of the command
@ -80,6 +81,7 @@ message BlockCommandProto {
enum Action { enum Action {
TRANSFER = 1; // Transfer blocks to another datanode TRANSFER = 1; // Transfer blocks to another datanode
INVALIDATE = 2; // Invalidate blocks INVALIDATE = 2; // Invalidate blocks
SHUTDOWN = 3; // Shutdown the datanode
} }
required Action action = 1; required Action action = 1;
required string blockPoolId = 2; required string blockPoolId = 2;
@ -190,7 +192,7 @@ message BlockReportRequestProto {
* cmd - Command from namenode to the datanode * cmd - Command from namenode to the datanode
*/ */
message BlockReportResponseProto { message BlockReportResponseProto {
required DatanodeCommandProto cmd = 1; optional DatanodeCommandProto cmd = 1;
} }
/** /**

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
@ -518,7 +519,7 @@ private void initMiniDFSCluster(int nameNodePort, int nameNodeHttpPort,
setRpcEngine(conf, ClientDatanodeProtocolPB.class, rpcEngine); setRpcEngine(conf, ClientDatanodeProtocolPB.class, rpcEngine);
setRpcEngine(conf, NamenodeProtocolPB.class, rpcEngine); setRpcEngine(conf, NamenodeProtocolPB.class, rpcEngine);
setRpcEngine(conf, ClientProtocol.class, rpcEngine); setRpcEngine(conf, ClientProtocol.class, rpcEngine);
setRpcEngine(conf, DatanodeProtocol.class, rpcEngine); setRpcEngine(conf, DatanodeProtocolPB.class, rpcEngine);
setRpcEngine(conf, RefreshAuthorizationPolicyProtocol.class, rpcEngine); setRpcEngine(conf, RefreshAuthorizationPolicyProtocol.class, rpcEngine);
setRpcEngine(conf, RefreshUserMappingsProtocol.class, rpcEngine); setRpcEngine(conf, RefreshUserMappingsProtocol.class, rpcEngine);
setRpcEngine(conf, GetUserMappingsProtocol.class, rpcEngine); setRpcEngine(conf, GetUserMappingsProtocol.class, rpcEngine);

View File

@ -23,8 +23,8 @@
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -46,7 +46,8 @@ public void testDataNodeRegister() throws Exception {
NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class); NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class);
when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion"); when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion");
DatanodeProtocol fakeDNProt = mock(DatanodeProtocol.class); DatanodeProtocolClientSideTranslatorPB fakeDNProt =
mock(DatanodeProtocolClientSideTranslatorPB.class);
when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo); when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo);
bpos.setNameNode( fakeDNProt ); bpos.setNameNode( fakeDNProt );