From 7c368575a319f5ba98019418166524bac982086f Mon Sep 17 00:00:00 2001 From: Mukul Kumar Singh Date: Thu, 2 Aug 2018 17:34:17 +0530 Subject: [PATCH] HDDS-304. Process ContainerAction from datanode heartbeat in SCM. Contributed by Nanda Kumar. --- .../container/ContainerActionsHandler.java | 60 ++++++++++++++++ .../hadoop/hdds/scm/events/SCMEvents.java | 16 ++++- .../SCMDatanodeHeartbeatDispatcher.java | 22 ++++++ .../scm/server/StorageContainerManager.java | 3 + .../TestContainerActionsHandler.java | 68 +++++++++++++++++++ 5 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerActionsHandler.java diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java new file mode 100644 index 0000000000..ce399eb89b --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .ContainerActionsFromDatanode; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles container reports from datanode. + */ +public class ContainerActionsHandler implements + EventHandler { + + private static final Logger LOG = LoggerFactory.getLogger( + ContainerActionsHandler.class); + + @Override + public void onMessage( + ContainerActionsFromDatanode containerReportFromDatanode, + EventPublisher publisher) { + DatanodeDetails dd = containerReportFromDatanode.getDatanodeDetails(); + for (ContainerAction action : containerReportFromDatanode.getReport() + .getContainerActionsList()) { + ContainerID containerId = ContainerID.valueof(action.getContainerID()); + switch (action.getAction()) { + case CLOSE: + LOG.debug("Closing container {} in datanode {} because the" + + " container is {}.", containerId, dd, action.getReason()); + publisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerId); + break; + default: + LOG.warn("Invalid action {} with reason {}, from datanode {}. ", + action.getAction(), action.getReason(), dd); } + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index ad1702bcc4..d49dd4fe5c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -20,8 +20,15 @@ package org.apache.hadoop.hdds.scm.events; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.*; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .CloseContainerStatus; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .DeleteBlockCommandStatus; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .ReplicationStatus; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .ContainerActionsFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .CommandStatusReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher @@ -56,6 +63,13 @@ public final class SCMEvents { public static final TypedEvent CONTAINER_REPORT = new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report"); + /** + * ContainerActions are sent by Datanode. This event is received by + * SCMDatanodeHeartbeatDispatcher and CONTAINER_ACTIONS event is generated. + */ + public static final TypedEvent + CONTAINER_ACTIONS = new TypedEvent<>(ContainerActionsFromDatanode.class, + "Container_Actions"); /** * A Command status report will be sent by datanodes. This repoort is received * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index 2461d375d1..c2591418e9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -19,6 +19,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerActionsProto; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.hdds.protocol.proto @@ -37,6 +39,7 @@ import java.util.List; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT; @@ -89,6 +92,13 @@ public List dispatch(SCMHeartbeatRequestProto heartbeat) { } + if (heartbeat.hasContainerActions()) { + LOG.debug("Dispatching Container Actions."); + eventPublisher.fireEvent(CONTAINER_ACTIONS, + new ContainerActionsFromDatanode(datanodeDetails, + heartbeat.getContainerActions())); + } + if (heartbeat.hasCommandStatusReport()) { eventPublisher.fireEvent(CMD_STATUS_REPORT, new CommandStatusReportFromDatanode(datanodeDetails, @@ -145,6 +155,18 @@ public ContainerReportFromDatanode(DatanodeDetails datanodeDetails, } } + /** + * Container action event payload with origin. + */ + public static class ContainerActionsFromDatanode + extends ReportFromDatanode { + + public ContainerActionsFromDatanode(DatanodeDetails datanodeDetails, + ContainerActionsProto actions) { + super(datanodeDetails, actions); + } + } + /** * Container report event payload with origin. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index be8fb43e09..9cb13181d3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; +import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.Mapping; @@ -209,10 +210,12 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap); StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap); DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap); + ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler); + eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler); eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler); eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler); eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerActionsHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerActionsHandler.java new file mode 100644 index 0000000000..0997e1f5bc --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerActionsHandler.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerActionsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerActionsFromDatanode; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests ContainerActionsHandler. + */ +public class TestContainerActionsHandler { + + @Test + public void testCloseContainerAction() { + EventQueue queue = new EventQueue(); + ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); + CloseContainerEventHandler closeContainerEventHandler = Mockito.mock( + CloseContainerEventHandler.class); + queue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerEventHandler); + queue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler); + + ContainerAction action = ContainerAction.newBuilder() + .setContainerID(1L) + .setAction(ContainerAction.Action.CLOSE) + .setReason(ContainerAction.Reason.CONTAINER_FULL) + .build(); + + ContainerActionsProto cap = ContainerActionsProto.newBuilder() + .addContainerActions(action) + .build(); + + ContainerActionsFromDatanode containerActions = + new ContainerActionsFromDatanode( + TestUtils.randomDatanodeDetails(), cap); + + queue.fireEvent(SCMEvents.CONTAINER_ACTIONS, containerActions); + + verify(closeContainerEventHandler, times(1)) + .onMessage(ContainerID.valueof(1L), queue); + + } + +} \ No newline at end of file