HDDS-601. On restart, SCM throws 'No such datanode' exception.

This commit is contained in:
Hanisha Koneru 2018-10-11 13:20:32 -07:00
parent 2addebb94f
commit eb34b5f8af
9 changed files with 164 additions and 102 deletions

View File

@ -42,7 +42,7 @@
import java.util.concurrent.Future;
/**
* Register a container with SCM.
* Register a datanode with SCM.
*/
public final class RegisterEndpointTask implements
Callable<EndpointStateMachine.EndPointStates> {

View File

@ -67,4 +67,12 @@ RegisteredCommand register(DatanodeDetails datanodeDetails,
*/
List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails);
/**
* Check if node is registered or not.
* Return true if Node is registered and false otherwise.
* @param datanodeDetails - Datanode ID.
* @return true if Node is registered, false otherwise
*/
Boolean isNodeRegistered(DatanodeDetails datanodeDetails);
}

View File

@ -196,4 +196,11 @@ void addDatanodeInContainerMap(UUID uuid, Set<ContainerID> containerIDs)
* @param dnUuid datanode uuid.
*/
void processDeadNode(UUID dnUuid);
/**
* Get list of SCMCommands in the Command Queue for a particular Datanode.
* @param dnID - Datanode uuid.
* @return list of commands
*/
List<SCMCommand> getCommandQueue(UUID dnID);
}

View File

@ -52,7 +52,6 @@
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
@ -196,7 +195,7 @@ private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) {
try {
stat = nodeStateManager.getNodeStat(dnId);
} catch (NodeNotFoundException e) {
LOG.debug("SCM updateNodeStat based on heartbeat from previous" +
LOG.debug("SCM updateNodeStat based on heartbeat from previous " +
"dead datanode {}", dnId);
stat = new SCMNodeStat();
}
@ -277,7 +276,7 @@ public RegisteredCommand register(
nodeStateManager.setNodeStat(dnId, new SCMNodeStat());
// Updating Node Report, as registration is successful
updateNodeStat(datanodeDetails.getUuid(), nodeReport);
LOG.info("Data node with ID: {} Registered.", datanodeDetails.getUuid());
LOG.info("Registered Data node : {}", datanodeDetails);
} catch (NodeAlreadyExistsException e) {
LOG.trace("Datanode is already registered. Datanode: {}",
datanodeDetails.toString());
@ -304,14 +303,22 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
try {
nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
} catch (NodeNotFoundException e) {
LOG.warn("SCM receive heartbeat from unregistered datanode {}",
datanodeDetails);
commandQueue.addCommand(datanodeDetails.getUuid(),
new ReregisterCommand());
LOG.error("SCM trying to process heartbeat from an " +
"unregistered node {}. Ignoring the heartbeat.", datanodeDetails);
}
return commandQueue.getCommand(datanodeDetails.getUuid());
}
@Override
public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) {
try {
nodeStateManager.getNode(datanodeDetails);
return true;
} catch (NodeNotFoundException e) {
return false;
}
}
/**
* Process node report.
*
@ -487,4 +494,9 @@ public void processDeadNode(UUID dnUuid) {
+ " doesn't exist or decommissioned already.", dnUuid);
}
}
@Override
public List<SCMCommand> getCommandQueue(UUID dnID) {
return commandQueue.getCommand(dnID);
}
}

View File

@ -35,13 +35,15 @@
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
@ -82,51 +84,78 @@ public SCMDatanodeHeartbeatDispatcher(NodeManager nodeManager,
public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
DatanodeDetails datanodeDetails =
DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails());
// should we dispatch heartbeat through eventPublisher?
List<SCMCommand> commands = nodeManager.processHeartbeat(datanodeDetails);
if (heartbeat.hasNodeReport()) {
LOG.debug("Dispatching Node Report.");
eventPublisher.fireEvent(NODE_REPORT,
new NodeReportFromDatanode(datanodeDetails,
heartbeat.getNodeReport()));
}
List<SCMCommand> commands;
if (heartbeat.hasContainerReport()) {
LOG.debug("Dispatching Container Report.");
eventPublisher.fireEvent(CONTAINER_REPORT,
new ContainerReportFromDatanode(datanodeDetails,
heartbeat.getContainerReport()));
// If node is not registered, ask the node to re-register. Do not process
// Heartbeat for unregistered nodes.
if (!nodeManager.isNodeRegistered(datanodeDetails)) {
LOG.info("SCM received heartbeat from an unregistered datanode {}. " +
"Asking datanode to re-register.", datanodeDetails);
UUID dnID = datanodeDetails.getUuid();
nodeManager.addDatanodeCommand(dnID, new ReregisterCommand());
}
commands = nodeManager.getCommandQueue(dnID);
if (heartbeat.hasContainerActions()) {
LOG.debug("Dispatching Container Actions.");
eventPublisher.fireEvent(CONTAINER_ACTIONS,
new ContainerActionsFromDatanode(datanodeDetails,
heartbeat.getContainerActions()));
}
} else {
if (heartbeat.hasPipelineReports()) {
LOG.debug("Dispatching Pipeline Report.");
eventPublisher.fireEvent(PIPELINE_REPORT,
new PipelineReportFromDatanode(datanodeDetails,
heartbeat.getPipelineReports()));
// should we dispatch heartbeat through eventPublisher?
commands = nodeManager.processHeartbeat(datanodeDetails);
if (heartbeat.hasNodeReport()) {
LOG.debug("Dispatching Node Report.");
eventPublisher.fireEvent(
NODE_REPORT,
new NodeReportFromDatanode(
datanodeDetails,
heartbeat.getNodeReport()));
}
}
if (heartbeat.hasContainerReport()) {
LOG.debug("Dispatching Container Report.");
eventPublisher.fireEvent(
CONTAINER_REPORT,
new ContainerReportFromDatanode(
datanodeDetails,
heartbeat.getContainerReport()));
if (heartbeat.hasPipelineActions()) {
LOG.debug("Dispatching Pipeline Actions.");
eventPublisher.fireEvent(PIPELINE_ACTIONS,
new PipelineActionsFromDatanode(datanodeDetails,
heartbeat.getPipelineActions()));
}
}
if (heartbeat.getCommandStatusReportsCount() != 0) {
for (CommandStatusReportsProto commandStatusReport : heartbeat
.getCommandStatusReportsList()) {
eventPublisher.fireEvent(CMD_STATUS_REPORT,
new CommandStatusReportFromDatanode(datanodeDetails,
commandStatusReport));
if (heartbeat.hasContainerActions()) {
LOG.debug("Dispatching Container Actions.");
eventPublisher.fireEvent(
CONTAINER_ACTIONS,
new ContainerActionsFromDatanode(
datanodeDetails,
heartbeat.getContainerActions()));
}
if (heartbeat.hasPipelineReports()) {
LOG.debug("Dispatching Pipeline Report.");
eventPublisher.fireEvent(
PIPELINE_REPORT,
new PipelineReportFromDatanode(
datanodeDetails,
heartbeat.getPipelineReports()));
}
if (heartbeat.hasPipelineActions()) {
LOG.debug("Dispatching Pipeline Actions.");
eventPublisher.fireEvent(
PIPELINE_ACTIONS,
new PipelineActionsFromDatanode(
datanodeDetails,
heartbeat.getPipelineActions()));
}
if (heartbeat.getCommandStatusReportsCount() != 0) {
for (CommandStatusReportsProto commandStatusReport : heartbeat
.getCommandStatusReportsList()) {
eventPublisher.fireEvent(
CMD_STATUS_REPORT,
new CommandStatusReportFromDatanode(
datanodeDetails,
commandStatusReport));
}
}
}

View File

@ -407,6 +407,12 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
return null;
}
@Override
public Boolean isNodeRegistered(
DatanodeDetails datanodeDetails) {
return null;
}
@Override
public Map<String, Integer> getNodeCount() {
Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
@ -470,6 +476,11 @@ public void processDeadNode(UUID dnUuid) {
}
}
@Override
public List<SCMCommand> getCommandQueue(UUID dnID) {
return null;
}
/**
* A class to declare some values for the nodes so that our tests
* won't fail.

View File

@ -17,11 +17,8 @@
*/
package org.apache.hadoop.hdds.scm.node;
import com.google.common.base.Supplier;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
@ -213,56 +210,6 @@ public void testScmShutdown() throws IOException, InterruptedException,
//TODO: add assertion
}
/**
* Asserts scm informs datanodes to re-register with the nodemanager
* on a restart.
*
* @throws Exception
*/
@Test
public void testScmHeartbeatAfterRestart() throws Exception {
OzoneConfiguration conf = getConf();
conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
100, TimeUnit.MILLISECONDS);
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
UUID dnId = datanodeDetails.getUuid();
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
StorageReportProto report =
TestUtils.createStorageReport(dnId, storagePath, 100, 10, 90, null);
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
nodemanager.register(datanodeDetails,
TestUtils.createNodeReport(report),
TestUtils.getRandomPipelineReports());
List<SCMCommand> command = nodemanager.processHeartbeat(datanodeDetails);
Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails));
Assert.assertTrue("On regular HB calls, SCM responses a "
+ "datanode with an empty command list", command.isEmpty());
}
// Sends heartbeat without registering to SCM.
// This happens when SCM restarts.
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
Assert.assertFalse(nodemanager
.getAllNodes().contains(datanodeDetails));
try {
// SCM handles heartbeat asynchronously.
// It may need more than one heartbeat processing to
// send the notification.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
List<SCMCommand> command =
nodemanager.processHeartbeat(datanodeDetails);
return command.size() == 1 && command.get(0).getType()
.equals(SCMCommandProto.Type.reregisterCommand);
}
}, 100, 3 * 1000);
} catch (TimeoutException e) {
Assert.fail("Times out to verify that scm informs "
+ "datanode to re-register itself.");
}
}
}
/**
* Asserts that we detect as many healthy nodes as we have generated heartbeat
* for.

View File

@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.
@ -39,6 +40,7 @@
.NodeReportFromDatanode;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.junit.Assert;
import org.junit.Test;
@ -61,8 +63,12 @@ public void testNodeReportDispatcher() throws IOException {
NodeReportProto nodeReport = NodeReportProto.getDefaultInstance();
NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
Mockito.when(mockNodeManager.isNodeRegistered(Mockito.any()))
.thenReturn(true);
SCMDatanodeHeartbeatDispatcher dispatcher =
new SCMDatanodeHeartbeatDispatcher(Mockito.mock(NodeManager.class),
new SCMDatanodeHeartbeatDispatcher(mockNodeManager,
new EventPublisher() {
@Override
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
@ -99,8 +105,13 @@ public void testContainerReportDispatcher() throws IOException {
CommandStatusReportsProto commandStatusReport =
CommandStatusReportsProto.getDefaultInstance();
NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
Mockito.when(mockNodeManager.isNodeRegistered(Mockito.any()))
.thenReturn(true);
SCMDatanodeHeartbeatDispatcher dispatcher =
new SCMDatanodeHeartbeatDispatcher(Mockito.mock(NodeManager.class),
new SCMDatanodeHeartbeatDispatcher(
mockNodeManager,
new EventPublisher() {
@Override
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
@ -135,4 +146,30 @@ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
}
/**
* Asserts scm informs datanodes to re-register on a restart.
*
* @throws Exception
*/
@Test
public void testScmHeartbeatAfterRestart() throws Exception {
NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
SCMDatanodeHeartbeatDispatcher dispatcher =
new SCMDatanodeHeartbeatDispatcher(
mockNodeManager, Mockito.mock(EventPublisher.class));
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
SCMHeartbeatRequestProto heartbeat =
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.build();
dispatcher.dispatch(heartbeat);
// If SCM receives heartbeat from a node after it restarts and the node
// is not registered, it should send a Re-Register command back to the node.
Mockito.verify(mockNodeManager, Mockito.times(1)).addDatanodeCommand(
Mockito.any(UUID.class), Mockito.any(ReregisterCommand.class));
}
}

View File

@ -295,6 +295,12 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails dd) {
return null;
}
@Override
public Boolean isNodeRegistered(
DatanodeDetails datanodeDetails) {
return null;
}
/**
* Clears all nodes from the node Manager.
*/
@ -341,4 +347,9 @@ public void onMessage(CommandForDatanode commandForDatanode,
public void processDeadNode(UUID dnUuid) {
// do nothing.
}
@Override
public List<SCMCommand> getCommandQueue(UUID dnID) {
return null;
}
}