diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
new file mode 100644
index 0000000000..9413a46021
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.command;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+ .CommandStatusReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Handles CommandStatusReports from datanodes.
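+ *
+ * <p>Wiring sketch (illustrative; it mirrors the registration added in
+ * StorageContainerManager by this patch): the SCM subscribes this handler
+ * via {@code eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, handler)}
+ * and, for each received CommandStatus, it re-fires a typed wrapper event.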
+ */
+public class CommandStatusReportHandler implements
+    EventHandler<CommandStatusReportFromDatanode> {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(CommandStatusReportHandler.class);
+
+ @Override
+ public void onMessage(CommandStatusReportFromDatanode report,
+ EventPublisher publisher) {
+ Preconditions.checkNotNull(report);
+    List<CommandStatus> cmdStatusList = report.getReport().getCmdStatusList();
+ Preconditions.checkNotNull(cmdStatusList);
+ LOGGER.trace("Processing command status report for dn: {}", report
+ .getDatanodeDetails());
+
+ // Route command status to its watchers.
+ cmdStatusList.forEach(cmdStatus -> {
+ LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
+ .getCmdId(), cmdStatus.getType());
+ switch (cmdStatus.getType()) {
+ case replicateContainerCommand:
+ publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new
+ ReplicationStatus(cmdStatus));
+ break;
+ case closeContainerCommand:
+ publisher.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
+ CloseContainerStatus(cmdStatus));
+ break;
+ case deleteBlocksCommand:
+ publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new
+ DeleteBlockCommandStatus(cmdStatus));
+ break;
+ default:
+ LOGGER.debug("CommandStatus of type:{} not handled in " +
+ "CommandStatusReportHandler.", cmdStatus.getType());
+ break;
+ }
+ });
+ }
+
+ /**
+ * Wrapper event for CommandStatus.
+ */
+ public static class CommandStatusEvent implements IdentifiableEventPayload {
+ private CommandStatus cmdStatus;
+
+ CommandStatusEvent(CommandStatus cmdStatus) {
+ this.cmdStatus = cmdStatus;
+ }
+
+ public CommandStatus getCmdStatus() {
+ return cmdStatus;
+ }
+
+ @Override
+ public String toString() {
+ return "CommandStatusEvent:" + cmdStatus.toString();
+ }
+
+ @Override
+ public long getId() {
+ return cmdStatus.getCmdId();
+ }
+ }
+
+ /**
+ * Wrapper event for Replicate Command.
+ */
+ public static class ReplicationStatus extends CommandStatusEvent {
+ ReplicationStatus(CommandStatus cmdStatus) {
+ super(cmdStatus);
+ }
+ }
+
+ /**
+ * Wrapper event for CloseContainer Command.
+ */
+ public static class CloseContainerStatus extends CommandStatusEvent {
+ CloseContainerStatus(CommandStatus cmdStatus) {
+ super(cmdStatus);
+ }
+ }
+
+ /**
+ * Wrapper event for DeleteBlock Command.
+ */
+ public static class DeleteBlockCommandStatus extends CommandStatusEvent {
+ DeleteBlockCommandStatus(CommandStatus cmdStatus) {
+ super(cmdStatus);
+ }
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java
new file mode 100644
index 0000000000..ba17fb9eea
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Classes and handlers for commands issued from SCM to the DataNodes,
+ * including handling of the command status reports sent back by DataNodes.
+ */
+package org.apache.hadoop.hdds.scm.command;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 485b3f58a7..46f1588b04 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hdds.scm.events;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.*;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.CommandStatusReportFromDatanode;
@@ -104,6 +105,31 @@ public final class SCMEvents {
public static final TypedEvent<DatanodeDetails> DEAD_NODE =
new TypedEvent<>(DatanodeDetails.class, "Dead_Node");
+ /**
+ * This event will be triggered by CommandStatusReportHandler whenever a
+ * status for Replication SCMCommand is received.
+ */
+  public static final Event<ReplicationStatus> REPLICATION_STATUS =
+      new TypedEvent<>(ReplicationStatus.class, "ReplicateCommandStatus");
+ /**
+ * This event will be triggered by CommandStatusReportHandler whenever a
+ * status for CloseContainer SCMCommand is received.
+ */
+  public static final Event<CloseContainerStatus>
+      CLOSE_CONTAINER_STATUS =
+      new TypedEvent<>(CloseContainerStatus.class,
+          "CloseContainerCommandStatus");
+ /**
+ * This event will be triggered by CommandStatusReportHandler whenever a
+ * status for DeleteBlock SCMCommand is received.
+ */
+  public static final Event<DeleteBlockCommandStatus>
+      DELETE_BLOCK_STATUS =
+      new TypedEvent<>(DeleteBlockCommandStatus.class,
+          "DeleteBlockCommandStatus");
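+  // NOTE: this patch wires up only the CMD_STATUS_REPORT handler; consumers
+  // of these three status events are registered separately.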
+
/**
* Private Ctor. Never Constructed.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index f37a0ed6ff..aba6410174 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
@@ -191,6 +192,8 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
new NodeReportHandler(scmNodeManager);
ContainerReportHandler containerReportHandler =
new ContainerReportHandler(scmContainerManager, node2ContainerMap);
+ CommandStatusReportHandler cmdStatusReportHandler =
+ new CommandStatusReportHandler();
NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap);
@@ -202,6 +205,8 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
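+    // Fans each received command status report out into typed status events.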
+ eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
.OZONE_ADMINISTRATORS);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 7568bf313b..8d7a2c2971 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -17,8 +17,14 @@
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol
.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol
+ .proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol
+ .proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -90,6 +96,18 @@ public static List<StorageReportProto> createStorageReport(long capacity,
return reportList;
}
+ /**
+   * Creates a CommandStatusReportsProto from the given command statuses.
+ * @return CommandStatusReportsProto
+ */
+ public static CommandStatusReportsProto createCommandStatusReport(
+      List<CommandStatus> reports) {
+ CommandStatusReportsProto.Builder report = CommandStatusReportsProto
+ .newBuilder();
+ report.addAllCmdStatus(reports);
+ return report.build();
+ }
+
/**
* Get specified number of DatanodeDetails and registered them with node
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
new file mode 100644
index 0000000000..5e64e57b88
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.command;
+
+import org.apache.hadoop.hdds.HddsIdFactory;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+ .CommandStatusReportFromDatanode;
+
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+public class TestCommandStatusReportHandler implements EventPublisher {
+
+ private static Logger LOG = LoggerFactory
+ .getLogger(TestCommandStatusReportHandler.class);
+ private CommandStatusReportHandler cmdStatusReportHandler;
+ private String storagePath = GenericTestUtils.getRandomizedTempPath()
+ .concat("/" + UUID.randomUUID().toString());
+
+ @Before
+ public void setup() {
+ cmdStatusReportHandler = new CommandStatusReportHandler();
+ }
+
+ @Test
+ public void testCommandStatusReport() {
+ GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+ .captureLogs(LOG);
+
+ CommandStatusReportFromDatanode report = this.getStatusReport(Collections
+ .emptyList());
+ cmdStatusReportHandler.onMessage(report, this);
+ assertFalse(logCapturer.getOutput().contains("DeleteBlockCommandStatus"));
+ assertFalse(logCapturer.getOutput().contains
+ ("CloseContainerCommandStatus"));
+ assertFalse(logCapturer.getOutput().contains
+ ("ReplicateCommandStatus"));
+
+
+ report = this.getStatusReport(this.getCommandStatusList());
+ cmdStatusReportHandler.onMessage(report, this);
+ assertTrue(logCapturer.getOutput().contains("firing event of type " +
+ "DeleteBlockCommandStatus"));
+ assertTrue(logCapturer.getOutput().contains("firing event of type " +
+ "CloseContainerCommandStatus"));
+ assertTrue(logCapturer.getOutput().contains("firing event of type " +
+ "ReplicateCommandStatus"));
+
+ assertTrue(logCapturer.getOutput().contains("type: " +
+ "closeContainerCommand"));
+ assertTrue(logCapturer.getOutput().contains("type: " +
+ "deleteBlocksCommand"));
+ assertTrue(logCapturer.getOutput().contains("type: " +
+ "replicateContainerCommand"));
+
+ }
+
+  private CommandStatusReportFromDatanode getStatusReport(
+      List<CommandStatus> reports) {
+ CommandStatusReportsProto report = TestUtils.createCommandStatusReport
+ (reports);
+ DatanodeDetails dn = TestUtils.getDatanodeDetails();
+ return new SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode
+ (dn, report);
+ }
+
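+  // This test class doubles as the EventPublisher: fired events are only
+  // logged here, and testCommandStatusReport() asserts on the log output.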
+ @Override
+  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
+      EVENT_TYPE event, PAYLOAD payload) {
+ LOG.info("firing event of type {}, payload {}", event.getName(), payload
+ .toString());
+ }
+
+  private List<CommandStatus> getCommandStatusList() {
+    List<CommandStatus> reports = new ArrayList<>(3);
+
+ // Add status message for replication, close container and delete block
+ // command.
+ CommandStatus.Builder builder = CommandStatus.newBuilder();
+
+ builder.setCmdId(HddsIdFactory.getLongId())
+ .setStatus(CommandStatus.Status.EXECUTED)
+ .setType(Type.deleteBlocksCommand);
+ reports.add(builder.build());
+
+ builder.setCmdId(HddsIdFactory.getLongId())
+ .setStatus(CommandStatus.Status.EXECUTED)
+ .setType(Type.closeContainerCommand);
+ reports.add(builder.build());
+
+ builder.setMsg("Not enough space")
+ .setCmdId(HddsIdFactory.getLongId())
+ .setStatus(CommandStatus.Status.FAILED)
+ .setType(Type.replicateContainerCommand);
+ reports.add(builder.build());
+ return reports;
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/package-info.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/package-info.java
new file mode 100644
index 0000000000..f529c20e74
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Make CheckStyle Happy.
+ */
+package org.apache.hadoop.hdds.scm.command;
\ No newline at end of file