diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 0283615a19..fd4bf08c69 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -48,4 +48,10 @@ private HddsConfigKeys() {
"hdds.command.status.report.interval";
public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
"60s";
+
+ public static final String HDDS_CONTAINER_ACTION_MAX_LIMIT =
+ "hdds.container.action.max.limit";
+ public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT =
+ 20;
+
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 69a382a013..84a3e0c230 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1098,4 +1098,14 @@
+
+ hdds.container.action.max.limit
+ 20
+ DATANODE
+
+ Maximum number of Container Actions sent by the datanode to SCM in a
+ single heartbeat.
+
+
+
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index faaff69b8d..7862cc6236 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -20,14 +20,18 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus
+ .CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +47,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
@@ -59,6 +64,7 @@ public class StateContext {
private final AtomicLong stateExecutionCount;
private final Configuration conf;
private final Queue reports;
+ private final Queue containerActions;
private DatanodeStateMachine.DatanodeStates state;
/**
@@ -76,6 +82,7 @@ public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
commandQueue = new LinkedList<>();
cmdStatusMap = new ConcurrentHashMap<>();
reports = new LinkedList<>();
+ containerActions = new LinkedList<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
}
@@ -187,15 +194,45 @@ public List getAllAvailableReports() {
* @return List
*/
public List getReports(int maxLimit) {
- List results = new ArrayList<>();
synchronized (reports) {
- GeneratedMessage report = reports.poll();
- while(results.size() < maxLimit && report != null) {
- results.add(report);
- report = reports.poll();
- }
+ return reports.parallelStream().limit(maxLimit)
+ .collect(Collectors.toList());
+ }
+ }
+
+
+ /**
+ * Adds the ContainerAction to ContainerAction queue.
+ *
+ * @param containerAction ContainerAction to be added
+ */
+ public void addContainerAction(ContainerAction containerAction) {
+ synchronized (containerActions) {
+ containerActions.add(containerAction);
+ }
+ }
+
+ /**
+ * Returns all the pending ContainerActions from the ContainerAction queue,
+ * or empty list if the queue is empty.
+ *
+ * @return List
+ */
+ public List getAllPendingContainerActions() {
+ return getPendingContainerAction(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Returns pending ContainerActions from the ContainerAction queue with a
+ * max limit on list size, or empty list if the queue is empty.
+ *
+ * @return List
+ */
+ public List getPendingContainerAction(int maxLimit) {
+ synchronized (containerActions) {
+ return containerActions.parallelStream().limit(maxLimit)
+ .collect(Collectors.toList());
}
- return results;
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 260a245ceb..020fb71426 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -24,6 +24,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -46,8 +50,14 @@
import java.io.IOException;
import java.time.ZonedDateTime;
+import java.util.List;
import java.util.concurrent.Callable;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_CONTAINER_ACTION_MAX_LIMIT;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;
+
/**
* Heartbeat class for SCMs.
*/
@@ -59,6 +69,7 @@ public class HeartbeatEndpointTask
private final Configuration conf;
private DatanodeDetailsProto datanodeDetailsProto;
private StateContext context;
+ private int maxContainerActionsPerHB;
/**
* Constructs a SCM heart beat.
@@ -70,6 +81,8 @@ public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
this.rpcEndpoint = rpcEndpoint;
this.conf = conf;
this.context = context;
+ this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
+ HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
}
/**
@@ -107,7 +120,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetailsProto);
addReports(requestBuilder);
-
+ addContainerActions(requestBuilder);
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
.sendHeartbeat(requestBuilder.build());
processResponse(reponse, datanodeDetailsProto);
@@ -139,6 +152,24 @@ private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
}
}
+ /**
+ * Adds all the pending ContainerActions to the heartbeat.
+ *
+ * @param requestBuilder builder to which the report has to be added.
+ */
+ private void addContainerActions(
+ SCMHeartbeatRequestProto.Builder requestBuilder) {
+ List actions = context.getPendingContainerAction(
+ maxContainerActionsPerHB);
+ if (!actions.isEmpty()) {
+ ContainerActionsProto cap = ContainerActionsProto.newBuilder()
+ .addAllContainerActions(actions)
+ .build();
+ requestBuilder.setContainerActions(cap);
+ }
+ }
+
+
/**
* Returns a builder class for HeartbeatEndpointTask task.
* @return Builder.
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 4238389a20..d89567b5d9 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -79,8 +79,8 @@ message SCMHeartbeatRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
optional NodeReportProto nodeReport = 2;
optional ContainerReportsProto containerReport = 3;
- optional ContainerActionsProto containerActions = 4;
- optional CommandStatusReportsProto commandStatusReport = 5;
+ optional CommandStatusReportsProto commandStatusReport = 4;
+ optional ContainerActionsProto containerActions = 5;
}
/*
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index a0db2e8aa6..811599f01a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.container.common.report;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,14 +27,8 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -178,22 +171,6 @@ public void testCommandStatusPublisher() throws InterruptedException {
executorService.shutdown();
}
- @Test
- public void testAddingReportToHeartbeat() {
- GeneratedMessage nodeReport = NodeReportProto.getDefaultInstance();
- GeneratedMessage containerReport = ContainerReportsProto
- .getDefaultInstance();
- SCMHeartbeatRequestProto.Builder heartbeatBuilder =
- SCMHeartbeatRequestProto.newBuilder();
- heartbeatBuilder.setDatanodeDetails(
- getDatanodeDetails().getProtoBufMessage());
- addReport(heartbeatBuilder, nodeReport);
- addReport(heartbeatBuilder, containerReport);
- SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build();
- Assert.assertTrue(heartbeat.hasNodeReport());
- Assert.assertTrue(heartbeat.hasContainerReport());
- }
-
/**
* Get a datanode details.
*
@@ -222,22 +199,4 @@ private static DatanodeDetails getDatanodeDetails() {
return builder.build();
}
- /**
- * Adds the report to heartbeat.
- *
- * @param requestBuilder builder to which the report has to be added.
- * @param report the report to be added.
- */
- private static void addReport(SCMHeartbeatRequestProto.Builder
- requestBuilder, GeneratedMessage report) {
- String reportName = report.getDescriptorForType().getFullName();
- for (Descriptors.FieldDescriptor descriptor :
- SCMHeartbeatRequestProto.getDescriptor().getFields()) {
- String heartbeatFieldName = descriptor.getMessageType().getFullName();
- if (heartbeatFieldName.equals(reportName)) {
- requestBuilder.setField(descriptor, report);
- }
- }
- }
-
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
new file mode 100644
index 0000000000..b4d718d4ff
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .DatanodeStateMachine.DatanodeStates;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocolPB
+ .StorageContainerDatanodeProtocolClientSideTranslatorPB;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.UUID;
+
+/**
+ * This class tests the functionality of HeartbeatEndpointTask.
+ */
+public class TestHeartbeatEndpointTask {
+
+
+ @Test
+ public void testheartbeatWithoutReports() throws Exception {
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm);
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertFalse(heartbeat.hasNodeReport());
+ Assert.assertFalse(heartbeat.hasContainerReport());
+ Assert.assertFalse(heartbeat.hasCommandStatusReport());
+ Assert.assertFalse(heartbeat.hasContainerActions());
+ }
+
+ @Test
+ public void testheartbeatWithNodeReports() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+ conf, context, scm);
+ context.addReport(NodeReportProto.getDefaultInstance());
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertTrue(heartbeat.hasNodeReport());
+ Assert.assertFalse(heartbeat.hasContainerReport());
+ Assert.assertFalse(heartbeat.hasCommandStatusReport());
+ Assert.assertFalse(heartbeat.hasContainerActions());
+ }
+
+ @Test
+ public void testheartbeatWithContainerReports() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+ conf, context, scm);
+ context.addReport(ContainerReportsProto.getDefaultInstance());
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertFalse(heartbeat.hasNodeReport());
+ Assert.assertTrue(heartbeat.hasContainerReport());
+ Assert.assertFalse(heartbeat.hasCommandStatusReport());
+ Assert.assertFalse(heartbeat.hasContainerActions());
+ }
+
+ @Test
+ public void testheartbeatWithCommandStatusReports() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+ conf, context, scm);
+ context.addReport(CommandStatusReportsProto.getDefaultInstance());
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertFalse(heartbeat.hasNodeReport());
+ Assert.assertFalse(heartbeat.hasContainerReport());
+ Assert.assertTrue(heartbeat.hasCommandStatusReport());
+ Assert.assertFalse(heartbeat.hasContainerActions());
+ }
+
+ @Test
+ public void testheartbeatWithContainerActions() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+ conf, context, scm);
+ context.addContainerAction(getContainerAction());
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertFalse(heartbeat.hasNodeReport());
+ Assert.assertFalse(heartbeat.hasContainerReport());
+ Assert.assertFalse(heartbeat.hasCommandStatusReport());
+ Assert.assertTrue(heartbeat.hasContainerActions());
+ }
+
+ @Test
+ public void testheartbeatWithAllReports() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ ArgumentCaptor argument = ArgumentCaptor
+ .forClass(SCMHeartbeatRequestProto.class);
+ Mockito.when(scm.sendHeartbeat(argument.capture()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .build());
+
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+ conf, context, scm);
+ context.addReport(NodeReportProto.getDefaultInstance());
+ context.addReport(ContainerReportsProto.getDefaultInstance());
+ context.addReport(CommandStatusReportsProto.getDefaultInstance());
+ context.addContainerAction(getContainerAction());
+ endpointTask.call();
+ SCMHeartbeatRequestProto heartbeat = argument.getValue();
+ Assert.assertTrue(heartbeat.hasDatanodeDetails());
+ Assert.assertTrue(heartbeat.hasNodeReport());
+ Assert.assertTrue(heartbeat.hasContainerReport());
+ Assert.assertTrue(heartbeat.hasCommandStatusReport());
+ Assert.assertTrue(heartbeat.hasContainerActions());
+ }
+
+ /**
+ * Creates HeartbeatEndpointTask for the given StorageContainerManager proxy.
+ *
+ * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
+ *
+ * @return HeartbeatEndpointTask
+ */
+ private HeartbeatEndpointTask getHeartbeatEndpointTask(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
+ Configuration conf = new OzoneConfiguration();
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ Mockito.mock(DatanodeStateMachine.class));
+ return getHeartbeatEndpointTask(conf, context, proxy);
+
+ }
+
+ /**
+ * Creates HeartbeatEndpointTask with the given conf, context and
+ * StorageContainerManager client side proxy.
+ *
+ * @param conf Configuration
+ * @param context StateContext
+ * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
+ *
+ * @return HeartbeatEndpointTask
+ */
+ private HeartbeatEndpointTask getHeartbeatEndpointTask(
+ Configuration conf,
+ StateContext context,
+ StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
+ DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
+ .setUuid(UUID.randomUUID().toString())
+ .setHostName("localhost")
+ .setIpAddress("127.0.0.1")
+ .build();
+ EndpointStateMachine endpointStateMachine = Mockito
+ .mock(EndpointStateMachine.class);
+ Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy);
+ return HeartbeatEndpointTask.newBuilder()
+ .setConfig(conf)
+ .setDatanodeDetails(datanodeDetails)
+ .setContext(context)
+ .setEndpointStateMachine(endpointStateMachine)
+ .build();
+ }
+
+ private ContainerAction getContainerAction() {
+ ContainerAction.Builder builder = ContainerAction.newBuilder();
+ ContainerInfo containerInfo = ContainerInfo.newBuilder()
+ .setContainerID(1L)
+ .build();
+ builder.setContainer(containerInfo)
+ .setAction(ContainerAction.Action.CLOSE)
+ .setReason(ContainerAction.Reason.CONTAINER_FULL);
+ return builder.build();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
new file mode 100644
index 0000000000..d120a5cd4b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;