HDDS-3. Send NodeReport and ContainerReport when datanodes register. Contributed by Bharat Viswanadham.

This commit is contained in:
Xiaoyu Yao 2018-05-17 08:52:00 -07:00
parent 41ae5c5002
commit 59bde09920
15 changed files with 186 additions and 32 deletions

View File

@ -101,6 +101,7 @@ public void execute(ExecutorService executor) {
.setConfig(conf)
.setEndpointStateMachine(endpoint)
.setDatanodeDetails(context.getParent().getDatanodeDetails())
.setOzoneContainer(context.getParent().getContainer())
.build();
case HEARTBEAT:
return HeartbeatEndpointTask.newBuilder()

View File

@ -20,12 +20,16 @@
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,18 +49,21 @@ public final class RegisterEndpointTask implements
private final Configuration conf;
private Future<EndpointStateMachine.EndPointStates> result;
private DatanodeDetails datanodeDetails;
private final OzoneContainer datanodeContainerManager;
/**
* Creates a register endpoint task.
*
* @param rpcEndPoint - endpoint
* @param conf - conf
* @param ozoneContainer - container
*/
@VisibleForTesting
public RegisterEndpointTask(EndpointStateMachine rpcEndPoint,
Configuration conf) {
Configuration conf, OzoneContainer ozoneContainer) {
this.rpcEndPoint = rpcEndPoint;
this.conf = conf;
this.datanodeContainerManager = ozoneContainer;
}
@ -97,9 +104,13 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
rpcEndPoint.lock();
try {
ContainerReportsRequestProto contianerReport = datanodeContainerManager
.getContainerReport();
SCMNodeReport nodeReport = datanodeContainerManager.getNodeReport();
// TODO : Add responses to the command Queue.
SCMRegisteredCmdResponseProto response = rpcEndPoint.getEndPoint()
.register(datanodeDetails.getProtoBufMessage());
.register(datanodeDetails.getProtoBufMessage(), nodeReport,
contianerReport);
Preconditions.checkState(UUID.fromString(response.getDatanodeUUID())
.equals(datanodeDetails.getUuid()),
"Unexpected datanode ID in the response.");
@ -139,6 +150,7 @@ public static class Builder {
private EndpointStateMachine endPointStateMachine;
private Configuration conf;
private DatanodeDetails datanodeDetails;
private OzoneContainer container;
/**
* Constructs the builder class.
@ -179,6 +191,17 @@ public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
return this;
}
/**
* Sets the ozonecontainer.
* @param ozoneContainer
* @return Builder
*/
public Builder setOzoneContainer(OzoneContainer ozoneContainer) {
this.container = ozoneContainer;
return this;
}
public RegisterEndpointTask build() {
if (endPointStateMachine == null) {
LOG.error("No endpoint specified.");
@ -198,8 +221,14 @@ public RegisterEndpointTask build() {
"construct RegisterEndpoint task");
}
if (container == null) {
LOG.error("Container is not specified");
throw new IllegalArgumentException("Container is not specified to " +
"constrict RegisterEndpoint task");
}
RegisterEndpointTask task = new RegisterEndpointTask(this
.endPointStateMachine, this.conf);
.endPointStateMachine, this.conf, this.container);
task.setDatanodeDetails(datanodeDetails);
return task;
}

View File

@ -69,11 +69,13 @@ SCMHeartbeatResponseProto sendHeartbeat(DatanodeDetailsProto datanodeDetails,
/**
* Register Datanode.
* @param datanodeDetails - Datanode Details.
*
* @param nodeReport - Node Report.
* @param containerReportsRequestProto - Container Reports.
* @return SCM Command.
*/
SCMRegisteredCmdResponseProto register(DatanodeDetailsProto datanodeDetails)
throws IOException;
SCMRegisteredCmdResponseProto register(DatanodeDetailsProto datanodeDetails,
SCMNodeReport nodeReport, ContainerReportsRequestProto
containerReportsRequestProto) throws IOException;
/**
* Send a container report.

View File

@ -51,9 +51,11 @@ public interface StorageContainerNodeProtocol {
/**
* Register the node if the node finds that it is not registered with any SCM.
* @param datanodeDetails DatanodeDetails
* @param nodeReport SCMNodeReport
* @return SCMHeartbeatResponseProto
*/
SCMCommand register(DatanodeDetailsProto datanodeDetails);
SCMCommand register(DatanodeDetailsProto datanodeDetails, SCMNodeReport
nodeReport);
/**
* Send heartbeat to indicate the datanode is alive and doing well.

View File

@ -152,14 +152,20 @@ public SCMHeartbeatResponseProto sendHeartbeat(
* Register Datanode.
*
* @param datanodeDetailsProto - Datanode Details
* @param nodeReport - Node Report.
* @param containerReportsRequestProto - Container Reports.
* @return SCM Command.
*/
@Override
public SCMRegisteredCmdResponseProto register(
DatanodeDetailsProto datanodeDetailsProto) throws IOException {
DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport,
ContainerReportsRequestProto containerReportsRequestProto)
throws IOException {
SCMRegisterRequestProto.Builder req =
SCMRegisterRequestProto.newBuilder();
req.setDatanodeDetails(datanodeDetailsProto);
req.setContainerReport(containerReportsRequestProto);
req.setNodeReport(nodeReport);
final SCMRegisteredCmdResponseProto response;
try {
response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build());

View File

@ -29,6 +29,8 @@
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
@ -69,7 +71,12 @@ public StorageContainerDatanodeProtocolServerSideTranslatorPB(
register(RpcController controller, StorageContainerDatanodeProtocolProtos
.SCMRegisterRequestProto request) throws ServiceException {
try {
return impl.register(request.getDatanodeDetails());
ContainerReportsRequestProto containerRequestProto = null;
SCMNodeReport scmNodeReport = null;
containerRequestProto = request.getContainerReport();
scmNodeReport = request.getNodeReport();
return impl.register(request.getDatanodeDetails(), scmNodeReport,
containerRequestProto);
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -146,6 +146,8 @@ message SCMStorageReport {
message SCMRegisterRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
required SCMNodeReport nodeReport = 2;
required ContainerReportsRequestProto containerReport = 3;
}
/**

View File

@ -37,16 +37,13 @@
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -62,6 +59,7 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
// Map of datanode to containers
private Map<DatanodeDetails, Map<String, ContainerInfo>> nodeContainers =
new HashMap();
private Map<DatanodeDetails, SCMNodeReport> nodeReports = new HashMap<>();
/**
* Returns the number of heartbeats made to this class.
*
@ -200,9 +198,13 @@ private void sleepIfNeeded() {
@Override
public StorageContainerDatanodeProtocolProtos
.SCMRegisteredCmdResponseProto register(
DatanodeDetailsProto datanodeDetailsProto)
DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport,
StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto
containerReportsRequestProto)
throws IOException {
rpcCount.incrementAndGet();
sendContainerReport(containerReportsRequestProto);
updateNodeReport(datanodeDetailsProto, nodeReport);
sleepIfNeeded();
return StorageContainerDatanodeProtocolProtos
.SCMRegisteredCmdResponseProto
@ -212,6 +214,50 @@ private void sleepIfNeeded() {
.SCMRegisteredCmdResponseProto.ErrorCode.success).build();
}
/**
* Update nodeReport.
* @param datanodeDetailsProto
* @param nodeReport
*/
public void updateNodeReport(DatanodeDetailsProto datanodeDetailsProto,
SCMNodeReport nodeReport) {
DatanodeDetails datanode = DatanodeDetails.getFromProtoBuf(
datanodeDetailsProto);
SCMNodeReport.Builder datanodeReport = SCMNodeReport.newBuilder();
List<SCMStorageReport> storageReports =
nodeReport.getStorageReportList();
for(SCMStorageReport report : storageReports) {
datanodeReport.addStorageReport(report);
}
nodeReports.put(datanode, datanodeReport.build());
}
/**
* Return the number of StorageReports of a datanode.
* @param datanodeDetails
* @return count of containers of a datanode
*/
public int getNodeReportsCount(DatanodeDetails datanodeDetails) {
return nodeReports.get(datanodeDetails).getStorageReportCount();
}
/**
* Returns the number of containers of a datanode.
* @param datanodeDetails
* @return count of storage reports of a datanode
*/
public int getContainerCountsForDatanode(DatanodeDetails datanodeDetails) {
Map<String, ContainerInfo> cr = nodeContainers.get(datanodeDetails);
if(cr != null) {
return cr.size();
}
return 0;
}
/**
* Send a container report.
*

View File

@ -739,11 +739,13 @@ public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
* This function generates and assigns new datanode ID
* for the datanode. This allows SCM to be run independent
* of Namenode if required.
* @param nodeReport NodeReport.
*
* @return SCMHeartbeatResponseProto
*/
@Override
public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto) {
public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto,
SCMNodeReport nodeReport) {
String hostname = null;
String ip = null;
@ -788,6 +790,8 @@ public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto) {
.setErrorCode(ErrorCode.errorNodeNotPermitted)
.build();
}
// Updating Node Report, as registration is successful
updateNodeStat(datanodeDetails.getUuid(), nodeReport);
LOG.info("Data node with ID: {} Registered.",
datanodeDetails.getUuid());
RegisteredCommand.Builder builder =

View File

@ -35,6 +35,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
@ -167,11 +169,19 @@ public SCMHeartbeatResponseProto sendHeartbeat(
@Override
public SCMRegisteredCmdResponseProto register(
HddsProtos.DatanodeDetailsProto datanodeDetails)
HddsProtos.DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport,
ContainerReportsRequestProto containerReportsRequestProto)
throws IOException {
// TODO : Return the list of Nodes that forms the SCM HA.
return getRegisteredResponse(scm.getScmNodeManager()
.register(datanodeDetails));
RegisteredCommand registeredCommand = (RegisteredCommand) scm
.getScmNodeManager().register(datanodeDetails, nodeReport);
SCMCmdType type = registeredCommand.getType();
if (type == SCMCmdType.registeredCommand && registeredCommand.getError()
== SCMRegisteredCmdResponseProto.ErrorCode.success) {
scm.getScmContainerManager().processContainerReports(
containerReportsRequestProto);
}
return getRegisteredResponse(registeredCommand);
}
@VisibleForTesting

View File

@ -16,6 +16,10 @@
*/
package org.apache.hadoop.hdds.scm;
import org.apache.hadoop.hdds.protocol
.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -46,10 +50,27 @@ public static DatanodeDetails getDatanodeDetails(SCMNodeManager nodeManager) {
public static DatanodeDetails getDatanodeDetails(SCMNodeManager nodeManager,
String uuid) {
DatanodeDetails datanodeDetails = getDatanodeDetails(uuid);
nodeManager.register(datanodeDetails.getProtoBufMessage());
nodeManager.register(datanodeDetails.getProtoBufMessage(), null);
return datanodeDetails;
}
/**
* Create Node Report object.
* @return SCMNodeReport
*/
public static SCMNodeReport createNodeReport() {
SCMNodeReport.Builder nodeReport = SCMNodeReport.newBuilder();
for (int i = 0; i < 1; i++) {
SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
nodeReport.addStorageReport(i, srb.setStorageUuid("disk")
.setCapacity(100)
.setScmUsed(10)
.setRemaining(90)
.build());
}
return nodeReport.build();
}
/**
* Get specified number of DatanodeDetails and registered them with node
* manager.

View File

@ -373,10 +373,12 @@ public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
* SCM.
*
* @param datanodeDetails DatanodeDetailsProto
* @param nodeReport SCMNodeReport
* @return SCMHeartbeatResponseProto
*/
@Override
public SCMCommand register(HddsProtos.DatanodeDetailsProto datanodeDetails) {
public SCMCommand register(HddsProtos.DatanodeDetailsProto datanodeDetails,
SCMNodeReport nodeReport) {
return null;
}

View File

@ -282,7 +282,8 @@ public void testScmHeartbeatAfterRestart() throws Exception {
100, TimeUnit.MILLISECONDS);
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
nodemanager.register(datanodeDetails.getProtoBufMessage());
nodemanager.register(datanodeDetails.getProtoBufMessage(),
TestUtils.createNodeReport());
List<SCMCommand> command = nodemanager.sendHeartbeat(
datanodeDetails.getProtoBufMessage(),
null, reportState);

View File

@ -55,12 +55,14 @@
.RegisterEndpointTask;
import org.apache.hadoop.ozone.container.common.states.endpoint
.VersionEndpointTask;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.net.InetSocketAddress;
@ -75,6 +77,7 @@
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils
.createEndpoint;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.when;
/**
* Tests the endpoints.
@ -208,11 +211,17 @@ public void testRegister() throws Exception {
createEndpoint(
SCMTestUtils.getConf(), serverAddress, 1000)) {
SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint()
.register(nodeToRegister.getProtoBufMessage());
.register(nodeToRegister.getProtoBufMessage(),
TestUtils.createNodeReport(),
createContainerReport(10, nodeToRegister));
Assert.assertNotNull(responseProto);
Assert.assertEquals(nodeToRegister.getUuidString(),
responseProto.getDatanodeUUID());
Assert.assertNotNull(responseProto.getClusterID());
Assert.assertEquals(10, scmServerImpl.
getContainerCountsForDatanode(nodeToRegister));
Assert.assertEquals(1, scmServerImpl.getNodeReportsCount(
nodeToRegister));
}
}
@ -223,8 +232,13 @@ private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
createEndpoint(conf,
scmAddress, rpcTimeout);
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.REGISTER);
OzoneContainer ozoneContainer = mock(OzoneContainer.class);
when(ozoneContainer.getNodeReport()).thenReturn(TestUtils
.createNodeReport());
when(ozoneContainer.getContainerReport()).thenReturn(
createContainerReport(10, null));
RegisterEndpointTask endpointTask =
new RegisterEndpointTask(rpcEndPoint, conf);
new RegisterEndpointTask(rpcEndPoint, conf, ozoneContainer);
if (!clearDatanodeDetails) {
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
endpointTask.setDatanodeDetails(datanodeDetails);
@ -419,7 +433,8 @@ public void testContainerReport() throws Exception {
createEndpoint(SCMTestUtils.getConf(),
serverAddress, 1000)) {
ContainerReportsResponseProto responseProto = rpcEndPoint
.getEndPoint().sendContainerReport(createContainerReport(count));
.getEndPoint().sendContainerReport(createContainerReport(count,
null));
Assert.assertNotNull(responseProto);
}
Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
@ -430,7 +445,8 @@ public void testContainerReport() throws Exception {
Assert.assertEquals(expectedBytesUsed, scmServerImpl.getBytesUsed());
}
private ContainerReportsRequestProto createContainerReport(int count) {
private ContainerReportsRequestProto createContainerReport(
int count, DatanodeDetails datanodeDetails) {
StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
reportsBuilder = StorageContainerDatanodeProtocolProtos
.ContainerReportsRequestProto.newBuilder();
@ -448,8 +464,12 @@ private ContainerReportsRequestProto createContainerReport(int count) {
reportsBuilder.addReports(report.getProtoBufMessage());
}
if(datanodeDetails == null) {
reportsBuilder.setDatanodeDetails(getDatanodeDetails()
.getProtoBufMessage());
} else {
reportsBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage());
}
reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
.ContainerReportsRequestProto.reportType.fullReport);
return reportsBuilder.build();

View File

@ -279,11 +279,12 @@ public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
* Register the node if the node finds that it is not registered with any SCM.
*
* @param dd DatanodeDetailsProto
*
* @param nodeReport SCMNodeReport
* @return SCMHeartbeatResponseProto
*/
@Override
public SCMCommand register(HddsProtos.DatanodeDetailsProto dd) {
public SCMCommand register(HddsProtos.DatanodeDetailsProto dd,
SCMNodeReport nodeReport) {
return null;
}