diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index dec2c1c44a..8b449fbe19 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -17,7 +17,15 @@
*/
package org.apache.hadoop.hdds;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+/**
+ * Config class for HDDS.
+ */
public final class HddsConfigKeys {
private HddsConfigKeys() {
}
+ public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
+ "hdds.command.status.report.interval";
+ public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
+ ScmConfigKeys.OZONE_SCM_HEARBEAT_INTERVAL_DEFAULT;
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsIdFactory.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsIdFactory.java
new file mode 100644
index 0000000000..b244b8cf75
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsIdFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * HDDS Id generator.
+ */
+public final class HddsIdFactory {
+ private HddsIdFactory() {
+ }
+
+ private static final AtomicLong LONG_COUNTER = new AtomicLong(
+ System.currentTimeMillis());
+
+ /**
+   * Returns an incrementing long. This class doesn't
+   * persist the initial value for long ids, so ids issued after a restart
+   * may collide with previously generated ids.
+ *
+ * @return long
+ */
+ public static long getLongId() {
+ return LONG_COUNTER.incrementAndGet();
+ }
+
+ /**
+ * Returns a uuid.
+ *
+ * @return UUID.
+ */
+ public static UUID getUUId() {
+ return UUID.randomUUID();
+ }
+
+}
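A quick, hedged usage sketch of the new factory (not part of the patch; the demo class name is illustrative). Long ids are monotonically increasing within a single JVM, while the UUIDs carry no ordering guarantee:

```java
import java.util.UUID;
import org.apache.hadoop.hdds.HddsIdFactory;

public class IdFactoryDemo {
  public static void main(String[] args) {
    long first = HddsIdFactory.getLongId();   // counter seeded from currentTimeMillis
    long second = HddsIdFactory.getLongId();  // strictly increasing in-process
    assert second == first + 1;
    UUID uuid = HddsIdFactory.getUUId();      // random, no ordering guarantee
    System.out.println(first + " " + second + " " + uuid);
  }
}
```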
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index d5ce9e609d..1b6fb336c8 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1061,4 +1061,13 @@
+
+  <property>
+    <name>hdds.command.status.report.interval</name>
+    <value>30s</value>
+    <tag>OZONE, DATANODE, MANAGEMENT</tag>
+    <description>Time interval of the datanode to send status of commands
+      executed since last report. Unit could be defined with
+      postfix (ns,ms,s,m,h,d)</description>
+  </property>
</configuration>
\ No newline at end of file
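The interval value is parsed by Hadoop's Configuration.getTimeDuration, which is how CommandStatusReportPublisher below reads it. A minimal sketch of overriding and resolving the key (the demo class name is an assumption):

```java
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public class IntervalConfigDemo {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Override the "30s" default; any ns/ms/s/m/h/d postfix is accepted.
    conf.set(HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL, "1m");
    long intervalMs = conf.getTimeDuration(
        HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL,
        HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);
    System.out.println(intervalMs); // 60000
  }
}
```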
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestHddsIdFactory.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestHddsIdFactory.java
new file mode 100644
index 0000000000..a341ccc223
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestHddsIdFactory.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hadoop.hdds.HddsIdFactory;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test the HDDS id factory (HddsIdFactory).
+ */
+public class TestHddsIdFactory {
+
+  private static final Set<Long> ID_SET = ConcurrentHashMap.newKeySet();
+ private static final int IDS_PER_THREAD = 10000;
+ private static final int NUM_OF_THREADS = 5;
+
+ @After
+ public void cleanup() {
+ ID_SET.clear();
+ }
+
+ @Test
+ public void testGetLongId() throws Exception {
+
+ ExecutorService executor = Executors.newFixedThreadPool(5);
+    List<Callable<Integer>> tasks = new ArrayList<>(5);
+    addTasks(tasks);
+    List<Future<Integer>> result = executor.invokeAll(tasks);
+    assertEquals(IDS_PER_THREAD * NUM_OF_THREADS, ID_SET.size());
+    for (Future<Integer> r : result) {
+      assertEquals(IDS_PER_THREAD, r.get().intValue());
+    }
+ }
+
+  private void addTasks(List<Callable<Integer>> tasks) {
+    for (int i = 0; i < NUM_OF_THREADS; i++) {
+      Callable<Integer> task = () -> {
+        for (int idNum = 0; idNum < IDS_PER_THREAD; idNum++) {
+          long var = HddsIdFactory.getLongId();
+          // Set.add returns false for duplicates, making the check atomic.
+          if (!ID_SET.add(var)) {
+            Assert.fail("Duplicate id found: " + var);
+          }
+        }
+        return IDS_PER_THREAD;
+      };
+ tasks.add(task);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
new file mode 100644
index 0000000000..ca5174a487
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.report;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+
+/**
+ * Publishes CommandStatusReport which will be sent to SCM as part of
+ * heartbeat. CommandStatusReport consists of the following information:
+ * - type : type of command.
+ * - status : status of command execution (PENDING, EXECUTED, FAILED).
+ * - cmdId : command id.
+ * - msg : optional message.
+ */
+public class CommandStatusReportPublisher extends
+    ReportPublisher<CommandStatusReportsProto> {
+
+ private long cmdStatusReportInterval = -1;
+
+ @Override
+ protected long getReportFrequency() {
+ if (cmdStatusReportInterval == -1) {
+ cmdStatusReportInterval = getConf().getTimeDuration(
+ HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL,
+ HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ }
+ return cmdStatusReportInterval;
+ }
+
+ @Override
+ protected CommandStatusReportsProto getReport() {
+    Map<Long, CommandStatus> map = this.getContext()
+        .getCommandStatusMap();
+    Iterator<Long> iterator = map.keySet().iterator();
+ CommandStatusReportsProto.Builder builder = CommandStatusReportsProto
+ .newBuilder();
+
+ iterator.forEachRemaining(key -> {
+ CommandStatus cmdStatus = map.get(key);
+ builder.addCmdStatus(cmdStatus.getProtoBufMessage());
+ // If status is still pending then don't remove it from map as
+ // CommandHandler will change its status when it works on this command.
+ if (!cmdStatus.getStatus().equals(Status.PENDING)) {
+ map.remove(key);
+ }
+ });
+ return builder.build();
+ }
+}
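A minimal sketch of the pruning rule in getReport() above, using only types introduced by this patch (the demo class itself is hypothetical); TestReportPublisher further down exercises the same behaviour:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;

public class PruneRuleDemo {
  public static void main(String[] args) {
    Map<Long, CommandStatus> statusMap = new ConcurrentHashMap<>();
    statusMap.put(1L, CommandStatusBuilder.newBuilder().setCmdId(1L)
        .setType(Type.closeContainerCommand).setStatus(Status.PENDING).build());
    statusMap.put(2L, CommandStatusBuilder.newBuilder().setCmdId(2L)
        .setType(Type.deleteBlocksCommand).setStatus(Status.EXECUTED).build());
    // getReport() serializes both entries but prunes only the non-PENDING
    // one; PENDING entries are re-reported until a CommandHandler resolves
    // them. The removeIf below mirrors that rule.
    statusMap.values().removeIf(s -> !s.getStatus().equals(Status.PENDING));
    System.out.println(statusMap.keySet()); // [1]
  }
}
```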
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
index 4ff47a0523..105f073e3f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
@@ -93,4 +93,13 @@ private void publishReport() {
*/
protected abstract T getReport();
+ /**
+   * Returns the {@link StateContext} of this datanode.
+   *
+   * @return StateContext
+ */
+ protected StateContext getContext() {
+ return context;
+ }
+
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
index dc246d9428..ea89280729 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -19,6 +19,8 @@
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -49,6 +51,8 @@ public ReportPublisherFactory(Configuration conf) {
report2publisher.put(NodeReportProto.class, NodeReportPublisher.class);
report2publisher.put(ContainerReportsProto.class,
ContainerReportPublisher.class);
+ report2publisher.put(CommandStatusReportsProto.class,
+ CommandStatusReportPublisher.class);
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 245d76f0db..69a243e937 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -107,6 +108,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
.setStateContext(context)
.addPublisherFor(NodeReportProto.class)
.addPublisherFor(ContainerReportsProto.class)
+ .addPublisherFor(CommandStatusReportsProto.class)
.build();
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 98eb7a05f6..7ed30f86a9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -17,12 +17,17 @@
package org.apache.hadoop.ozone.container.common.statemachine;
import com.google.protobuf.GeneratedMessage;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +53,7 @@ public class StateContext {
static final Logger LOG =
LoggerFactory.getLogger(StateContext.class);
private final Queue<SCMCommand> commandQueue;
+  private final Map<Long, CommandStatus> cmdStatusMap;
private final Lock lock;
private final DatanodeStateMachine parent;
private final AtomicLong stateExecutionCount;
@@ -68,6 +74,7 @@ public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
this.state = state;
this.parent = parent;
commandQueue = new LinkedList<>();
+ cmdStatusMap = new ConcurrentHashMap<>();
reports = new LinkedList<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
@@ -269,6 +276,7 @@ public void addCommand(SCMCommand command) {
} finally {
lock.unlock();
}
+ this.addCmdStatus(command);
}
/**
@@ -279,4 +287,66 @@ public long getExecutionCount() {
return stateExecutionCount.get();
}
+ /**
+   * Returns the {@link CommandStatus} for the given command id, or null if
+   * none is registered.
+   *
+   * @param key - command id.
+   * @return {@link CommandStatus} or null.
+ */
+ public CommandStatus getCmdStatus(Long key) {
+ return cmdStatusMap.get(key);
+ }
+
+ /**
+ * Adds a {@link CommandStatus} to the State Machine.
+ *
+   * @param key - command id.
+   * @param status - {@link CommandStatus}.
+ */
+ public void addCmdStatus(Long key, CommandStatus status) {
+ cmdStatusMap.put(key, status);
+ }
+
+ /**
+ * Adds a {@link CommandStatus} to the State Machine for given SCMCommand.
+ *
+ * @param cmd - {@link SCMCommand}.
+ */
+ public void addCmdStatus(SCMCommand cmd) {
+ this.addCmdStatus(cmd.getCmdId(),
+ CommandStatusBuilder.newBuilder()
+ .setCmdId(cmd.getCmdId())
+ .setStatus(Status.PENDING)
+ .setType(cmd.getType())
+ .build());
+ }
+
+ /**
+ * Get map holding all {@link CommandStatus} objects.
+ *
+ */
+  public Map<Long, CommandStatus> getCommandStatusMap() {
+ return cmdStatusMap;
+ }
+
+ /**
+   * Removes a {@link CommandStatus} from StateContext#cmdStatusMap.
+   *
+   * @param cmdId - command id to remove.
+ */
+ public void removeCommandStatus(Long cmdId) {
+ cmdStatusMap.remove(cmdId);
+ }
+
+ /**
+ * Updates status of a pending status command.
+ * @param cmdId command id
+ * @param cmdExecuted SCMCommand
+ * @return true if command status updated successfully else false.
+ */
+ public boolean updateCommandStatus(Long cmdId, boolean cmdExecuted) {
+    if (cmdStatusMap.containsKey(cmdId)) {
+ cmdStatusMap.get(cmdId)
+ .setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
+ return true;
+ }
+ return false;
+ }
}
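Putting the new StateContext pieces together, a hedged sketch of the intended lifecycle (the wrapper class is illustrative; CloseContainerCommand gains its cmdId support later in this patch):

```java
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;

public class CommandStatusLifecycleSketch {
  static void demo(StateContext context, long containerID) {
    CloseContainerCommand cmd = new CloseContainerCommand(
        containerID, HddsProtos.ReplicationType.RATIS);
    // Queues the command and registers a PENDING CommandStatus keyed by cmdId.
    context.addCommand(cmd);
    assert context.getCmdStatus(cmd.getCmdId()).getStatus() == Status.PENDING;
    // A CommandHandler later reports the outcome; the entry flips to EXECUTED
    // (or FAILED) and is pruned after the next CommandStatusReportPublisher run.
    context.updateCommandStatus(cmd.getCmdId(), true);
  }
}
```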
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 45f2bbd145..f58cbae343 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -41,6 +41,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
LoggerFactory.getLogger(CloseContainerCommandHandler.class);
private int invocationCount;
private long totalTime;
+ private boolean cmdExecuted;
/**
* Constructs a ContainerReport handler.
@@ -61,6 +62,7 @@ public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
LOG.debug("Processing Close Container command.");
invocationCount++;
+ cmdExecuted = false;
long startTime = Time.monotonicNow();
// TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
long containerID = -1;
@@ -88,10 +90,11 @@ public void handle(SCMCommand command, OzoneContainer container,
// submit the close container request for the XceiverServer to handle
container.submitContainerRequest(
request.build(), replicationType);
-
+ cmdExecuted = true;
} catch (Exception e) {
LOG.error("Can't close container " + containerID, e);
} finally {
+ updateCommandStatus(context, command, cmdExecuted, LOG);
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 60e2dc479d..20164193e1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
/**
* Generic interface for handlers.
@@ -58,4 +59,14 @@ void handle(SCMCommand command, OzoneContainer container,
*/
long getAverageRunTime();
+ /**
+ * Default implementation for updating command status.
+ */
+ default void updateCommandStatus(StateContext context, SCMCommand command,
+ boolean cmdExecuted, Logger log) {
+ if (!context.updateCommandStatus(command.getCmdId(), cmdExecuted)) {
+ log.debug("{} with cmdId:{} not found.", command.getType(),
+ command.getCmdId());
+ }
+ }
}
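A hypothetical handler skeleton showing how the new default method is meant to be used; the concrete handlers below (close container, delete blocks, replicate) all follow this try/finally shape:

```java
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical skeleton, not part of the patch. */
public class NoOpCommandHandler implements CommandHandler {
  private static final Logger LOG =
      LoggerFactory.getLogger(NoOpCommandHandler.class);

  @Override
  public void handle(SCMCommand command, OzoneContainer container,
      StateContext context, SCMConnectionManager connectionManager) {
    boolean cmdExecuted = false;
    try {
      // ... perform the actual work for this command type ...
      cmdExecuted = true;
    } finally {
      // Flips the PENDING entry in StateContext to EXECUTED or FAILED.
      updateCommandStatus(context, command, cmdExecuted, LOG);
    }
  }

  @Override
  public SCMCommandProto.Type getCommandType() {
    return SCMCommandProto.Type.reregisterCommand;
  }

  @Override
  public int getInvocationCount() {
    return 0;
  }

  @Override
  public long getAverageRunTime() {
    return 0;
  }
}
```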
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index c3d1596095..9640f93f1c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -21,7 +21,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+ .StorageContainerException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
@@ -54,7 +55,8 @@
import java.io.IOException;
import java.util.List;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .Result.CONTAINER_NOT_FOUND;
/**
* Handle block deletion commands.
@@ -68,6 +70,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
private final Configuration conf;
private int invocationCount;
private long totalTime;
+ private boolean cmdExecuted;
public DeleteBlocksCommandHandler(ContainerSet cset,
Configuration conf) {
@@ -78,93 +81,98 @@ public DeleteBlocksCommandHandler(ContainerSet cset,
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
- if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
- LOG.warn("Skipping handling command, expected command "
- + "type {} but found {}",
- SCMCommandProto.Type.deleteBlocksCommand, command.getType());
- return;
- }
- LOG.debug("Processing block deletion command.");
- invocationCount++;
+ cmdExecuted = false;
long startTime = Time.monotonicNow();
-
- // move blocks to deleting state.
- // this is a metadata update, the actual deletion happens in another
- // recycling thread.
- DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
-    List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
-
-
- DeletedContainerBlocksSummary summary =
- DeletedContainerBlocksSummary.getFrom(containerBlocks);
- LOG.info("Start to delete container blocks, TXIDs={}, "
- + "numOfContainers={}, numOfBlocks={}",
- summary.getTxIDSummary(),
- summary.getNumOfContainers(),
- summary.getNumOfBlocks());
-
- ContainerBlocksDeletionACKProto.Builder resultBuilder =
- ContainerBlocksDeletionACKProto.newBuilder();
- containerBlocks.forEach(entry -> {
- DeleteBlockTransactionResult.Builder txResultBuilder =
- DeleteBlockTransactionResult.newBuilder();
- txResultBuilder.setTxID(entry.getTxID());
- try {
- long containerId = entry.getContainerID();
- Container cont = containerSet.getContainer(containerId);
- if(cont == null) {
- throw new StorageContainerException("Unable to find the container "
- + containerId, CONTAINER_NOT_FOUND);
- }
- ContainerProtos.ContainerType containerType = cont.getContainerType();
- switch (containerType) {
- case KeyValueContainer:
- KeyValueContainerData containerData = (KeyValueContainerData)
- cont.getContainerData();
- deleteKeyValueContainerBlocks(containerData, entry);
- txResultBuilder.setSuccess(true);
- break;
- default:
- LOG.error("Delete Blocks Command Handler is not implemented for " +
- "containerType {}", containerType);
- }
- } catch (IOException e) {
- LOG.warn("Failed to delete blocks for container={}, TXID={}",
- entry.getContainerID(), entry.getTxID(), e);
- txResultBuilder.setSuccess(false);
+ try {
+ if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
+ LOG.warn("Skipping handling command, expected command "
+ + "type {} but found {}",
+ SCMCommandProto.Type.deleteBlocksCommand, command.getType());
+ return;
}
- resultBuilder.addResults(txResultBuilder.build());
- });
- ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
+ LOG.debug("Processing block deletion command.");
+ invocationCount++;
- // Send ACK back to SCM as long as meta updated
- // TODO Or we should wait until the blocks are actually deleted?
- if (!containerBlocks.isEmpty()) {
- for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+ // move blocks to deleting state.
+ // this is a metadata update, the actual deletion happens in another
+ // recycling thread.
+ DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
+      List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
+
+ DeletedContainerBlocksSummary summary =
+ DeletedContainerBlocksSummary.getFrom(containerBlocks);
+ LOG.info("Start to delete container blocks, TXIDs={}, "
+ + "numOfContainers={}, numOfBlocks={}",
+ summary.getTxIDSummary(),
+ summary.getNumOfContainers(),
+ summary.getNumOfBlocks());
+
+ ContainerBlocksDeletionACKProto.Builder resultBuilder =
+ ContainerBlocksDeletionACKProto.newBuilder();
+ containerBlocks.forEach(entry -> {
+ DeleteBlockTransactionResult.Builder txResultBuilder =
+ DeleteBlockTransactionResult.newBuilder();
+ txResultBuilder.setTxID(entry.getTxID());
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending following block deletion ACK to SCM");
- for (DeleteBlockTransactionResult result :
- blockDeletionACK.getResultsList()) {
- LOG.debug(result.getTxID() + " : " + result.getSuccess());
- }
+ long containerId = entry.getContainerID();
+ Container cont = containerSet.getContainer(containerId);
+ if (cont == null) {
+ throw new StorageContainerException("Unable to find the container "
+ + containerId, CONTAINER_NOT_FOUND);
+ }
+ ContainerProtos.ContainerType containerType = cont.getContainerType();
+ switch (containerType) {
+ case KeyValueContainer:
+ KeyValueContainerData containerData = (KeyValueContainerData)
+ cont.getContainerData();
+ deleteKeyValueContainerBlocks(containerData, entry);
+ txResultBuilder.setSuccess(true);
+ break;
+ default:
+ LOG.error(
+ "Delete Blocks Command Handler is not implemented for " +
+ "containerType {}", containerType);
}
- endPoint.getEndPoint()
- .sendContainerBlocksDeletionACK(blockDeletionACK);
} catch (IOException e) {
- LOG.error("Unable to send block deletion ACK to SCM {}",
- endPoint.getAddress().toString(), e);
+ LOG.warn("Failed to delete blocks for container={}, TXID={}",
+ entry.getContainerID(), entry.getTxID(), e);
+ txResultBuilder.setSuccess(false);
+ }
+ resultBuilder.addResults(txResultBuilder.build());
+ });
+ ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
+
+ // Send ACK back to SCM as long as meta updated
+ // TODO Or we should wait until the blocks are actually deleted?
+ if (!containerBlocks.isEmpty()) {
+ for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending following block deletion ACK to SCM");
+ for (DeleteBlockTransactionResult result :
+ blockDeletionACK.getResultsList()) {
+ LOG.debug(result.getTxID() + " : " + result.getSuccess());
+ }
+ }
+ endPoint.getEndPoint()
+ .sendContainerBlocksDeletionACK(blockDeletionACK);
+ } catch (IOException e) {
+ LOG.error("Unable to send block deletion ACK to SCM {}",
+ endPoint.getAddress().toString(), e);
+ }
}
}
+ cmdExecuted = true;
+ } finally {
+ updateCommandStatus(context, command, cmdExecuted, LOG);
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
}
-
- long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
}
/**
- * Move a bunch of blocks from a container to deleting state.
- * This is a meta update, the actual deletes happen in async mode.
+ * Move a bunch of blocks from a container to deleting state. This is a meta
+ * update, the actual deletes happen in async mode.
*
* @param containerData - KeyValueContainerData
* @param delTX a block deletion transaction.
@@ -222,7 +230,7 @@ private void deleteKeyValueContainerBlocks(
}
} else {
LOG.debug("Block {} not found or already under deletion in"
- + " container {}, skip deleting it.", blk, containerId);
+ + " container {}, skip deleting it.", blk, containerId);
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index b4e83b7d40..fe1d4e81af 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -39,12 +39,17 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
private int invocationCount;
private long totalTime;
+ private boolean cmdExecuted;
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
LOG.warn("Replicate command is not yet handled");
-
+ try {
+ cmdExecuted = true;
+ } finally {
+ updateCommandStatus(context, command, cmdExecuted, LOG);
+ }
}
@Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index c7d8df5715..6b7c22cf9e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -1,19 +1,18 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
@@ -24,7 +23,6 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
-
/**
* Asks datanode to close a container.
*/
@@ -36,6 +34,15 @@ public class CloseContainerCommand
public CloseContainerCommand(long containerID,
HddsProtos.ReplicationType replicationType) {
+ super();
+ this.containerID = containerID;
+ this.replicationType = replicationType;
+ }
+
+ // Should be called only for protobuf conversion
+ private CloseContainerCommand(long containerID,
+ HddsProtos.ReplicationType replicationType, long cmdId) {
+ super(cmdId);
this.containerID = containerID;
this.replicationType = replicationType;
}
@@ -63,6 +70,7 @@ public byte[] getProtoBufMessage() {
public CloseContainerCommandProto getProto() {
return CloseContainerCommandProto.newBuilder()
.setContainerID(containerID)
+ .setCmdId(getCmdId())
.setReplicationType(replicationType).build();
}
@@ -70,8 +78,8 @@ public static CloseContainerCommand getFromProtobuf(
CloseContainerCommandProto closeContainerProto) {
Preconditions.checkNotNull(closeContainerProto);
return new CloseContainerCommand(closeContainerProto.getContainerID(),
- closeContainerProto.getReplicationType());
-
+ closeContainerProto.getReplicationType(), closeContainerProto
+ .getCmdId());
}
public long getContainerID() {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
new file mode 100644
index 0000000000..bf9970097a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+
+/**
+ * A class that is used to communicate status of datanode commands.
+ */
+public class CommandStatus {
+
+ private SCMCommandProto.Type type;
+ private Long cmdId;
+ private Status status;
+ private String msg;
+
+ public Type getType() {
+ return type;
+ }
+
+ public Long getCmdId() {
+ return cmdId;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ /**
+   * Allows the status to be changed after the CommandStatus is initialized.
+   *
+   * @param status - new status.
+ */
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+
+ /**
+ * Returns a CommandStatus from the protocol buffers.
+ *
+ * @param cmdStatusProto - protoBuf Message
+ * @return CommandStatus
+ */
+ public CommandStatus getFromProtoBuf(
+ StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) {
+ return CommandStatusBuilder.newBuilder()
+ .setCmdId(cmdStatusProto.getCmdId())
+ .setStatus(cmdStatusProto.getStatus())
+ .setType(cmdStatusProto.getType())
+ .setMsg(cmdStatusProto.getMsg()).build();
+ }
+ /**
+   * Returns the protobuf message for this CommandStatus.
+ *
+ * @return StorageContainerDatanodeProtocolProtos.CommandStatus
+ */
+ public StorageContainerDatanodeProtocolProtos.CommandStatus
+ getProtoBufMessage() {
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder =
+ StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder()
+ .setCmdId(this.getCmdId())
+ .setStatus(this.getStatus())
+ .setType(this.getType());
+ if (this.getMsg() != null) {
+ builder.setMsg(this.getMsg());
+ }
+ return builder.build();
+ }
+
+ /**
+ * Builder class for CommandStatus.
+ */
+ public static final class CommandStatusBuilder {
+
+ private SCMCommandProto.Type type;
+ private Long cmdId;
+ private StorageContainerDatanodeProtocolProtos.CommandStatus.Status status;
+ private String msg;
+
+ private CommandStatusBuilder() {
+ }
+
+ public static CommandStatusBuilder newBuilder() {
+ return new CommandStatusBuilder();
+ }
+
+ public CommandStatusBuilder setType(Type type) {
+ this.type = type;
+ return this;
+ }
+
+ public CommandStatusBuilder setCmdId(Long cmdId) {
+ this.cmdId = cmdId;
+ return this;
+ }
+
+ public CommandStatusBuilder setStatus(Status status) {
+ this.status = status;
+ return this;
+ }
+
+ public CommandStatusBuilder setMsg(String msg) {
+ this.msg = msg;
+ return this;
+ }
+
+ public CommandStatus build() {
+ CommandStatus commandStatus = new CommandStatus();
+ commandStatus.type = this.type;
+ commandStatus.msg = this.msg;
+ commandStatus.status = this.status;
+ commandStatus.cmdId = this.cmdId;
+ return commandStatus;
+ }
+ }
+}
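A round-trip sketch for the new class (the demo class is an illustration; note that getFromProtoBuf is an instance method in this patch):

```java
import org.apache.hadoop.hdds.HddsIdFactory;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;

public class CommandStatusRoundTrip {
  public static void main(String[] args) {
    CommandStatus status = CommandStatus.CommandStatusBuilder.newBuilder()
        .setCmdId(HddsIdFactory.getLongId())
        .setType(Type.closeContainerCommand)
        .setStatus(Status.PENDING)
        .setMsg("waiting for container close")
        .build();
    // msg is optional in the proto and only set when non-null.
    StorageContainerDatanodeProtocolProtos.CommandStatus proto =
        status.getProtoBufMessage();
    CommandStatus decoded = status.getFromProtoBuf(proto);
    assert decoded.getCmdId().equals(status.getCmdId());
  }
}
```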
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
index 4fa33f68b0..46af7941ca 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -36,6 +36,14 @@ public class DeleteBlocksCommand extends
public DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks) {
+ super();
+ this.blocksTobeDeleted = blocks;
+ }
+
+ // Should be called only for protobuf conversion
+  private DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks,
+ long cmdId) {
+ super(cmdId);
this.blocksTobeDeleted = blocks;
}
@@ -56,11 +64,12 @@ public byte[] getProtoBufMessage() {
public static DeleteBlocksCommand getFromProtobuf(
DeleteBlocksCommandProto deleteBlocksProto) {
return new DeleteBlocksCommand(deleteBlocksProto
- .getDeletedBlocksTransactionsList());
+ .getDeletedBlocksTransactionsList(), deleteBlocksProto.getCmdId());
}
public DeleteBlocksCommandProto getProto() {
return DeleteBlocksCommandProto.newBuilder()
+ .setCmdId(getCmdId())
.addAllDeletedBlocksTransactions(blocksTobeDeleted).build();
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
index 834318b145..e860c933d2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
@@ -30,7 +30,6 @@
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
import com.google.common.base.Preconditions;
@@ -41,11 +40,19 @@ public class ReplicateContainerCommand
extends SCMCommand<ReplicateContainerCommandProto> {
private final long containerID;
-
private final List<DatanodeDetails> sourceDatanodes;
public ReplicateContainerCommand(long containerID,
List<DatanodeDetails> sourceDatanodes) {
+ super();
+ this.containerID = containerID;
+ this.sourceDatanodes = sourceDatanodes;
+ }
+
+ // Should be called only for protobuf conversion
+ public ReplicateContainerCommand(long containerID,
+      List<DatanodeDetails> sourceDatanodes, long cmdId) {
+ super(cmdId);
this.containerID = containerID;
this.sourceDatanodes = sourceDatanodes;
}
@@ -62,6 +69,7 @@ public byte[] getProtoBufMessage() {
public ReplicateContainerCommandProto getProto() {
Builder builder = ReplicateContainerCommandProto.newBuilder()
+ .setCmdId(getCmdId())
.setContainerID(containerID);
for (DatanodeDetails dd : sourceDatanodes) {
builder.addSources(dd.getProtoBufMessage());
@@ -75,12 +83,12 @@ public static ReplicateContainerCommand getFromProtobuf(
List<DatanodeDetails> datanodeDetails =
protoMessage.getSourcesList()
- .stream()
- .map(DatanodeDetails::getFromProtoBuf)
- .collect(Collectors.toList());
+ .stream()
+ .map(DatanodeDetails::getFromProtoBuf)
+ .collect(Collectors.toList());
return new ReplicateContainerCommand(protoMessage.getContainerID(),
- datanodeDetails);
+ datanodeDetails, protoMessage.getCmdId());
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
index 953e31a02e..d55710475a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
@@ -49,6 +49,16 @@ public byte[] getProtoBufMessage() {
return getProto().toByteArray();
}
+ /**
+   * ReregisterCommand does not carry a command id.
+   *
+   * @return 0 as a placeholder command id.
+ */
+ @Override
+ public long getCmdId() {
+ return 0;
+ }
+
public ReregisterCommandProto getProto() {
return ReregisterCommandProto
.newBuilder()
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 35ca802bee..6cda59176b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.protocol.commands;
import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.hdds.HddsIdFactory;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@@ -27,6 +28,15 @@
* @param <T>
*/
public abstract class SCMCommand<T extends GeneratedMessage> {
+ private long cmdId;
+
+ SCMCommand() {
+ this.cmdId = HddsIdFactory.getLongId();
+ }
+
+ SCMCommand(long cmdId) {
+ this.cmdId = cmdId;
+ }
/**
* Returns the type of this command.
* @return Type
@@ -38,4 +48,13 @@ public abstract class SCMCommand<T extends GeneratedMessage> {
* @return A protobuf message.
*/
public abstract byte[] getProtoBufMessage();
+
+ /**
+   * Gets the commandId of this object.
+   * @return long - the command id.
+ */
+ public long getCmdId() {
+ return cmdId;
+ }
+
}
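A sketch of how the command id flows (the demo class is illustrative): the public constructors draw a fresh id from HddsIdFactory, while getFromProtobuf uses the id-preserving constructor, so the datanode sees the id SCM assigned:

```java
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;

public class CmdIdRoundTrip {
  public static void main(String[] args) {
    // Public constructor: cmdId assigned via HddsIdFactory.getLongId().
    CloseContainerCommand original = new CloseContainerCommand(
        1L, HddsProtos.ReplicationType.RATIS);
    CloseContainerCommandProto proto = original.getProto();
    // getFromProtobuf() routes through the (cmdId) constructor, so the id
    // survives the SCM-to-datanode round trip.
    CloseContainerCommand decoded =
        CloseContainerCommand.getFromProtobuf(proto);
    assert decoded.getCmdId() == original.getCmdId();
  }
}
```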
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 54230c1e9f..4238389a20 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -80,6 +80,7 @@ message SCMHeartbeatRequestProto {
optional NodeReportProto nodeReport = 2;
optional ContainerReportsProto containerReport = 3;
optional ContainerActionsProto containerActions = 4;
+ optional CommandStatusReportsProto commandStatusReport = 5;
}
/*
@@ -127,6 +128,22 @@ message ContainerReportsProto {
repeated ContainerInfo reports = 1;
}
+message CommandStatusReportsProto {
+ repeated CommandStatus cmdStatus = 1;
+}
+
+message CommandStatus {
+ enum Status {
+ PENDING = 1;
+ EXECUTED = 2;
+ FAILED = 3;
+ }
+ required int64 cmdId = 1;
+ required Status status = 2 [default = PENDING];
+ required SCMCommandProto.Type type = 3;
+ optional string msg = 4;
+}
+
message ContainerActionsProto {
repeated ContainerAction containerActions = 1;
}
@@ -193,6 +210,7 @@ message ReregisterCommandProto {}
// HB response from SCM, contains a list of block deletion transactions.
message DeleteBlocksCommandProto {
repeated DeletedBlocksTransaction deletedBlocksTransactions = 1;
+ required int64 cmdId = 3;
}
// The deleted blocks which are stored in deletedBlock.db of scm.
@@ -226,6 +244,7 @@ This command asks the datanode to close a specific container.
message CloseContainerCommandProto {
required int64 containerID = 1;
required hadoop.hdds.ReplicationType replicationType = 2;
+ required int64 cmdId = 3;
}
/**
@@ -233,6 +252,7 @@ This command asks the datanode to delete a specific container.
*/
message DeleteContainerCommandProto {
required int64 containerID = 1;
+ required int64 cmdId = 2;
}
/**
@@ -241,6 +261,7 @@ This command asks the datanode to replicate a container from specific sources.
message ReplicateContainerCommandProto {
required int64 containerID = 1;
repeated DatanodeDetailsProto sources = 2;
+ required int64 cmdId = 3;
}
/**
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 8f4b0e3fb0..fb8e7c1d05 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -18,6 +18,8 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
@@ -59,6 +61,9 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
private Map<DatanodeDetails, Map<String, ContainerInfo>> nodeContainers =
    new HashMap();
private Map<DatanodeDetails, NodeReportProto> nodeReports = new HashMap<>();
+  private AtomicInteger commandStatusReport = new AtomicInteger(0);
+  private List<CommandStatus> cmdStatusList = new LinkedList<>();
+  private List<SCMCommandProto> scmCommandRequests = new LinkedList<>();
/**
* Returns the number of heartbeats made to this class.
*
@@ -180,10 +185,12 @@ private void sleepIfNeeded() {
sendHeartbeat(SCMHeartbeatRequestProto heartbeat) throws IOException {
rpcCount.incrementAndGet();
heartbeatCount.incrementAndGet();
+    if (heartbeat.hasCommandStatusReport()) {
+      cmdStatusList.addAll(heartbeat.getCommandStatusReport()
+          .getCmdStatusList());
+      commandStatusReport.incrementAndGet();
+    }
sleepIfNeeded();
-    List<SCMCommandProto>
-        cmdResponses = new LinkedList<>();
- return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
+ return SCMHeartbeatResponseProto.newBuilder().addAllCommands(scmCommandRequests)
.setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
.build();
}
@@ -302,4 +309,24 @@ public void reset() {
nodeContainers.clear();
}
+
+ public int getCommandStatusReportCount() {
+ return commandStatusReport.get();
+ }
+
+  public List<CommandStatus> getCmdStatusList() {
+ return cmdStatusList;
+ }
+
+  public List<SCMCommandProto> getScmCommandRequests() {
+ return scmCommandRequests;
+ }
+
+ public void clearScmCommandRequests() {
+ scmCommandRequests.clear();
+ }
+
+ public void addScmCommandRequest(SCMCommandProto scmCmd) {
+ scmCommandRequests.add(scmCmd);
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 5fd9cf6047..026e7aadc7 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -20,18 +20,27 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessage;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsIdFactory;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -42,12 +51,20 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Test cases to test {@link ReportPublisher}.
*/
public class TestReportPublisher {
+ private static Configuration config;
+
+ @BeforeClass
+ public static void setup() {
+ config = new OzoneConfiguration();
+ }
+
/**
* Dummy report publisher for testing.
*/
@@ -93,9 +110,9 @@ public void testScheduledReport() throws InterruptedException {
.setNameFormat("Unit test ReportManager Thread - %d").build());
publisher.init(dummyContext, executorService);
Thread.sleep(150);
- Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount);
+ Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
Thread.sleep(150);
- Assert.assertEquals(2, ((DummyReportPublisher)publisher).getReportCount);
+ Assert.assertEquals(2, ((DummyReportPublisher) publisher).getReportCount);
executorService.shutdown();
}
@@ -110,11 +127,57 @@ public void testPublishReport() throws InterruptedException {
publisher.init(dummyContext, executorService);
Thread.sleep(150);
executorService.shutdown();
- Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount);
+ Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
verify(dummyContext, times(1)).addReport(null);
}
+ @Test
+ public void testCommandStatusPublisher() throws InterruptedException {
+ StateContext dummyContext = Mockito.mock(StateContext.class);
+ ReportPublisher publisher = new CommandStatusReportPublisher();
+    final Map<Long, CommandStatus> cmdStatusMap = new ConcurrentHashMap<>();
+ when(dummyContext.getCommandStatusMap()).thenReturn(cmdStatusMap);
+ publisher.setConf(config);
+
+ ScheduledExecutorService executorService = HadoopExecutors
+ .newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Unit test ReportManager Thread - %d").build());
+ publisher.init(dummyContext, executorService);
+ Assert.assertEquals(0,
+ ((CommandStatusReportPublisher) publisher).getReport()
+ .getCmdStatusCount());
+
+ // Insert to status object to state context map and then get the report.
+ CommandStatus obj1 = CommandStatus.CommandStatusBuilder.newBuilder()
+ .setCmdId(HddsIdFactory.getLongId())
+ .setType(Type.deleteBlocksCommand)
+ .setStatus(Status.PENDING)
+ .build();
+ CommandStatus obj2 = CommandStatus.CommandStatusBuilder.newBuilder()
+ .setCmdId(HddsIdFactory.getLongId())
+ .setType(Type.closeContainerCommand)
+ .setStatus(Status.EXECUTED)
+ .build();
+ cmdStatusMap.put(obj1.getCmdId(), obj1);
+ cmdStatusMap.put(obj2.getCmdId(), obj2);
+ Assert.assertEquals("Should publish report with 2 status objects", 2,
+ ((CommandStatusReportPublisher) publisher).getReport()
+ .getCmdStatusCount());
+    Assert.assertEquals(
+        "Next report should have 1 status object as command status "
+            + "objects in PENDING state are not pruned",
+        1, ((CommandStatusReportPublisher) publisher).getReport()
+            .getCmdStatusCount());
+    Assert.assertTrue(
+        "Status of the remaining command status object should still "
+            + "be PENDING",
+        ((CommandStatusReportPublisher) publisher).getReport()
+            .getCmdStatusList().get(0).getStatus().equals(Status.PENDING));
+ executorService.shutdown();
+ }
+
@Test
public void testAddingReportToHeartbeat() {
Configuration conf = new OzoneConfiguration();
@@ -168,10 +231,10 @@ private static DatanodeDetails getDatanodeDetails() {
* Adds the report to heartbeat.
*
* @param requestBuilder builder to which the report has to be added.
- * @param report the report to be added.
+ * @param report the report to be added.
*/
- private static void addReport(SCMHeartbeatRequestProto.Builder requestBuilder,
- GeneratedMessage report) {
+ private static void addReport(SCMHeartbeatRequestProto.Builder
+ requestBuilder, GeneratedMessage report) {
String reportName = report.getDescriptorForType().getFullName();
for (Descriptors.FieldDescriptor descriptor :
SCMHeartbeatRequestProto.getDescriptor().getFields()) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 0afd675581..485b3f58a7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -21,8 +21,12 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+ .CommandStatusReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+ .ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+ .NodeReportFromDatanode;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.TypedEvent;
@@ -34,47 +38,54 @@
public final class SCMEvents {
/**
- * NodeReports are sent out by Datanodes. This report is
- * received by SCMDatanodeHeartbeatDispatcher and NodeReport Event is
- * generated.
+ * NodeReports are sent out by Datanodes. This report is received by
+ * SCMDatanodeHeartbeatDispatcher and NodeReport Event is generated.
*/
public static final TypedEvent<NodeReportFromDatanode> NODE_REPORT =
new TypedEvent<>(NodeReportFromDatanode.class, "Node_Report");
/**
- * ContainerReports are send out by Datanodes. This report
- * is received by SCMDatanodeHeartbeatDispatcher and Container_Report Event
- * i generated.
+   * ContainerReports are sent out by Datanodes. This report is received by
+   * SCMDatanodeHeartbeatDispatcher and a Container_Report event
+   * is generated.
*/
public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");
+ /**
+   * A command status report will be sent by datanodes. This report is
+   * received by SCMDatanodeHeartbeatDispatcher and a Cmd_Status_Report
+   * event is generated.
+   */
+  public static final TypedEvent<CommandStatusReportFromDatanode>
+      CMD_STATUS_REPORT =
+      new TypedEvent<>(CommandStatusReportFromDatanode.class,
+          "Cmd_Status_Report");
+
/**
* When ever a command for the Datanode needs to be issued by any component
- * inside SCM, a Datanode_Command event is generated. NodeManager listens
- * to these events and dispatches them to Datanode for further processing.
+ * inside SCM, a Datanode_Command event is generated. NodeManager listens to
+ * these events and dispatches them to Datanode for further processing.
*/
public static final Event<CommandForDatanode> DATANODE_COMMAND =
new TypedEvent<>(CommandForDatanode.class, "Datanode_Command");
/**
- * A Close Container Event can be triggered under many condition.
- * Some of them are:
- * 1. A Container is full, then we stop writing further information to
- * that container. DN's let SCM know that current state and sends a
- * informational message that allows SCM to close the container.
- *
- * 2. If a pipeline is open; for example Ratis; if a single node fails,
- * we will proactively close these containers.
- *
- * Once a command is dispatched to DN, we will also listen to updates from
- * the datanode which lets us know that this command completed or timed out.
+ * A Close Container Event can be triggered under many condition. Some of them
+ * are: 1. A Container is full, then we stop writing further information to
+ * that container. DN's let SCM know that current state and sends a
+ * informational message that allows SCM to close the container.
+ *
+ * 2. If a pipeline is open; for example Ratis; if a single node fails, we
+ * will proactively close these containers.
+ *
+ * Once a command is dispatched to DN, we will also listen to updates from the
+ * datanode which lets us know that this command completed or timed out.
*/
public static final TypedEvent<ContainerID> CLOSE_CONTAINER =
new TypedEvent<>(ContainerID.class, "Close_Container");
/**
- * This event will be triggered whenever a new datanode is
- * registered with SCM.
+ * This event will be triggered whenever a new datanode is registered with
+ * SCM.
*/
public static final TypedEvent<DatanodeDetails> NEW_NODE =
new TypedEvent<>(DatanodeDetails.class, "New_Node");
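
The new CMD_STATUS_REPORT typed event is consumed the same way as the existing ones: register a handler on the SCM EventQueue and receive the wrapped payload. A minimal sketch: the EventQueue/EventHandler API from hdds-server is real, while the CommandStatusReportHandler class below is hypothetical and not part of this patch.

```java
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
    .CommandStatusReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;

import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;

/** Hypothetical subscriber; illustrates the registration pattern only. */
public class CommandStatusReportHandler
    implements EventHandler<CommandStatusReportFromDatanode> {

  @Override
  public void onMessage(CommandStatusReportFromDatanode report,
      EventPublisher publisher) {
    // The payload wrapper carries the origin datanode plus the raw proto.
    System.out.println("Command status report from "
        + report.getDatanodeDetails().getUuid());
  }

  public static void main(String[] args) {
    EventQueue queue = new EventQueue();
    queue.addHandler(CMD_STATUS_REPORT, new CommandStatusReportHandler());
  }
}
```
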
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 4cfa98fbd8..2461d375d1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -19,6 +19,8 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -37,7 +39,7 @@
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
-
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
/**
* This class is responsible for dispatching heartbeat from datanode to
* appropriate EventHandler at SCM.
@@ -86,6 +88,13 @@ public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
heartbeat.getContainerReport()));
}
+
+ if (heartbeat.hasCommandStatusReport()) {
+ eventPublisher.fireEvent(CMD_STATUS_REPORT,
+ new CommandStatusReportFromDatanode(datanodeDetails,
+ heartbeat.getCommandStatusReport()));
+ }
+
return commands;
}
@@ -136,4 +145,16 @@ public ContainerReportFromDatanode(DatanodeDetails datanodeDetails,
}
}
+ /**
+ * Command status report event payload with origin.
+ */
+ public static class CommandStatusReportFromDatanode
+ extends ReportFromDatanode<CommandStatusReportsProto> {
+
+ public CommandStatusReportFromDatanode(DatanodeDetails datanodeDetails,
+ CommandStatusReportsProto report) {
+ super(datanodeDetails, report);
+ }
+ }
+
}
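
Inside such a handler, the protobuf payload is unwrapped with getReport(). A hedged sketch of iterating the per-command entries follows; getCmdStatusList() is assumed from standard protobuf codegen for a repeated cmdStatus field, and should be checked against the actual .proto definition.

```java
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
    .CommandStatusReportFromDatanode;

final class CommandStatusLogger {
  private CommandStatusLogger() {
  }

  // Sketch only: prints each reported command id, type and status.
  static void logStatuses(CommandStatusReportFromDatanode payload) {
    CommandStatusReportsProto report = payload.getReport();
    for (CommandStatus status : report.getCmdStatusList()) {
      System.out.println(status.getCmdId() + " (" + status.getType()
          + ") -> " + status.getStatus());
    }
  }
}
```
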
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java
index 042e3ccfe1..1b79ebf3f9 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java
@@ -21,6 +21,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.hdds.scm.server
+ .SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -42,6 +46,7 @@
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
/**
* This class tests the behavior of SCMDatanodeHeartbeatDispatcher.
@@ -91,6 +96,8 @@ public void testContainerReportDispatcher() throws IOException {
ContainerReportsProto containerReport =
ContainerReportsProto.getDefaultInstance();
+ CommandStatusReportsProto commandStatusReport =
+ CommandStatusReportsProto.getDefaultInstance();
SCMDatanodeHeartbeatDispatcher dispatcher =
new SCMDatanodeHeartbeatDispatcher(Mockito.mock(NodeManager.class),
@@ -98,9 +105,18 @@ public void testContainerReportDispatcher() throws IOException {
@Override
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
EVENT_TYPE event, PAYLOAD payload) {
- Assert.assertEquals(event, CONTAINER_REPORT);
- Assert.assertEquals(containerReport,
- ((ContainerReportFromDatanode)payload).getReport());
+ Assert.assertTrue(
+ event.equals(CONTAINER_REPORT)
+ || event.equals(CMD_STATUS_REPORT));
+
+ if (payload instanceof ContainerReportFromDatanode) {
+ Assert.assertEquals(containerReport,
+ ((ContainerReportFromDatanode) payload).getReport());
+ }
+ if (payload instanceof CommandStatusReportFromDatanode) {
+ Assert.assertEquals(commandStatusReport,
+ ((CommandStatusReportFromDatanode) payload).getReport());
+ }
eventReceived.incrementAndGet();
}
});
@@ -111,9 +127,10 @@ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.setContainerReport(containerReport)
+ .setCommandStatusReport(commandStatusReport)
.build();
dispatcher.dispatch(heartbeat);
- Assert.assertEquals(1, eventReceived.get());
+ Assert.assertEquals(2, eventReceived.get());
}
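
The anonymous EventPublisher above counts events by hand. An equivalent check with a Mockito mock, sketched here reusing the heartbeat and NodeManager names from the test, would pin each event to exactly one firing:

```java
// Sketch: verify each event fires once instead of counting by hand.
EventPublisher publisher = Mockito.mock(EventPublisher.class);
SCMDatanodeHeartbeatDispatcher dispatcher =
    new SCMDatanodeHeartbeatDispatcher(
        Mockito.mock(NodeManager.class), publisher);
dispatcher.dispatch(heartbeat);  // same request as built in the test
Mockito.verify(publisher).fireEvent(
    Mockito.eq(CONTAINER_REPORT), Mockito.any());
Mockito.verify(publisher).fireEvent(
    Mockito.eq(CMD_STATUS_REPORT), Mockito.any());
```
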
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 9db9e802ec..be8bd8767f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -16,12 +16,29 @@
*/
package org.apache.hadoop.ozone.container.common;
+import java.util.Map;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeleteBlocksCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -54,6 +71,7 @@
import org.apache.hadoop.ozone.container.common.states.endpoint
.VersionEndpointTask;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
@@ -74,6 +92,9 @@
.createEndpoint;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.when;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
/**
* Tests the endpoints.
@@ -83,6 +104,7 @@ public class TestEndPoint {
private static RPC.Server scmServer;
private static ScmTestMock scmServerImpl;
private static File testDir;
+ private static Configuration config;
@AfterClass
public static void tearDown() throws Exception {
@@ -99,6 +121,12 @@ public static void setUp() throws Exception {
scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
scmServerImpl, serverAddress, 10);
testDir = PathUtils.getTestDir(TestEndPoint.class);
+ config = SCMTestUtils.getConf();
+ config.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
+ config.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ config
+ .setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
+ config.set(HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL, "1s");
}
@Test
@@ -312,7 +340,87 @@ public void testHeartbeat() throws Exception {
}
}
- private void heartbeatTaskHelper(InetSocketAddress scmAddress,
+ @Test
+ public void testHeartbeatWithCommandStatusReport() throws Exception {
+ DatanodeDetails dataNode = getDatanodeDetails();
+ try (EndpointStateMachine rpcEndPoint =
+ createEndpoint(SCMTestUtils.getConf(),
+ serverAddress, 1000)) {
+ String storageId = UUID.randomUUID().toString();
+ // Add some scmCommands for heartbeat response
+ addScmCommands();
+
+ SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
+ .setDatanodeDetails(dataNode.getProtoBufMessage())
+ .setNodeReport(TestUtils.createNodeReport(
+ getStorageReports(storageId)))
+ .build();
+
+ SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
+ .sendHeartbeat(request);
+ assertNotNull(responseProto);
+ assertEquals(3, responseProto.getCommandsCount());
+ assertEquals(0, scmServerImpl.getCommandStatusReportCount());
+
+ // Send heartbeat again from heartbeat endpoint task
+ final StateContext stateContext = heartbeatTaskHelper(serverAddress, 3000);
+ Map<Long, CommandStatus> map = stateContext.getCommandStatusMap();
+ assertNotNull(map);
+ assertEquals("Should have 3 objects", 3, map.size());
+ assertTrue(map.containsKey(1L));
+ assertTrue(map.containsKey(2L));
+ assertTrue(map.containsKey(3L));
+ assertEquals(Type.closeContainerCommand, map.get(1L).getType());
+ assertEquals(Type.replicateContainerCommand, map.get(2L).getType());
+ assertEquals(Type.deleteBlocksCommand, map.get(3L).getType());
+ assertEquals(Status.PENDING, map.get(1L).getStatus());
+ assertEquals(Status.PENDING, map.get(2L).getStatus());
+ assertEquals(Status.PENDING, map.get(3L).getStatus());
+
+ scmServerImpl.clearScmCommandRequests();
+ }
+ }
+
+ private void addScmCommands() {
+ SCMCommandProto closeCommand = SCMCommandProto.newBuilder()
+ .setCloseContainerCommandProto(
+ CloseContainerCommandProto.newBuilder().setCmdId(1)
+ .setContainerID(1)
+ .setReplicationType(ReplicationType.RATIS)
+ .build())
+ .setCommandType(Type.closeContainerCommand)
+ .build();
+ SCMCommandProto replicationCommand = SCMCommandProto.newBuilder()
+ .setReplicateContainerCommandProto(
+ ReplicateContainerCommandProto.newBuilder()
+ .setCmdId(2)
+ .setContainerID(2)
+ .build())
+ .setCommandType(Type.replicateContainerCommand)
+ .build();
+ SCMCommandProto deleteBlockCommand = SCMCommandProto.newBuilder()
+ .setDeleteBlocksCommandProto(
+ DeleteBlocksCommandProto.newBuilder()
+ .setCmdId(3)
+ .addDeletedBlocksTransactions(
+ DeletedBlocksTransaction.newBuilder()
+ .setContainerID(45)
+ .setCount(1)
+ .setTxID(23)
+ .build())
+ .build())
+ .setCommandType(Type.deleteBlocksCommand)
+ .build();
+ scmServerImpl.addScmCommandRequest(closeCommand);
+ scmServerImpl.addScmCommandRequest(deleteBlockCommand);
+ scmServerImpl.addScmCommandRequest(replicationCommand);
+ }
+
+ private StateContext heartbeatTaskHelper(InetSocketAddress scmAddress,
int rpcTimeout) throws Exception {
Configuration conf = SCMTestUtils.getConf();
conf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
@@ -344,6 +452,7 @@ private void heartbeatTaskHelper(InetSocketAddress scmAddress,
Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
rpcEndPoint.getState());
+ return stateContext;
}
}
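
Finally, on the configuration side: the test setup above pins hdds.command.status.report.interval to "1s". A consumer would resolve the key with Configuration.getTimeDuration, which parses the ns/ms/s/m/h/d suffixes; this sketch assumes the String-default overload of getTimeDuration available in recent Hadoop releases.

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public final class ReportIntervalExample {
  private ReportIntervalExample() {
  }

  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Resolves "1s"/"30s" style values into milliseconds.
    long intervalMs = conf.getTimeDuration(
        HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL,
        HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);
    System.out.println("Command status report interval: " + intervalMs + " ms");
  }
}
```
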