HDFS-3086. Change Datanode not to send storage list in registration.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1303318 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
19495b2f4b
commit
9e108e61fb
@ -241,7 +241,10 @@ Release 0.23.3 - UNRELEASED
|
|||||||
|
|
||||||
HDFS-3088. Move FSDatasetInterface inner classes to a package. (szetszwo)
|
HDFS-3088. Move FSDatasetInterface inner classes to a package. (szetszwo)
|
||||||
|
|
||||||
HDFS-3105. Add DatanodeStorage information to block recovery. (szetszwo)
|
HDFS-3105. Add DatanodeStorage information to block recovery. (szetszwo)
|
||||||
|
|
||||||
|
HDFS-3086. Change Datanode not to send storage list in registration.
|
||||||
|
(szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
|
@ -41,7 +41,6 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHeartbeatProto;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
|
||||||
@ -50,12 +49,10 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||||
@ -69,7 +66,6 @@
|
|||||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
||||||
import org.apache.hadoop.ipc.ProtocolSignature;
|
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.RpcClientUtil;
|
import org.apache.hadoop.ipc.RpcClientUtil;
|
||||||
@ -145,14 +141,10 @@ public void close() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DatanodeRegistration registerDatanode(DatanodeRegistration registration,
|
public DatanodeRegistration registerDatanode(DatanodeRegistration registration
|
||||||
DatanodeStorage[] storages) throws IOException {
|
) throws IOException {
|
||||||
RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
|
RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
|
||||||
.newBuilder().setRegistration(PBHelper.convert(registration));
|
.newBuilder().setRegistration(PBHelper.convert(registration));
|
||||||
for (DatanodeStorage s : storages) {
|
|
||||||
builder.addStorages(PBHelper.convert(s));
|
|
||||||
}
|
|
||||||
|
|
||||||
RegisterDatanodeResponseProto resp;
|
RegisterDatanodeResponseProto resp;
|
||||||
try {
|
try {
|
||||||
resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
|
resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
|
||||||
@ -198,7 +190,7 @@ public DatanodeCommand blockReport(DatanodeRegistration registration,
|
|||||||
|
|
||||||
for (StorageBlockReport r : reports) {
|
for (StorageBlockReport r : reports) {
|
||||||
StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
|
StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
|
||||||
.newBuilder().setStorageID(r.getStorageID());
|
.newBuilder().setStorage(PBHelper.convert(r.getStorage()));
|
||||||
long[] blocks = r.getBlocks();
|
long[] blocks = r.getBlocks();
|
||||||
for (int i = 0; i < blocks.length; i++) {
|
for (int i = 0; i < blocks.length; i++) {
|
||||||
reportBuilder.addBlocks(blocks[i]);
|
reportBuilder.addBlocks(blocks[i]);
|
||||||
|
@ -50,7 +50,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||||
@ -88,12 +87,8 @@ public RegisterDatanodeResponseProto registerDatanode(
|
|||||||
DatanodeRegistration registration = PBHelper.convert(request
|
DatanodeRegistration registration = PBHelper.convert(request
|
||||||
.getRegistration());
|
.getRegistration());
|
||||||
DatanodeRegistration registrationResp;
|
DatanodeRegistration registrationResp;
|
||||||
DatanodeStorage[] storages = new DatanodeStorage[request.getStoragesCount()];
|
|
||||||
for (int i = 0; i < request.getStoragesCount(); i++) {
|
|
||||||
storages[i] = PBHelper.convert(request.getStorages(i));
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
registrationResp = impl.registerDatanode(registration, storages);
|
registrationResp = impl.registerDatanode(registration);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
@ -148,7 +143,8 @@ public BlockReportResponseProto blockReport(RpcController controller,
|
|||||||
for (int i = 0; i < blockIds.size(); i++) {
|
for (int i = 0; i < blockIds.size(); i++) {
|
||||||
blocks[i] = blockIds.get(i);
|
blocks[i] = blockIds.get(i);
|
||||||
}
|
}
|
||||||
report[index++] = new StorageBlockReport(s.getStorageID(), blocks);
|
report[index++] = new StorageBlockReport(PBHelper.convert(s.getStorage()),
|
||||||
|
blocks);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
|
cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
|
||||||
|
@ -24,7 +24,6 @@
|
|||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
@ -384,7 +383,8 @@ DatanodeCommand blockReport() throws IOException {
|
|||||||
// Send block report
|
// Send block report
|
||||||
long brSendStartTime = now();
|
long brSendStartTime = now();
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
bpRegistration.getStorageID(), bReport.getBlockListAsLongs()) };
|
new DatanodeStorage(bpRegistration.getStorageID()),
|
||||||
|
bReport.getBlockListAsLongs()) };
|
||||||
cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), report);
|
cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), report);
|
||||||
|
|
||||||
// Log the block report processing stats from Datanode perspective
|
// Log the block report processing stats from Datanode perspective
|
||||||
@ -603,8 +603,7 @@ void register() throws IOException {
|
|||||||
while (shouldRun()) {
|
while (shouldRun()) {
|
||||||
try {
|
try {
|
||||||
// Use returned registration from namenode with updated machine name.
|
// Use returned registration from namenode with updated machine name.
|
||||||
bpRegistration = bpNamenode.registerDatanode(bpRegistration,
|
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
|
||||||
new DatanodeStorage[0]);
|
|
||||||
break;
|
break;
|
||||||
} catch(SocketTimeoutException e) { // namenode is busy
|
} catch(SocketTimeoutException e) { // namenode is busy
|
||||||
LOG.info("Problem connecting to server: " + nnAddr);
|
LOG.info("Problem connecting to server: " + nnAddr);
|
||||||
|
@ -46,7 +46,6 @@
|
|||||||
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
|
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
|
||||||
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
|
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
|
||||||
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
@ -95,7 +94,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||||
@ -830,11 +828,10 @@ public String getLinkTarget(String path) throws IOException {
|
|||||||
|
|
||||||
|
|
||||||
@Override // DatanodeProtocol
|
@Override // DatanodeProtocol
|
||||||
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg,
|
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg
|
||||||
DatanodeStorage[] storages) throws IOException {
|
) throws IOException {
|
||||||
verifyVersion(nodeReg.getVersion());
|
verifyVersion(nodeReg.getVersion());
|
||||||
namesystem.registerDatanode(nodeReg);
|
namesystem.registerDatanode(nodeReg);
|
||||||
|
|
||||||
return nodeReg;
|
return nodeReg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,14 +80,12 @@ public interface DatanodeProtocol {
|
|||||||
*
|
*
|
||||||
* @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem#registerDatanode(DatanodeRegistration)
|
* @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem#registerDatanode(DatanodeRegistration)
|
||||||
* @param registration datanode registration information
|
* @param registration datanode registration information
|
||||||
* @param storages list of storages on the datanode``
|
|
||||||
* @return updated {@link org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration}, which contains
|
* @return updated {@link org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration}, which contains
|
||||||
* new storageID if the datanode did not have one and
|
* new storageID if the datanode did not have one and
|
||||||
* registration ID for further communication.
|
* registration ID for further communication.
|
||||||
*/
|
*/
|
||||||
public DatanodeRegistration registerDatanode(
|
public DatanodeRegistration registerDatanode(DatanodeRegistration registration
|
||||||
DatanodeRegistration registration, DatanodeStorage[] storages)
|
) throws IOException;
|
||||||
throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* sendHeartbeat() tells the NameNode that the DataNode is still
|
* sendHeartbeat() tells the NameNode that the DataNode is still
|
||||||
|
@ -18,9 +18,10 @@
|
|||||||
package org.apache.hadoop.hdfs.server.protocol;
|
package org.apache.hadoop.hdfs.server.protocol;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class capatures information about a storage in Datanode
|
* Class captures information of a storage in Datanode.
|
||||||
*/
|
*/
|
||||||
public class DatanodeStorage {
|
public class DatanodeStorage {
|
||||||
|
/** The state of the storage. */
|
||||||
public enum State {
|
public enum State {
|
||||||
NORMAL,
|
NORMAL,
|
||||||
READ_ONLY
|
READ_ONLY
|
||||||
@ -28,7 +29,15 @@ public enum State {
|
|||||||
|
|
||||||
private final String storageID;
|
private final String storageID;
|
||||||
private final State state;
|
private final State state;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a storage with {@link State#NORMAL}.
|
||||||
|
* @param storageID
|
||||||
|
*/
|
||||||
|
public DatanodeStorage(String storageID) {
|
||||||
|
this(storageID, State.NORMAL);
|
||||||
|
}
|
||||||
|
|
||||||
public DatanodeStorage(String sid, State s) {
|
public DatanodeStorage(String sid, State s) {
|
||||||
storageID = sid;
|
storageID = sid;
|
||||||
state = s;
|
state = s;
|
||||||
|
@ -22,16 +22,16 @@
|
|||||||
* Block report for a Datanode storage
|
* Block report for a Datanode storage
|
||||||
*/
|
*/
|
||||||
public class StorageBlockReport {
|
public class StorageBlockReport {
|
||||||
private final String storageID;
|
private final DatanodeStorage storage;
|
||||||
private final long[] blocks;
|
private final long[] blocks;
|
||||||
|
|
||||||
public StorageBlockReport(String sid, long[] blocks) {
|
public StorageBlockReport(DatanodeStorage storage, long[] blocks) {
|
||||||
this.storageID = sid;
|
this.storage = storage;
|
||||||
this.blocks = blocks;
|
this.blocks = blocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getStorageID() {
|
public DatanodeStorage getStorage() {
|
||||||
return storageID;
|
return storage;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long[] getBlocks() {
|
public long[] getBlocks() {
|
||||||
|
@ -149,7 +149,6 @@ message UpgradeCommandProto {
|
|||||||
*/
|
*/
|
||||||
message RegisterDatanodeRequestProto {
|
message RegisterDatanodeRequestProto {
|
||||||
required DatanodeRegistrationProto registration = 1; // Datanode info
|
required DatanodeRegistrationProto registration = 1; // Datanode info
|
||||||
repeated DatanodeStorageProto storages = 2; // Storages on the datanode
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -227,7 +226,7 @@ message BlockReportRequestProto {
|
|||||||
* Report of blocks in a storage
|
* Report of blocks in a storage
|
||||||
*/
|
*/
|
||||||
message StorageBlockReportProto {
|
message StorageBlockReportProto {
|
||||||
required string storageID = 1; // Storage ID
|
required DatanodeStorageProto storage = 1; // Storage
|
||||||
repeated uint64 blocks = 2 [packed=true];
|
repeated uint64 blocks = 2 [packed=true];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,7 +17,9 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
@ -36,7 +38,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
|
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
|
||||||
@ -76,7 +77,7 @@ public class TestBPOfferService {
|
|||||||
private NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
|
private NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
|
||||||
private int heartbeatCounts[] = new int[2];
|
private int heartbeatCounts[] = new int[2];
|
||||||
private DataNode mockDn;
|
private DataNode mockDn;
|
||||||
private FSDatasetInterface mockFSDataset;
|
private FSDatasetInterface<?> mockFSDataset;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setupMocks() throws Exception {
|
public void setupMocks() throws Exception {
|
||||||
@ -114,8 +115,7 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx)
|
|||||||
.when(mock).versionRequest();
|
.when(mock).versionRequest();
|
||||||
|
|
||||||
Mockito.doReturn(new DatanodeRegistration("fake-node"))
|
Mockito.doReturn(new DatanodeRegistration("fake-node"))
|
||||||
.when(mock).registerDatanode(Mockito.any(DatanodeRegistration.class),
|
.when(mock).registerDatanode(Mockito.any(DatanodeRegistration.class));
|
||||||
Mockito.any(DatanodeStorage[].class));
|
|
||||||
|
|
||||||
Mockito.doAnswer(new HeartbeatAnswer(nnIdx))
|
Mockito.doAnswer(new HeartbeatAnswer(nnIdx))
|
||||||
.when(mock).sendHeartbeat(
|
.when(mock).sendHeartbeat(
|
||||||
@ -161,10 +161,10 @@ public void testBasicFunctionality() throws Exception {
|
|||||||
waitForInitialization(bpos);
|
waitForInitialization(bpos);
|
||||||
|
|
||||||
// The DN should have register to both NNs.
|
// The DN should have register to both NNs.
|
||||||
Mockito.verify(mockNN1).registerDatanode(Mockito.any(DatanodeRegistration.class),
|
Mockito.verify(mockNN1).registerDatanode(
|
||||||
Mockito.any(DatanodeStorage[].class));
|
Mockito.any(DatanodeRegistration.class));
|
||||||
Mockito.verify(mockNN2).registerDatanode(Mockito.any(DatanodeRegistration.class),
|
Mockito.verify(mockNN2).registerDatanode(
|
||||||
Mockito.any(DatanodeStorage[].class));
|
Mockito.any(DatanodeRegistration.class));
|
||||||
|
|
||||||
// Should get block reports from both NNs
|
// Should get block reports from both NNs
|
||||||
waitForBlockReport(mockNN1);
|
waitForBlockReport(mockNN1);
|
||||||
|
@ -62,7 +62,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
||||||
@ -138,8 +137,7 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
|
|||||||
return (DatanodeRegistration) invocation.getArguments()[0];
|
return (DatanodeRegistration) invocation.getArguments()[0];
|
||||||
}
|
}
|
||||||
}).when(namenode).registerDatanode(
|
}).when(namenode).registerDatanode(
|
||||||
Mockito.any(DatanodeRegistration.class),
|
Mockito.any(DatanodeRegistration.class));
|
||||||
Mockito.any(DatanodeStorage[].class));
|
|
||||||
|
|
||||||
when(namenode.versionRequest()).thenReturn(new NamespaceInfo
|
when(namenode.versionRequest()).thenReturn(new NamespaceInfo
|
||||||
(1, CLUSTER_ID, POOL_ID, 1L, 1));
|
(1, CLUSTER_ID, POOL_ID, 1L, 1));
|
||||||
|
@ -39,6 +39,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
@ -147,7 +148,8 @@ public void blockReport_01() throws IOException {
|
|||||||
DataNode dn = cluster.getDataNodes().get(DN_N0);
|
DataNode dn = cluster.getDataNodes().get(DN_N0);
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
|
new DatanodeStorage(dnR.getStorageID()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
|
|
||||||
@ -228,7 +230,8 @@ public void blockReport_02() throws IOException {
|
|||||||
// all blocks belong to the same file, hence same BP
|
// all blocks belong to the same file, hence same BP
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
|
new DatanodeStorage(dnR.getStorageID()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
|
|
||||||
@ -269,7 +272,8 @@ public void blockReport_03() throws IOException {
|
|||||||
DataNode dn = cluster.getDataNodes().get(DN_N0);
|
DataNode dn = cluster.getDataNodes().get(DN_N0);
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
|
new DatanodeStorage(dnR.getStorageID()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
DatanodeCommand dnCmd =
|
DatanodeCommand dnCmd =
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
@ -322,7 +326,8 @@ public void blockReport_06() throws IOException {
|
|||||||
DataNode dn = cluster.getDataNodes().get(DN_N1);
|
DataNode dn = cluster.getDataNodes().get(DN_N1);
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
|
new DatanodeStorage(dnR.getStorageID()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
printStats();
|
printStats();
|
||||||
@ -372,7 +377,8 @@ public void blockReport_07() throws IOException {
|
|||||||
DataNode dn = cluster.getDataNodes().get(DN_N1);
|
DataNode dn = cluster.getDataNodes().get(DN_N1);
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
|
new DatanodeStorage(dnR.getStorageID()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
printStats();
|
printStats();
|
||||||
@ -395,7 +401,8 @@ public void blockReport_07() throws IOException {
|
|||||||
LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
|
LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
|
||||||
}
|
}
|
||||||
|
|
||||||
report[0] = new StorageBlockReport(dnR.getStorageID(),
|
report[0] = new StorageBlockReport(
|
||||||
|
new DatanodeStorage(dnR.getStorageID()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
printStats();
|
printStats();
|
||||||
@ -446,7 +453,8 @@ public void blockReport_08() throws IOException {
|
|||||||
DataNode dn = cluster.getDataNodes().get(DN_N1);
|
DataNode dn = cluster.getDataNodes().get(DN_N1);
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
|
new DatanodeStorage(dnR.getStorageID()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
printStats();
|
printStats();
|
||||||
@ -493,7 +501,8 @@ public void blockReport_09() throws IOException {
|
|||||||
DataNode dn = cluster.getDataNodes().get(DN_N1);
|
DataNode dn = cluster.getDataNodes().get(DN_N1);
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
|
new DatanodeStorage(dnR.getStorageID()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
printStats();
|
printStats();
|
||||||
|
@ -44,6 +44,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
@ -146,7 +147,8 @@ public void testVolumeFailure() throws IOException {
|
|||||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
|
||||||
final StorageBlockReport[] report = {
|
final StorageBlockReport[] report = {
|
||||||
new StorageBlockReport(dnR.getStorageID(),
|
new StorageBlockReport(
|
||||||
|
new DatanodeStorage(dnR.getStorageID()),
|
||||||
DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid
|
DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid
|
||||||
).getBlockListAsLongs())
|
).getBlockListAsLongs())
|
||||||
};
|
};
|
||||||
|
@ -31,6 +31,8 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
@ -51,8 +53,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.net.DNS;
|
import org.apache.hadoop.net.DNS;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
@ -612,7 +612,6 @@ void parseArguments(List<String> args) {
|
|||||||
super.parseArguments(args);
|
super.parseArguments(args);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
void generateInputs(int[] opsPerThread) throws IOException {
|
void generateInputs(int[] opsPerThread) throws IOException {
|
||||||
// create files using opsPerThread
|
// create files using opsPerThread
|
||||||
String[] createArgs = new String[] {
|
String[] createArgs = new String[] {
|
||||||
@ -742,7 +741,6 @@ void generateInputs(int[] opsPerThread) throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
long executeOp(int daemonId, int inputIdx, String ignore)
|
long executeOp(int daemonId, int inputIdx, String ignore)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
@ -762,6 +760,7 @@ private static class TinyDatanode implements Comparable<String> {
|
|||||||
|
|
||||||
NamespaceInfo nsInfo;
|
NamespaceInfo nsInfo;
|
||||||
DatanodeRegistration dnRegistration;
|
DatanodeRegistration dnRegistration;
|
||||||
|
DatanodeStorage storage; //only one storage
|
||||||
ArrayList<Block> blocks;
|
ArrayList<Block> blocks;
|
||||||
int nrBlocks; // actual number of blocks
|
int nrBlocks; // actual number of blocks
|
||||||
long[] blockReportList;
|
long[] blockReportList;
|
||||||
@ -797,10 +796,15 @@ void register() throws IOException {
|
|||||||
dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
|
dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
|
||||||
DataNode.setNewStorageID(dnRegistration);
|
DataNode.setNewStorageID(dnRegistration);
|
||||||
// register datanode
|
// register datanode
|
||||||
|
dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
|
||||||
DatanodeStorage[] storages = { new DatanodeStorage(
|
//first block reports
|
||||||
dnRegistration.getStorageID(), DatanodeStorage.State.NORMAL) };
|
storage = new DatanodeStorage(dnRegistration.getStorageID());
|
||||||
dnRegistration = nameNodeProto.registerDatanode(dnRegistration, storages);
|
final StorageBlockReport[] reports = {
|
||||||
|
new StorageBlockReport(storage,
|
||||||
|
new BlockListAsLongs(null, null).getBlockListAsLongs())
|
||||||
|
};
|
||||||
|
nameNodeProto.blockReport(dnRegistration,
|
||||||
|
nameNode.getNamesystem().getBlockPoolId(), reports);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1032,7 +1036,7 @@ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
|
|||||||
TinyDatanode dn = datanodes[daemonId];
|
TinyDatanode dn = datanodes[daemonId];
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
dn.dnRegistration.getStorageID(), dn.getBlockReportList()) };
|
dn.storage, dn.getBlockReportList()) };
|
||||||
nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
|
nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
|
||||||
.getBlockPoolId(), report);
|
.getBlockPoolId(), report);
|
||||||
long end = System.currentTimeMillis();
|
long end = System.currentTimeMillis();
|
||||||
|
@ -36,6 +36,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||||
@ -125,7 +126,8 @@ public void testDeadDatanode() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Ensure blockReport from dead datanode is rejected with IOException
|
// Ensure blockReport from dead datanode is rejected with IOException
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(reg.getStorageID(),
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
|
new DatanodeStorage(reg.getStorageID()),
|
||||||
new long[] { 0L, 0L, 0L }) };
|
new long[] { 0L, 0L, 0L }) };
|
||||||
try {
|
try {
|
||||||
dnp.blockReport(reg, poolId, report);
|
dnp.blockReport(reg, poolId, report);
|
||||||
|
Loading…
Reference in New Issue
Block a user