HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple storages. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1241519 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-02-07 16:59:48 +00:00
parent aaace5e84e
commit 28eadb7cd7
15 changed files with 433 additions and 139 deletions

View File

@ -45,9 +45,12 @@ Trunk (unreleased changes)
HDFS-2697. Move RefreshAuthPolicy, RefreshUserMappings, GetUserMappings HDFS-2697. Move RefreshAuthPolicy, RefreshUserMappings, GetUserMappings
protocol to protocol buffers. (jitendra) protocol to protocol buffers. (jitendra)
HDFS-2880. Protobuf chagnes in DatanodeProtocol to add multiple storages. HDFS-2880. Protobuf changes in DatanodeProtocol to add multiple storages.
(suresh) (suresh)
HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple
storages. (suresh)
IMPROVEMENTS IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multuple HADOOP-7524 Change RPC to allow multiple protocols including multuple

View File

@ -48,15 +48,18 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
@ -153,13 +156,17 @@ public void close() throws IOException {
} }
@Override @Override
public DatanodeRegistration registerDatanode(DatanodeRegistration registration) public DatanodeRegistration registerDatanode(DatanodeRegistration registration,
throws IOException { DatanodeStorage[] storages) throws IOException {
RegisterDatanodeRequestProto req = RegisterDatanodeRequestProto RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration)).build(); .newBuilder().setRegistration(PBHelper.convert(registration));
for (DatanodeStorage s : storages) {
builder.addStorages(PBHelper.convert(s));
}
RegisterDatanodeResponseProto resp; RegisterDatanodeResponseProto resp;
try { try {
resp = rpcProxy.registerDatanode(NULL_CONTROLLER, req); resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se); throw ProtobufHelper.getRemoteException(se);
} }
@ -168,22 +175,19 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration)
@Override @Override
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
long capacity, long dfsUsed, long remaining, long blockPoolUsed, StorageReport[] reports, int xmitsInProgress, int xceiverCount,
int xmitsInProgress, int xceiverCount, int failedVolumes) int failedVolumes) throws IOException {
throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
StorageReportProto report = StorageReportProto.newBuilder() .setRegistration(PBHelper.convert(registration))
.setBlockPoolUsed(blockPoolUsed).setCapacity(capacity)
.setDfsUsed(dfsUsed).setRemaining(remaining)
.setStorageID(registration.getStorageID()).build();
HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration)).addReports(report)
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
.setFailedVolumes(failedVolumes) .setFailedVolumes(failedVolumes);
.build(); for (StorageReport r : reports) {
builder.addReports(PBHelper.convert(r));
}
HeartbeatResponseProto resp; HeartbeatResponseProto resp;
try { try {
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, req); resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se); throw ProtobufHelper.getRemoteException(se);
} }
@ -198,21 +202,23 @@ public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
@Override @Override
public DatanodeCommand blockReport(DatanodeRegistration registration, public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, long[] blocks) throws IOException { String poolId, StorageBlockReport[] reports) throws IOException {
StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto BlockReportRequestProto.Builder builder = BlockReportRequestProto
.newBuilder().setStorageID(registration.getStorageID()); .newBuilder().setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId);
if (blocks != null) { for (StorageBlockReport r : reports) {
StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
.newBuilder().setStorageID(r.getStorageID());
long[] blocks = r.getBlocks();
for (int i = 0; i < blocks.length; i++) { for (int i = 0; i < blocks.length; i++) {
reportBuilder.addBlocks(blocks[i]); reportBuilder.addBlocks(blocks[i]);
} }
builder.addReports(reportBuilder.build());
} }
BlockReportRequestProto req = BlockReportRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId).addReports(reportBuilder.build()).build();
BlockReportResponseProto resp; BlockReportResponseProto resp;
try { try {
resp = rpcProxy.blockReport(NULL_CONTROLLER, req); resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se); throw ProtobufHelper.getRemoteException(se);
} }
@ -220,23 +226,24 @@ public DatanodeCommand blockReport(DatanodeRegistration registration,
} }
@Override @Override
public void blockReceivedAndDeleted(DatanodeRegistration reg, public void blockReceivedAndDeleted(DatanodeRegistration registration,
String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
throws IOException { throws IOException {
StorageReceivedDeletedBlocksProto.Builder builder = BlockReceivedAndDeletedRequestProto.Builder builder =
StorageReceivedDeletedBlocksProto.newBuilder()
.setStorageID(reg.getStorageID());
if (receivedAndDeletedBlocks != null) {
for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
builder.addBlocks(PBHelper.convert(receivedAndDeletedBlocks[i]));
}
}
BlockReceivedAndDeletedRequestProto req =
BlockReceivedAndDeletedRequestProto.newBuilder() BlockReceivedAndDeletedRequestProto.newBuilder()
.setRegistration(PBHelper.convert(reg)) .setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId).addBlocks(builder.build()).build(); .setBlockPoolId(poolId);
for (StorageReceivedDeletedBlocks storageBlock : receivedAndDeletedBlocks) {
StorageReceivedDeletedBlocksProto.Builder repBuilder =
StorageReceivedDeletedBlocksProto.newBuilder();
repBuilder.setStorageID(storageBlock.getStorageID());
for (ReceivedDeletedBlockInfo rdBlock : storageBlock.getBlocks()) {
repBuilder.addBlocks(PBHelper.convert(rdBlock));
}
builder.addBlocks(repBuilder.build());
}
try { try {
rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, req); rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) { } catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se); throw ProtobufHelper.getRemoteException(se);
} }

View File

@ -40,6 +40,8 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@ -49,8 +51,12 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
@ -84,8 +90,12 @@ public RegisterDatanodeResponseProto registerDatanode(
DatanodeRegistration registration = PBHelper.convert(request DatanodeRegistration registration = PBHelper.convert(request
.getRegistration()); .getRegistration());
DatanodeRegistration registrationResp; DatanodeRegistration registrationResp;
DatanodeStorage[] storages = new DatanodeStorage[request.getStoragesCount()];
for (int i = 0; i < request.getStoragesCount(); i++) {
storages[i] = PBHelper.convert(request.getStorages(i));
}
try { try {
registrationResp = impl.registerDatanode(registration); registrationResp = impl.registerDatanode(registration, storages);
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
@ -98,11 +108,17 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
HeartbeatRequestProto request) throws ServiceException { HeartbeatRequestProto request) throws ServiceException {
DatanodeCommand[] cmds = null; DatanodeCommand[] cmds = null;
try { try {
StorageReportProto report = request.getReports(0); List<StorageReportProto> list = request.getReportsList();
StorageReport[] report = new StorageReport[list.size()];
int i = 0;
for (StorageReportProto p : list) {
report[i++] = new StorageReport(p.getStorageID(), p.getFailed(),
p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
p.getBlockPoolUsed());
}
cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()), cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
report.getCapacity(), report.getDfsUsed(), report.getRemaining(), report, request.getXmitsInProgress(), request.getXceiverCount(),
report.getBlockPoolUsed(), request.getXmitsInProgress(), request.getFailedVolumes());
request.getXceiverCount(), request.getFailedVolumes());
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
@ -122,14 +138,21 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
public BlockReportResponseProto blockReport(RpcController controller, public BlockReportResponseProto blockReport(RpcController controller,
BlockReportRequestProto request) throws ServiceException { BlockReportRequestProto request) throws ServiceException {
DatanodeCommand cmd = null; DatanodeCommand cmd = null;
List<Long> blockIds = request.getReports(0).getBlocksList(); StorageBlockReport[] report =
long[] blocks = new long[blockIds.size()]; new StorageBlockReport[request.getReportsCount()];
for (int i = 0; i < blockIds.size(); i++) {
blocks[i] = blockIds.get(i); int index = 0;
for (StorageBlockReportProto s : request.getReportsList()) {
List<Long> blockIds = s.getBlocksList();
long[] blocks = new long[blockIds.size()];
for (int i = 0; i < blockIds.size(); i++) {
blocks[i] = blockIds.get(i);
}
report[index++] = new StorageBlockReport(s.getStorageID(), blocks);
} }
try { try {
cmd = impl.blockReport(PBHelper.convert(request.getRegistration()), cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
request.getBlockPoolId(), blocks); request.getBlockPoolId(), report);
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
@ -145,12 +168,18 @@ public BlockReportResponseProto blockReport(RpcController controller,
public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted( public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
RpcController controller, BlockReceivedAndDeletedRequestProto request) RpcController controller, BlockReceivedAndDeletedRequestProto request)
throws ServiceException { throws ServiceException {
List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocks(0) List<StorageReceivedDeletedBlocksProto> sBlocks = request.getBlocksList();
.getBlocksList(); StorageReceivedDeletedBlocks[] info =
ReceivedDeletedBlockInfo[] info = new StorageReceivedDeletedBlocks[sBlocks.size()];
new ReceivedDeletedBlockInfo[rdbip.size()]; for (int i = 0; i < sBlocks.size(); i++) {
for (int i = 0; i < rdbip.size(); i++) { StorageReceivedDeletedBlocksProto sBlock = sBlocks.get(i);
info[i] = PBHelper.convert(rdbip.get(i)); List<ReceivedDeletedBlockInfoProto> list = sBlock.getBlocksList();
ReceivedDeletedBlockInfo[] rdBlocks =
new ReceivedDeletedBlockInfo[list.size()];
for (int j = 0; j < list.size(); j++) {
rdBlocks[j] = PBHelper.convert(list.get(j));
}
info[i] = new StorageReceivedDeletedBlocks(sBlock.getStorageID(), rdBlocks);
} }
try { try {
impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()), impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()),

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@ -52,10 +53,13 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto.StorageState;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@ -108,6 +112,8 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@ -118,6 +124,7 @@
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -1236,4 +1243,41 @@ public static ContentSummaryProto convert(ContentSummary cs) {
setSpaceQuota(cs.getSpaceQuota()). setSpaceQuota(cs.getSpaceQuota()).
build(); build();
} }
public static DatanodeStorageProto convert(DatanodeStorage s) {
return DatanodeStorageProto.newBuilder()
.setState(PBHelper.convert(s.getState()))
.setStorageID(s.getStorageID()).build();
}
private static StorageState convert(State state) {
switch(state) {
case READ_ONLY:
return StorageState.READ_ONLY;
case NORMAL:
default:
return StorageState.NORMAL;
}
}
public static DatanodeStorage convert(DatanodeStorageProto s) {
return new DatanodeStorage(s.getStorageID(), PBHelper.convert(s.getState()));
}
private static State convert(StorageState state) {
switch(state) {
case READ_ONLY:
return DatanodeStorage.State.READ_ONLY;
case NORMAL:
default:
return DatanodeStorage.State.NORMAL;
}
}
public static StorageReportProto convert(StorageReport r) {
return StorageReportProto.newBuilder()
.setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
.setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
.setStorageID(r.getStorageID()).build();
}
} }

View File

@ -37,7 +37,6 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -47,14 +46,17 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -295,8 +297,10 @@ private void reportReceivedDeletedBlocks() throws IOException {
} }
} }
if (receivedAndDeletedBlockArray != null) { if (receivedAndDeletedBlockArray != null) {
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
bpRegistration.getStorageID(), receivedAndDeletedBlockArray) };
bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(), bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(),
receivedAndDeletedBlockArray); report);
synchronized (receivedAndDeletedBlockList) { synchronized (receivedAndDeletedBlockList) {
for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) { for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]); receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
@ -365,8 +369,9 @@ DatanodeCommand blockReport() throws IOException {
// Send block report // Send block report
long brSendStartTime = now(); long brSendStartTime = now();
cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport StorageBlockReport[] report = { new StorageBlockReport(
.getBlockListAsLongs()); bpRegistration.getStorageID(), bReport.getBlockListAsLongs()) };
cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), report);
// Log the block report processing stats from Datanode perspective // Log the block report processing stats from Datanode perspective
long brSendCost = now() - brSendStartTime; long brSendCost = now() - brSendStartTime;
@ -398,11 +403,11 @@ DatanodeCommand blockReport() throws IOException {
DatanodeCommand [] sendHeartBeat() throws IOException { DatanodeCommand [] sendHeartBeat() throws IOException {
return bpNamenode.sendHeartbeat(bpRegistration, // reports number of failed volumes
dn.data.getCapacity(), StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(),
dn.data.getDfsUsed(), false, dn.data.getCapacity(), dn.data.getDfsUsed(),
dn.data.getRemaining(), dn.data.getRemaining(), dn.data.getBlockPoolUsed(getBlockPoolId())) };
dn.data.getBlockPoolUsed(getBlockPoolId()), return bpNamenode.sendHeartbeat(bpRegistration, report,
dn.xmitsInProgress.get(), dn.xmitsInProgress.get(),
dn.getXceiverCount(), dn.data.getNumFailedVolumes()); dn.getXceiverCount(), dn.data.getNumFailedVolumes());
} }
@ -572,7 +577,8 @@ void register() throws IOException {
while (shouldRun()) { while (shouldRun()) {
try { try {
// Use returned registration from namenode with updated machine name. // Use returned registration from namenode with updated machine name.
bpRegistration = bpNamenode.registerDatanode(bpRegistration); bpRegistration = bpNamenode.registerDatanode(bpRegistration,
new DatanodeStorage[0]);
break; break;
} catch(SocketTimeoutException e) { // namenode is busy } catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr); LOG.info("Problem connecting to server: " + nnAddr);

View File

@ -87,6 +87,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@ -94,8 +95,10 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -841,8 +844,8 @@ public String getLinkTarget(String path) throws IOException {
@Override // DatanodeProtocol @Override // DatanodeProtocol
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg) public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg,
throws IOException { DatanodeStorage[] storages) throws IOException {
verifyVersion(nodeReg.getVersion()); verifyVersion(nodeReg.getVersion());
namesystem.registerDatanode(nodeReg); namesystem.registerDatanode(nodeReg);
@ -851,19 +854,20 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
@Override // DatanodeProtocol @Override // DatanodeProtocol
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg, public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining, long blockPoolUsed, StorageReport[] report, int xmitsInProgress, int xceiverCount,
int xmitsInProgress, int xceiverCount, int failedVolumes) int failedVolumes) throws IOException {
throws IOException {
verifyRequest(nodeReg); verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining, return namesystem.handleHeartbeat(nodeReg, report[0].getCapacity(),
blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes); report[0].getDfsUsed(), report[0].getRemaining(),
report[0].getBlockPoolUsed(), xceiverCount, xmitsInProgress,
failedVolumes);
} }
@Override // DatanodeProtocol @Override // DatanodeProtocol
public DatanodeCommand blockReport(DatanodeRegistration nodeReg, public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
String poolId, long[] blocks) throws IOException { String poolId, StorageBlockReport[] reports) throws IOException {
verifyRequest(nodeReg); verifyRequest(nodeReg);
BlockListAsLongs blist = new BlockListAsLongs(blocks); BlockListAsLongs blist = new BlockListAsLongs(reports[0].getBlocks());
if(stateChangeLog.isDebugEnabled()) { if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.blockReport: " stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+ "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks() + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
@ -878,7 +882,7 @@ public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
@Override // DatanodeProtocol @Override // DatanodeProtocol
public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException { StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
verifyRequest(nodeReg); verifyRequest(nodeReg);
if(stateChangeLog.isDebugEnabled()) { if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
@ -886,7 +890,7 @@ public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
+" blocks."); +" blocks.");
} }
namesystem.getBlockManager().blockReceivedAndDeleted( namesystem.getBlockManager().blockReceivedAndDeleted(
nodeReg, poolId, receivedAndDeletedBlocks); nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks());
} }
@Override // DatanodeProtocol @Override // DatanodeProtocol

View File

@ -80,13 +80,16 @@ public interface DatanodeProtocol extends VersionedProtocol {
* Register Datanode. * Register Datanode.
* *
* @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem#registerDatanode(DatanodeRegistration) * @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem#registerDatanode(DatanodeRegistration)
* * @param registration datanode registration information
* @param storages list of storages on the datanode``
* @return updated {@link org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration}, which contains * @return updated {@link org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration}, which contains
* new storageID if the datanode did not have one and * new storageID if the datanode did not have one and
* registration ID for further communication. * registration ID for further communication.
*/ */
public DatanodeRegistration registerDatanode(DatanodeRegistration registration public DatanodeRegistration registerDatanode(
) throws IOException; DatanodeRegistration registration, DatanodeStorage[] storages)
throws IOException;
/** /**
* sendHeartbeat() tells the NameNode that the DataNode is still * sendHeartbeat() tells the NameNode that the DataNode is still
* alive and well. Includes some status info, too. * alive and well. Includes some status info, too.
@ -95,19 +98,14 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration
* A DatanodeCommand tells the DataNode to invalidate local block(s), * A DatanodeCommand tells the DataNode to invalidate local block(s),
* or to copy them to other DataNodes, etc. * or to copy them to other DataNodes, etc.
* @param registration datanode registration information * @param registration datanode registration information
* @param capacity total storage capacity available at the datanode * @param reports utilization report per storage
* @param dfsUsed storage used by HDFS
* @param remaining remaining storage available for HDFS
* @param blockPoolUsed storage used by the block pool
* @param xmitsInProgress number of transfers from this datanode to others * @param xmitsInProgress number of transfers from this datanode to others
* @param xceiverCount number of active transceiver threads * @param xceiverCount number of active transceiver threads
* @param failedVolumes number of failed volumes * @param failedVolumes number of failed volumes
* @throws IOException on error * @throws IOException on error
*/ */
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
long capacity, StorageReport[] reports,
long dfsUsed, long remaining,
long blockPoolUsed,
int xmitsInProgress, int xmitsInProgress,
int xceiverCount, int xceiverCount,
int failedVolumes) throws IOException; int failedVolumes) throws IOException;
@ -120,7 +118,7 @@ public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
* infrequently afterwards. * infrequently afterwards.
* @param registration * @param registration
* @param poolId - the block pool ID for the blocks * @param poolId - the block pool ID for the blocks
* @param blocks - the block list as an array of longs. * @param reports - report of blocks per storage
* Each block is represented as 2 longs. * Each block is represented as 2 longs.
* This is done instead of Block[] to reduce memory used by block reports. * This is done instead of Block[] to reduce memory used by block reports.
* *
@ -128,8 +126,7 @@ public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
* @throws IOException * @throws IOException
*/ */
public DatanodeCommand blockReport(DatanodeRegistration registration, public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, String poolId, StorageBlockReport[] reports) throws IOException;
long[] blocks) throws IOException;
/** /**
* blockReceivedAndDeleted() allows the DataNode to tell the NameNode about * blockReceivedAndDeleted() allows the DataNode to tell the NameNode about
@ -143,7 +140,7 @@ public DatanodeCommand blockReport(DatanodeRegistration registration,
*/ */
public void blockReceivedAndDeleted(DatanodeRegistration registration, public void blockReceivedAndDeleted(DatanodeRegistration registration,
String poolId, String poolId,
ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks)
throws IOException; throws IOException;
/** /**

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
/**
* Class capatures information about a storage in Datanode
*/
public class DatanodeStorage {
public enum State {
NORMAL,
READ_ONLY
}
private final String storageID;
private final State state;
public DatanodeStorage(String sid, State s) {
storageID = sid;
state = s;
}
public String getStorageID() {
return storageID;
}
public State getState() {
return state;
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
/**
* Block report for a Datanode storage
*/
public class StorageBlockReport {
private final String storageID;
private final long[] blocks;
public StorageBlockReport(String sid, long[] blocks) {
this.storageID = sid;
this.blocks = blocks;
}
public String getStorageID() {
return storageID;
}
public long[] getBlocks() {
return blocks;
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
/**
* Report of block received and deleted per Datanode
* storage.
*/
public class StorageReceivedDeletedBlocks {
private final String storageID;
private final ReceivedDeletedBlockInfo[] blocks;
public String getStorageID() {
return storageID;
}
public ReceivedDeletedBlockInfo[] getBlocks() {
return blocks;
}
public StorageReceivedDeletedBlocks(final String storageID,
final ReceivedDeletedBlockInfo[] blocks) {
this.storageID = storageID;
this.blocks = blocks;
}
}

View File

@ -0,0 +1,47 @@
package org.apache.hadoop.hdfs.server.protocol;
/**
* Utilization report for a Datanode storage
*/
public class StorageReport {
private final String storageID;
private final boolean failed;
private final long capacity;
private final long dfsUsed;
private final long remaining;
private final long blockPoolUsed;
public StorageReport(String sid, boolean failed, long capacity, long dfsUsed,
long remaining, long bpUsed) {
this.storageID = sid;
this.failed = failed;
this.capacity = capacity;
this.dfsUsed = dfsUsed;
this.remaining = remaining;
this.blockPoolUsed = bpUsed;
}
public String getStorageID() {
return storageID;
}
public boolean isFailed() {
return failed;
}
public long getCapacity() {
return capacity;
}
public long getDfsUsed() {
return dfsUsed;
}
public long getRemaining() {
return remaining;
}
public long getBlockPoolUsed() {
return blockPoolUsed;
}
}

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -146,8 +147,9 @@ public void blockReport_01() throws IOException {
DataNode dn = cluster.getDataNodes().get(DN_N0); DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
cluster.getNameNodeRpc().blockReport(dnR, poolId, StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
new BlockListAsLongs(blocks, null).getBlockListAsLongs()); new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
List<LocatedBlock> blocksAfterReport = List<LocatedBlock> blocksAfterReport =
DFSTestUtil.getAllBlocks(fs.open(filePath)); DFSTestUtil.getAllBlocks(fs.open(filePath));
@ -180,7 +182,7 @@ public void blockReport_02() throws IOException {
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
DFSTestUtil.createFile(fs, filePath, DFSTestUtil.createFile(fs, filePath,
(long) FILE_SIZE, REPL_FACTOR, rand.nextLong()); FILE_SIZE, REPL_FACTOR, rand.nextLong());
// mock around with newly created blocks and delete some // mock around with newly created blocks and delete some
File dataDir = new File(cluster.getDataDirectory()); File dataDir = new File(cluster.getDataDirectory());
@ -226,8 +228,9 @@ public void blockReport_02() throws IOException {
DataNode dn = cluster.getDataNodes().get(DN_N0); DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
cluster.getNameNodeRpc().blockReport(dnR, poolId, StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
new BlockListAsLongs(blocks, null).getBlockListAsLongs()); new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem() BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
.getBlockManager()); .getBlockManager());
@ -266,9 +269,10 @@ public void blockReport_03() throws IOException {
DataNode dn = cluster.getDataNodes().get(DN_N0); DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
DatanodeCommand dnCmd = DatanodeCommand dnCmd =
cluster.getNameNodeRpc().blockReport(dnR, poolId, cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Got the command: " + dnCmd); LOG.debug("Got the command: " + dnCmd);
} }
@ -284,9 +288,8 @@ public void blockReport_03() throws IOException {
* This test isn't a representative case for BlockReport * This test isn't a representative case for BlockReport
* The empty method is going to be left here to keep the naming * The empty method is going to be left here to keep the naming
* of the test plan in synch with the actual implementation * of the test plan in synch with the actual implementation
* @throws IOException in case of errors
*/ */
public void blockReport_04() throws IOException { public void blockReport_04() {
} }
// Client requests new block from NN. The test corrupts this very block // Client requests new block from NN. The test corrupts this very block
@ -295,7 +298,7 @@ public void blockReport_04() throws IOException {
// BlockScanner which is out of scope of this test // BlockScanner which is out of scope of this test
// Keeping the name to be in synch with the test plan // Keeping the name to be in synch with the test plan
// //
public void blockReport_05() throws IOException { public void blockReport_05() {
} }
/** /**
@ -319,8 +322,9 @@ public void blockReport_06() throws IOException {
DataNode dn = cluster.getDataNodes().get(DN_N1); DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
cluster.getNameNodeRpc().blockReport(dnR, poolId, StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
new BlockListAsLongs(blocks, null).getBlockListAsLongs()); new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats(); printStats();
assertEquals("Wrong number of PendingReplication Blocks", assertEquals("Wrong number of PendingReplication Blocks",
0, cluster.getNamesystem().getUnderReplicatedBlocks()); 0, cluster.getNamesystem().getUnderReplicatedBlocks());
@ -368,8 +372,9 @@ public void blockReport_07() throws IOException {
DataNode dn = cluster.getDataNodes().get(DN_N1); DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
cluster.getNameNodeRpc().blockReport(dnR, poolId, StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
new BlockListAsLongs(blocks, null).getBlockListAsLongs()); new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats(); printStats();
assertEquals("Wrong number of Corrupted blocks", assertEquals("Wrong number of Corrupted blocks",
1, cluster.getNamesystem().getCorruptReplicaBlocks() + 1, cluster.getNamesystem().getCorruptReplicaBlocks() +
@ -390,8 +395,9 @@ public void blockReport_07() throws IOException {
LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName()); LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
} }
cluster.getNameNodeRpc().blockReport(dnR, poolId, report[0] = new StorageBlockReport(dnR.getStorageID(),
new BlockListAsLongs(blocks, null).getBlockListAsLongs()); new BlockListAsLongs(blocks, null).getBlockListAsLongs());
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats(); printStats();
assertEquals("Wrong number of Corrupted blocks", assertEquals("Wrong number of Corrupted blocks",
@ -440,8 +446,9 @@ public void blockReport_08() throws IOException {
DataNode dn = cluster.getDataNodes().get(DN_N1); DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
cluster.getNameNodeRpc().blockReport(dnR, poolId, StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
new BlockListAsLongs(blocks, null).getBlockListAsLongs()); new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats(); printStats();
assertEquals("Wrong number of PendingReplication blocks", assertEquals("Wrong number of PendingReplication blocks",
blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks()); blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
@ -486,8 +493,9 @@ public void blockReport_09() throws IOException {
DataNode dn = cluster.getDataNodes().get(DN_N1); DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
cluster.getNameNodeRpc().blockReport(dnR, poolId, StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
new BlockListAsLongs(blocks, null).getBlockListAsLongs()); new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats(); printStats();
assertEquals("Wrong number of PendingReplication blocks", assertEquals("Wrong number of PendingReplication blocks",
2, cluster.getNamesystem().getPendingReplicationBlocks()); 2, cluster.getNamesystem().getPendingReplicationBlocks());
@ -550,7 +558,7 @@ protected Object passThrough(InvocationOnMock invocation)
.when(spy).blockReport( .when(spy).blockReport(
Mockito.<DatanodeRegistration>anyObject(), Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(), Mockito.anyString(),
Mockito.<long[]>anyObject()); Mockito.<StorageBlockReport[]>anyObject());
// Force a block report to be generated. The block report will have // Force a block report to be generated. The block report will have
// an RBW replica in it. Wait for the RPC to be sent, but block // an RBW replica in it. Wait for the RPC to be sent, but block
@ -638,8 +646,7 @@ private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
// Write file and start second data node. // Write file and start second data node.
private ArrayList<Block> writeFile(final String METHOD_NAME, private ArrayList<Block> writeFile(final String METHOD_NAME,
final long fileSize, final long fileSize,
Path filePath) Path filePath) {
throws IOException {
ArrayList<Block> blocks = null; ArrayList<Block> blocks = null;
try { try {
REPL_FACTOR = 2; REPL_FACTOR = 2;

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -144,8 +145,9 @@ public void testVolumeFailure() throws IOException {
DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3 DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
String bpid = cluster.getNamesystem().getBlockPoolId(); String bpid = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid); DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs(); StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
cluster.getNameNodeRpc().blockReport(dnR, bpid, bReport); dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs()) };
cluster.getNameNodeRpc().blockReport(dnR, bpid, report);
// verify number of blocks and files... // verify number of blocks and files...
verify(filename, filesize); verify(filename, filesize);

View File

@ -25,8 +25,6 @@
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import javax.security.auth.login.LoginException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
@ -46,9 +44,13 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
@ -104,7 +106,7 @@ public class NNThroughputBenchmark {
static NameNode nameNode; static NameNode nameNode;
static NamenodeProtocols nameNodeProto; static NamenodeProtocols nameNodeProto;
NNThroughputBenchmark(Configuration conf) throws IOException, LoginException { NNThroughputBenchmark(Configuration conf) throws IOException {
config = conf; config = conf;
// We do not need many handlers, since each thread simulates a handler // We do not need many handlers, since each thread simulates a handler
// by calling name-node methods directly // by calling name-node methods directly
@ -125,7 +127,7 @@ public class NNThroughputBenchmark {
nameNodeProto = nameNode.getRpcServer(); nameNodeProto = nameNode.getRpcServer();
} }
void close() throws IOException { void close() {
nameNode.stop(); nameNode.stop();
} }
@ -795,7 +797,10 @@ void register() throws IOException {
dnRegistration.setStorageInfo(new DataStorage(nsInfo, "")); dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
DataNode.setNewStorageID(dnRegistration); DataNode.setNewStorageID(dnRegistration);
// register datanode // register datanode
dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
DatanodeStorage[] storages = { new DatanodeStorage(
dnRegistration.getStorageID(), DatanodeStorage.State.NORMAL) };
dnRegistration = nameNodeProto.registerDatanode(dnRegistration, storages);
} }
/** /**
@ -805,8 +810,10 @@ void register() throws IOException {
void sendHeartbeat() throws IOException { void sendHeartbeat() throws IOException {
// register datanode // register datanode
// TODO:FEDERATION currently a single block pool is supported // TODO:FEDERATION currently a single block pool is supported
StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(),
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0); rep, 0, 0, 0);
if(cmds != null) { if(cmds != null) {
for (DatanodeCommand cmd : cmds ) { for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
@ -849,9 +856,10 @@ public int compareTo(String name) {
@SuppressWarnings("unused") // keep it for future blockReceived benchmark @SuppressWarnings("unused") // keep it for future blockReceived benchmark
int replicateBlocks() throws IOException { int replicateBlocks() throws IOException {
// register datanode // register datanode
// TODO:FEDERATION currently a single block pool is supported StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(),
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0); rep, 0, 0, 0);
if (cmds != null) { if (cmds != null) {
for (DatanodeCommand cmd : cmds) { for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) { if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
@ -881,10 +889,12 @@ private int transferBlocks( Block blocks[],
receivedDNReg.setStorageInfo( receivedDNReg.setStorageInfo(
new DataStorage(nsInfo, dnInfo.getStorageID())); new DataStorage(nsInfo, dnInfo.getStorageID()));
receivedDNReg.setInfoPort(dnInfo.getInfoPort()); receivedDNReg.setInfoPort(dnInfo.getInfoPort());
ReceivedDeletedBlockInfo[] rdBlocks = { new ReceivedDeletedBlockInfo(
blocks[i], DataNode.EMPTY_DEL_HINT) };
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
receivedDNReg.getStorageID(), rdBlocks) };
nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
.getNamesystem().getBlockPoolId(), .getNamesystem().getBlockPoolId(), report);
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
blocks[i], DataNode.EMPTY_DEL_HINT) });
} }
} }
return blocks.length; return blocks.length;
@ -916,7 +926,7 @@ class BlockReportStats extends OperationStatsBase {
config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3 * 60); config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3 * 60);
parseArguments(args); parseArguments(args);
// adjust replication to the number of data-nodes // adjust replication to the number of data-nodes
this.replication = (short)Math.min((int)replication, getNumDatanodes()); this.replication = (short)Math.min(replication, getNumDatanodes());
} }
/** /**
@ -996,10 +1006,12 @@ private ExtendedBlock addBlocks(String fileName, String clientName)
for(DatanodeInfo dnInfo : loc.getLocations()) { for(DatanodeInfo dnInfo : loc.getLocations()) {
int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName()); int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock()); datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
ReceivedDeletedBlockInfo[] rdBlocks = { new ReceivedDeletedBlockInfo(
loc.getBlock().getLocalBlock(), "") };
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
datanodes[dnIdx].dnRegistration.getStorageID(), rdBlocks) };
nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
.getBlock().getBlockPoolId(), .getBlock().getBlockPoolId(), report);
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(loc
.getBlock().getLocalBlock(), "") });
} }
} }
return prevBlock; return prevBlock;
@ -1016,8 +1028,10 @@ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
assert daemonId < numThreads : "Wrong daemonId."; assert daemonId < numThreads : "Wrong daemonId.";
TinyDatanode dn = datanodes[daemonId]; TinyDatanode dn = datanodes[daemonId];
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
StorageBlockReport[] report = { new StorageBlockReport(
dn.dnRegistration.getStorageID(), dn.getBlockReportList()) };
nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem() nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
.getBlockPoolId(), dn.getBlockReportList()); .getBlockPoolId(), report);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
return end-start; return end-start;
} }

View File

@ -38,6 +38,9 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
@ -108,19 +111,22 @@ public void testDeadDatanode() throws Exception {
ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo( ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo(
new Block(0), "") }; new Block(0), "") };
StorageReceivedDeletedBlocks[] storageBlocks = {
new StorageReceivedDeletedBlocks(reg.getStorageID(), blocks) };
// Ensure blockReceived call from dead datanode is rejected with IOException // Ensure blockReceived call from dead datanode is rejected with IOException
try { try {
dnp.blockReceivedAndDeleted(reg, poolId, blocks); dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
Assert.fail("Expected IOException is not thrown"); Assert.fail("Expected IOException is not thrown");
} catch (IOException ex) { } catch (IOException ex) {
// Expected // Expected
} }
// Ensure blockReport from dead datanode is rejected with IOException // Ensure blockReport from dead datanode is rejected with IOException
long[] blockReport = new long[] { 0L, 0L, 0L }; StorageBlockReport[] report = { new StorageBlockReport(reg.getStorageID(),
new long[] { 0L, 0L, 0L }) };
try { try {
dnp.blockReport(reg, poolId, blockReport); dnp.blockReport(reg, poolId, report);
Assert.fail("Expected IOException is not thrown"); Assert.fail("Expected IOException is not thrown");
} catch (IOException ex) { } catch (IOException ex) {
// Expected // Expected
@ -128,7 +134,9 @@ public void testDeadDatanode() throws Exception {
// Ensure heartbeat from dead datanode is rejected with a command // Ensure heartbeat from dead datanode is rejected with a command
// that asks datanode to register again // that asks datanode to register again
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, 0, 0, 0, 0, 0, 0, 0); StorageReport[] rep = { new StorageReport(reg.getStorageID(), false, 0, 0,
0, 0) };
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0, 0, 0);
Assert.assertEquals(1, cmd.length); Assert.assertEquals(1, cmd.length);
Assert.assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER Assert.assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction()); .getAction());