HDDS-260. Support in Datanode for sending ContainerActions to SCM. Contributed by Nanda kumar.
This commit is contained in:
parent
9be25e3476
commit
347c955013
@ -48,4 +48,10 @@ private HddsConfigKeys() {
|
||||
"hdds.command.status.report.interval";
|
||||
public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
|
||||
"60s";
|
||||
|
||||
public static final String HDDS_CONTAINER_ACTION_MAX_LIMIT =
|
||||
"hdds.container.action.max.limit";
|
||||
public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT =
|
||||
20;
|
||||
|
||||
}
|
||||
|
@ -1098,4 +1098,14 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hdds.container.action.max.limit</name>
|
||||
<value>20</value>
|
||||
<tag>DATANODE</tag>
|
||||
<description>
|
||||
Maximum number of Container Actions sent by the datanode to SCM in a
|
||||
single heartbeat.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
@ -20,14 +20,18 @@
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerAction;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
|
||||
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
|
||||
import org.apache.hadoop.ozone.container.common.states.datanode
|
||||
.InitDatanodeState;
|
||||
import org.apache.hadoop.ozone.container.common.states.datanode
|
||||
.RunningDatanodeState;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CommandStatus
|
||||
.CommandStatusBuilder;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -43,6 +47,7 @@
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
|
||||
|
||||
@ -59,6 +64,7 @@ public class StateContext {
|
||||
private final AtomicLong stateExecutionCount;
|
||||
private final Configuration conf;
|
||||
private final Queue<GeneratedMessage> reports;
|
||||
private final Queue<ContainerAction> containerActions;
|
||||
private DatanodeStateMachine.DatanodeStates state;
|
||||
|
||||
/**
|
||||
@ -76,6 +82,7 @@ public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
|
||||
commandQueue = new LinkedList<>();
|
||||
cmdStatusMap = new ConcurrentHashMap<>();
|
||||
reports = new LinkedList<>();
|
||||
containerActions = new LinkedList<>();
|
||||
lock = new ReentrantLock();
|
||||
stateExecutionCount = new AtomicLong(0);
|
||||
}
|
||||
@ -187,15 +194,45 @@ public List<GeneratedMessage> getAllAvailableReports() {
|
||||
* @return List<reports>
|
||||
*/
|
||||
public List<GeneratedMessage> getReports(int maxLimit) {
|
||||
List<GeneratedMessage> results = new ArrayList<>();
|
||||
synchronized (reports) {
|
||||
GeneratedMessage report = reports.poll();
|
||||
while(results.size() < maxLimit && report != null) {
|
||||
results.add(report);
|
||||
report = reports.poll();
|
||||
}
|
||||
return reports.parallelStream().limit(maxLimit)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Adds the ContainerAction to ContainerAction queue.
|
||||
*
|
||||
* @param containerAction ContainerAction to be added
|
||||
*/
|
||||
public void addContainerAction(ContainerAction containerAction) {
|
||||
synchronized (containerActions) {
|
||||
containerActions.add(containerAction);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all the pending ContainerActions from the ContainerAction queue,
|
||||
* or empty list if the queue is empty.
|
||||
*
|
||||
* @return List<ContainerAction>
|
||||
*/
|
||||
public List<ContainerAction> getAllPendingContainerActions() {
|
||||
return getPendingContainerAction(Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns pending ContainerActions from the ContainerAction queue with a
|
||||
* max limit on list size, or empty list if the queue is empty.
|
||||
*
|
||||
* @return List<ContainerAction>
|
||||
*/
|
||||
public List<ContainerAction> getPendingContainerAction(int maxLimit) {
|
||||
synchronized (containerActions) {
|
||||
return containerActions.parallelStream().limit(maxLimit)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -24,6 +24,10 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerAction;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
@ -46,8 +50,14 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys
|
||||
.HDDS_CONTAINER_ACTION_MAX_LIMIT;
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys
|
||||
.HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;
|
||||
|
||||
/**
|
||||
* Heartbeat class for SCMs.
|
||||
*/
|
||||
@ -59,6 +69,7 @@ public class HeartbeatEndpointTask
|
||||
private final Configuration conf;
|
||||
private DatanodeDetailsProto datanodeDetailsProto;
|
||||
private StateContext context;
|
||||
private int maxContainerActionsPerHB;
|
||||
|
||||
/**
|
||||
* Constructs a SCM heart beat.
|
||||
@ -70,6 +81,8 @@ public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
|
||||
this.rpcEndpoint = rpcEndpoint;
|
||||
this.conf = conf;
|
||||
this.context = context;
|
||||
this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
|
||||
HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -107,7 +120,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
|
||||
SCMHeartbeatRequestProto.newBuilder()
|
||||
.setDatanodeDetails(datanodeDetailsProto);
|
||||
addReports(requestBuilder);
|
||||
|
||||
addContainerActions(requestBuilder);
|
||||
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
|
||||
.sendHeartbeat(requestBuilder.build());
|
||||
processResponse(reponse, datanodeDetailsProto);
|
||||
@ -139,6 +152,24 @@ private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds all the pending ContainerActions to the heartbeat.
|
||||
*
|
||||
* @param requestBuilder builder to which the report has to be added.
|
||||
*/
|
||||
private void addContainerActions(
|
||||
SCMHeartbeatRequestProto.Builder requestBuilder) {
|
||||
List<ContainerAction> actions = context.getPendingContainerAction(
|
||||
maxContainerActionsPerHB);
|
||||
if (!actions.isEmpty()) {
|
||||
ContainerActionsProto cap = ContainerActionsProto.newBuilder()
|
||||
.addAllContainerActions(actions)
|
||||
.build();
|
||||
requestBuilder.setContainerActions(cap);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns a builder class for HeartbeatEndpointTask task.
|
||||
* @return Builder.
|
||||
|
@ -79,8 +79,8 @@ message SCMHeartbeatRequestProto {
|
||||
required DatanodeDetailsProto datanodeDetails = 1;
|
||||
optional NodeReportProto nodeReport = 2;
|
||||
optional ContainerReportsProto containerReport = 3;
|
||||
optional ContainerActionsProto containerActions = 4;
|
||||
optional CommandStatusReportsProto commandStatusReport = 5;
|
||||
optional CommandStatusReportsProto commandStatusReport = 4;
|
||||
optional ContainerActionsProto containerActions = 5;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -18,7 +18,6 @@
|
||||
package org.apache.hadoop.ozone.container.common.report;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.protobuf.Descriptors;
|
||||
import com.google.protobuf.GeneratedMessage;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -28,14 +27,8 @@
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.
|
||||
StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.
|
||||
StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
|
||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||
@ -178,22 +171,6 @@ public void testCommandStatusPublisher() throws InterruptedException {
|
||||
executorService.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddingReportToHeartbeat() {
|
||||
GeneratedMessage nodeReport = NodeReportProto.getDefaultInstance();
|
||||
GeneratedMessage containerReport = ContainerReportsProto
|
||||
.getDefaultInstance();
|
||||
SCMHeartbeatRequestProto.Builder heartbeatBuilder =
|
||||
SCMHeartbeatRequestProto.newBuilder();
|
||||
heartbeatBuilder.setDatanodeDetails(
|
||||
getDatanodeDetails().getProtoBufMessage());
|
||||
addReport(heartbeatBuilder, nodeReport);
|
||||
addReport(heartbeatBuilder, containerReport);
|
||||
SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build();
|
||||
Assert.assertTrue(heartbeat.hasNodeReport());
|
||||
Assert.assertTrue(heartbeat.hasContainerReport());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a datanode details.
|
||||
*
|
||||
@ -222,22 +199,4 @@ private static DatanodeDetails getDatanodeDetails() {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the report to heartbeat.
|
||||
*
|
||||
* @param requestBuilder builder to which the report has to be added.
|
||||
* @param report the report to be added.
|
||||
*/
|
||||
private static void addReport(SCMHeartbeatRequestProto.Builder
|
||||
requestBuilder, GeneratedMessage report) {
|
||||
String reportName = report.getDescriptorForType().getFullName();
|
||||
for (Descriptors.FieldDescriptor descriptor :
|
||||
SCMHeartbeatRequestProto.getDescriptor().getFields()) {
|
||||
String heartbeatFieldName = descriptor.getMessageType().getFullName();
|
||||
if (heartbeatFieldName.equals(reportName)) {
|
||||
requestBuilder.setField(descriptor, report);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,300 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.states.endpoint;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerAction;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine
|
||||
.DatanodeStateMachine;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine
|
||||
.DatanodeStateMachine.DatanodeStates;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine
|
||||
.EndpointStateMachine;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
|
||||
import org.apache.hadoop.ozone.protocolPB
|
||||
.StorageContainerDatanodeProtocolClientSideTranslatorPB;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* This class tests the functionality of HeartbeatEndpointTask.
|
||||
*/
|
||||
public class TestHeartbeatEndpointTask {
|
||||
|
||||
|
||||
@Test
|
||||
public void testheartbeatWithoutReports() throws Exception {
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
|
||||
Mockito.mock(
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
|
||||
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
|
||||
.forClass(SCMHeartbeatRequestProto.class);
|
||||
Mockito.when(scm.sendHeartbeat(argument.capture()))
|
||||
.thenAnswer(invocation ->
|
||||
SCMHeartbeatResponseProto.newBuilder()
|
||||
.setDatanodeUUID(
|
||||
((SCMHeartbeatRequestProto)invocation.getArgument(0))
|
||||
.getDatanodeDetails().getUuid())
|
||||
.build());
|
||||
|
||||
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm);
|
||||
endpointTask.call();
|
||||
SCMHeartbeatRequestProto heartbeat = argument.getValue();
|
||||
Assert.assertTrue(heartbeat.hasDatanodeDetails());
|
||||
Assert.assertFalse(heartbeat.hasNodeReport());
|
||||
Assert.assertFalse(heartbeat.hasContainerReport());
|
||||
Assert.assertFalse(heartbeat.hasCommandStatusReport());
|
||||
Assert.assertFalse(heartbeat.hasContainerActions());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testheartbeatWithNodeReports() throws Exception {
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
|
||||
Mockito.mock(DatanodeStateMachine.class));
|
||||
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
|
||||
Mockito.mock(
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
|
||||
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
|
||||
.forClass(SCMHeartbeatRequestProto.class);
|
||||
Mockito.when(scm.sendHeartbeat(argument.capture()))
|
||||
.thenAnswer(invocation ->
|
||||
SCMHeartbeatResponseProto.newBuilder()
|
||||
.setDatanodeUUID(
|
||||
((SCMHeartbeatRequestProto)invocation.getArgument(0))
|
||||
.getDatanodeDetails().getUuid())
|
||||
.build());
|
||||
|
||||
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
|
||||
conf, context, scm);
|
||||
context.addReport(NodeReportProto.getDefaultInstance());
|
||||
endpointTask.call();
|
||||
SCMHeartbeatRequestProto heartbeat = argument.getValue();
|
||||
Assert.assertTrue(heartbeat.hasDatanodeDetails());
|
||||
Assert.assertTrue(heartbeat.hasNodeReport());
|
||||
Assert.assertFalse(heartbeat.hasContainerReport());
|
||||
Assert.assertFalse(heartbeat.hasCommandStatusReport());
|
||||
Assert.assertFalse(heartbeat.hasContainerActions());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testheartbeatWithContainerReports() throws Exception {
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
|
||||
Mockito.mock(DatanodeStateMachine.class));
|
||||
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
|
||||
Mockito.mock(
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
|
||||
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
|
||||
.forClass(SCMHeartbeatRequestProto.class);
|
||||
Mockito.when(scm.sendHeartbeat(argument.capture()))
|
||||
.thenAnswer(invocation ->
|
||||
SCMHeartbeatResponseProto.newBuilder()
|
||||
.setDatanodeUUID(
|
||||
((SCMHeartbeatRequestProto)invocation.getArgument(0))
|
||||
.getDatanodeDetails().getUuid())
|
||||
.build());
|
||||
|
||||
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
|
||||
conf, context, scm);
|
||||
context.addReport(ContainerReportsProto.getDefaultInstance());
|
||||
endpointTask.call();
|
||||
SCMHeartbeatRequestProto heartbeat = argument.getValue();
|
||||
Assert.assertTrue(heartbeat.hasDatanodeDetails());
|
||||
Assert.assertFalse(heartbeat.hasNodeReport());
|
||||
Assert.assertTrue(heartbeat.hasContainerReport());
|
||||
Assert.assertFalse(heartbeat.hasCommandStatusReport());
|
||||
Assert.assertFalse(heartbeat.hasContainerActions());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testheartbeatWithCommandStatusReports() throws Exception {
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
|
||||
Mockito.mock(DatanodeStateMachine.class));
|
||||
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
|
||||
Mockito.mock(
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
|
||||
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
|
||||
.forClass(SCMHeartbeatRequestProto.class);
|
||||
Mockito.when(scm.sendHeartbeat(argument.capture()))
|
||||
.thenAnswer(invocation ->
|
||||
SCMHeartbeatResponseProto.newBuilder()
|
||||
.setDatanodeUUID(
|
||||
((SCMHeartbeatRequestProto)invocation.getArgument(0))
|
||||
.getDatanodeDetails().getUuid())
|
||||
.build());
|
||||
|
||||
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
|
||||
conf, context, scm);
|
||||
context.addReport(CommandStatusReportsProto.getDefaultInstance());
|
||||
endpointTask.call();
|
||||
SCMHeartbeatRequestProto heartbeat = argument.getValue();
|
||||
Assert.assertTrue(heartbeat.hasDatanodeDetails());
|
||||
Assert.assertFalse(heartbeat.hasNodeReport());
|
||||
Assert.assertFalse(heartbeat.hasContainerReport());
|
||||
Assert.assertTrue(heartbeat.hasCommandStatusReport());
|
||||
Assert.assertFalse(heartbeat.hasContainerActions());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testheartbeatWithContainerActions() throws Exception {
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
|
||||
Mockito.mock(DatanodeStateMachine.class));
|
||||
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
|
||||
Mockito.mock(
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
|
||||
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
|
||||
.forClass(SCMHeartbeatRequestProto.class);
|
||||
Mockito.when(scm.sendHeartbeat(argument.capture()))
|
||||
.thenAnswer(invocation ->
|
||||
SCMHeartbeatResponseProto.newBuilder()
|
||||
.setDatanodeUUID(
|
||||
((SCMHeartbeatRequestProto)invocation.getArgument(0))
|
||||
.getDatanodeDetails().getUuid())
|
||||
.build());
|
||||
|
||||
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
|
||||
conf, context, scm);
|
||||
context.addContainerAction(getContainerAction());
|
||||
endpointTask.call();
|
||||
SCMHeartbeatRequestProto heartbeat = argument.getValue();
|
||||
Assert.assertTrue(heartbeat.hasDatanodeDetails());
|
||||
Assert.assertFalse(heartbeat.hasNodeReport());
|
||||
Assert.assertFalse(heartbeat.hasContainerReport());
|
||||
Assert.assertFalse(heartbeat.hasCommandStatusReport());
|
||||
Assert.assertTrue(heartbeat.hasContainerActions());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testheartbeatWithAllReports() throws Exception {
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
|
||||
Mockito.mock(DatanodeStateMachine.class));
|
||||
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
|
||||
Mockito.mock(
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
|
||||
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
|
||||
.forClass(SCMHeartbeatRequestProto.class);
|
||||
Mockito.when(scm.sendHeartbeat(argument.capture()))
|
||||
.thenAnswer(invocation ->
|
||||
SCMHeartbeatResponseProto.newBuilder()
|
||||
.setDatanodeUUID(
|
||||
((SCMHeartbeatRequestProto)invocation.getArgument(0))
|
||||
.getDatanodeDetails().getUuid())
|
||||
.build());
|
||||
|
||||
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
|
||||
conf, context, scm);
|
||||
context.addReport(NodeReportProto.getDefaultInstance());
|
||||
context.addReport(ContainerReportsProto.getDefaultInstance());
|
||||
context.addReport(CommandStatusReportsProto.getDefaultInstance());
|
||||
context.addContainerAction(getContainerAction());
|
||||
endpointTask.call();
|
||||
SCMHeartbeatRequestProto heartbeat = argument.getValue();
|
||||
Assert.assertTrue(heartbeat.hasDatanodeDetails());
|
||||
Assert.assertTrue(heartbeat.hasNodeReport());
|
||||
Assert.assertTrue(heartbeat.hasContainerReport());
|
||||
Assert.assertTrue(heartbeat.hasCommandStatusReport());
|
||||
Assert.assertTrue(heartbeat.hasContainerActions());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates HeartbeatEndpointTask for the given StorageContainerManager proxy.
|
||||
*
|
||||
* @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
|
||||
*
|
||||
* @return HeartbeatEndpointTask
|
||||
*/
|
||||
private HeartbeatEndpointTask getHeartbeatEndpointTask(
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
|
||||
Mockito.mock(DatanodeStateMachine.class));
|
||||
return getHeartbeatEndpointTask(conf, context, proxy);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates HeartbeatEndpointTask with the given conf, context and
|
||||
* StorageContainerManager client side proxy.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param context StateContext
|
||||
* @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
|
||||
*
|
||||
* @return HeartbeatEndpointTask
|
||||
*/
|
||||
private HeartbeatEndpointTask getHeartbeatEndpointTask(
|
||||
Configuration conf,
|
||||
StateContext context,
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
|
||||
DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
|
||||
.setUuid(UUID.randomUUID().toString())
|
||||
.setHostName("localhost")
|
||||
.setIpAddress("127.0.0.1")
|
||||
.build();
|
||||
EndpointStateMachine endpointStateMachine = Mockito
|
||||
.mock(EndpointStateMachine.class);
|
||||
Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy);
|
||||
return HeartbeatEndpointTask.newBuilder()
|
||||
.setConfig(conf)
|
||||
.setDatanodeDetails(datanodeDetails)
|
||||
.setContext(context)
|
||||
.setEndpointStateMachine(endpointStateMachine)
|
||||
.build();
|
||||
}
|
||||
|
||||
private ContainerAction getContainerAction() {
|
||||
ContainerAction.Builder builder = ContainerAction.newBuilder();
|
||||
ContainerInfo containerInfo = ContainerInfo.newBuilder()
|
||||
.setContainerID(1L)
|
||||
.build();
|
||||
builder.setContainer(containerInfo)
|
||||
.setAction(ContainerAction.Action.CLOSE)
|
||||
.setReason(ContainerAction.Reason.CONTAINER_FULL);
|
||||
return builder.build();
|
||||
}
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.states.endpoint;
|
Loading…
Reference in New Issue
Block a user