HDDS-1178. Healthy pipeline Chill Mode Rule.

Closes #518
This commit is contained in:
Bharat Viswanadham 2019-02-25 17:17:15 -08:00 committed by Márton Elek
parent 0e450202a6
commit 6c8c422beb
No known key found for this signature in database
GPG Key ID: D51EA8F00EE79B28
10 changed files with 318 additions and 50 deletions

View File

@ -80,6 +80,15 @@ public final class HddsConfigKeys {
public static final String HDDS_SCM_CHILLMODE_THRESHOLD_PCT = public static final String HDDS_SCM_CHILLMODE_THRESHOLD_PCT =
"hdds.scm.chillmode.threshold.pct"; "hdds.scm.chillmode.threshold.pct";
public static final double HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT = 0.99; public static final double HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT = 0.99;
// percentage of healthy pipelines, where all 3 datanodes are reported in the
// pipeline.
public static final String HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT =
"hdds.scm.chillmode.healthy.pipelie.pct";
public static final double
HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10;
public static final String HDDS_LOCK_MAX_CONCURRENCY = public static final String HDDS_LOCK_MAX_CONCURRENCY =
"hdds.lock.max.concurrency"; "hdds.lock.max.concurrency";
public static final int HDDS_LOCK_MAX_CONCURRENCY_DEFAULT = 100; public static final int HDDS_LOCK_MAX_CONCURRENCY_DEFAULT = 100;

View File

@ -1315,6 +1315,16 @@
</description> </description>
</property> </property>
<property>
<name>hdds.scm.chillmode.healthy.pipelie.pct</name>
<value>0.10</value>
<tag>HDDS,SCM,OPERATION</tag>
<description>
Percentage of healthy pipelines, where all 3 datanodes are reported in the
pipeline.
</description>
</property>
<property> <property>
<name>hdds.container.action.max.limit</name> <name>hdds.container.action.max.limit</name>
<value>20</value> <value>20</value>

View File

@ -17,8 +17,9 @@
*/ */
package org.apache.hadoop.hdds.scm.chillmode; package org.apache.hadoop.hdds.scm.chillmode;
import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@ -30,54 +31,66 @@
import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventPublisher;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Class defining Chill mode exit criteria for Pipelines. * Class defining Chill mode exit criteria for Pipelines.
*
* This rule defines percentage of healthy pipelines need to be reported.
* Once chill mode exit happens, this rules take care of writes can go
* through in a cluster.
*/ */
public class PipelineChillModeRule public class HealthyPipelineChillModeRule
implements ChillModeExitRule<PipelineReportFromDatanode>, implements ChillModeExitRule<PipelineReportFromDatanode>,
EventHandler<PipelineReportFromDatanode> { EventHandler<PipelineReportFromDatanode> {
/** Pipeline availability.*/
private AtomicBoolean isPipelineAvailable = new AtomicBoolean(false);
private static final Logger LOG =
LoggerFactory.getLogger(HealthyPipelineChillModeRule.class);
private final PipelineManager pipelineManager; private final PipelineManager pipelineManager;
private final SCMChillModeManager chillModeManager; private final SCMChillModeManager chillModeManager;
private final int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
PipelineChillModeRule(PipelineManager pipelineManager, HealthyPipelineChillModeRule(PipelineManager pipelineManager,
SCMChillModeManager manager) { SCMChillModeManager manager, Configuration configuration) {
this.pipelineManager = pipelineManager; this.pipelineManager = pipelineManager;
this.chillModeManager = manager; this.chillModeManager = manager;
double healthyPipelinesPercent =
configuration.getDouble(HddsConfigKeys.
HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
HddsConfigKeys.
HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
// As we want to wait for 3 node pipelines
int pipelineCount =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE).size();
// This value will be zero when pipeline count is 0.
// On a fresh installed cluster, there will be zero pipelines in the SCM
// pipeline DB.
healthyPipelineThresholdCount =
(int) Math.ceil((healthyPipelinesPercent / 100) * pipelineCount);
LOG.info(" Total pipeline count is {}, healthy pipeline " +
"threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
} }
@Override @Override
public boolean validate() { public boolean validate() {
return isPipelineAvailable.get(); if (currentHealthyPipelineCount >= healthyPipelineThresholdCount) {
} return true;
@Override
public void process(PipelineReportFromDatanode report) {
// No need to deal with
}
@Override
public void cleanup() {
// No need to deal with
}
@Override
public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
EventPublisher publisher) {
// If we are already in pipeline available state,
// skipping following check.
if (validate()) {
chillModeManager.validateChillModeExitRules(publisher);
return;
} }
return false;
}
@Override
public void process(PipelineReportFromDatanode pipelineReportFromDatanode) {
Pipeline pipeline; Pipeline pipeline;
Preconditions.checkNotNull(pipelineReportFromDatanode); Preconditions.checkNotNull(pipelineReportFromDatanode);
PipelineReportsProto pipelineReport = pipelineReportFromDatanode PipelineReportsProto pipelineReport =
.getReport(); pipelineReportFromDatanode.getReport();
for (PipelineReport report : pipelineReport.getPipelineReportList()) { for (PipelineReport report : pipelineReport.getPipelineReportList()) {
PipelineID pipelineID = PipelineID PipelineID pipelineID = PipelineID
@ -89,17 +102,38 @@ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
} }
if (pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) { if (pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
// ensure there is an OPEN state pipeline and then allowed // If the pipeline is open state mean, all 3 datanodes are reported
// to exit chill mode // for this pipeline.
isPipelineAvailable.set(true); currentHealthyPipelineCount++;
if (chillModeManager.getInChillMode()) {
SCMChillModeManager.getLogger()
.info("SCM in chill mode. 1 Pipeline reported, 1 required.");
}
break;
} }
} }
}
@Override
public void cleanup() {
// No need to deal with
}
@Override
public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
EventPublisher publisher) {
// If we have already reached healthy pipeline threshold, skip processing
// pipeline report from datanode.
if (validate()) {
chillModeManager.validateChillModeExitRules(publisher);
return;
}
// Process pipeline report from datanode
process(pipelineReportFromDatanode);
if (chillModeManager.getInChillMode()) {
SCMChillModeManager.getLogger().info(
"SCM in chill mode. Healthy pipelines reported count is {}, " +
"required healthy pipeline reported count is {}",
currentHealthyPipelineCount, healthyPipelineThresholdCount);
}
if (validate()) { if (validate()) {
chillModeManager.validateChillModeExitRules(publisher); chillModeManager.validateChillModeExitRules(publisher);

View File

@ -60,7 +60,8 @@ public class SCMChillModeManager implements
private Configuration config; private Configuration config;
private static final String CONT_EXIT_RULE = "ContainerChillModeRule"; private static final String CONT_EXIT_RULE = "ContainerChillModeRule";
private static final String DN_EXIT_RULE = "DataNodeChillModeRule"; private static final String DN_EXIT_RULE = "DataNodeChillModeRule";
private static final String PIPELINE_EXIT_RULE = "PipelineChillModeRule"; private static final String HEALTHY_PIPELINE_EXIT_RULE =
"HealthyPipelineChillModeRule";
private final EventQueue eventPublisher; private final EventQueue eventPublisher;
private final PipelineManager pipelineManager; private final PipelineManager pipelineManager;
@ -83,10 +84,10 @@ public SCMChillModeManager(Configuration conf,
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK,
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT) HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT)
&& pipelineManager != null) { && pipelineManager != null) {
PipelineChillModeRule rule = new PipelineChillModeRule(pipelineManager, HealthyPipelineChillModeRule rule = new HealthyPipelineChillModeRule(
this); pipelineManager, this, config);
exitRules.put(PIPELINE_EXIT_RULE, rule); exitRules.put(HEALTHY_PIPELINE_EXIT_RULE, rule);
eventPublisher.addHandler(SCMEvents.PIPELINE_REPORT, rule); eventPublisher.addHandler(SCMEvents.PROCESSED_PIPELINE_REPORT, rule);
} }
emitChillModeStatus(); emitChillModeStatus();
} else { } else {
@ -172,4 +173,10 @@ public double getCurrentContainerThreshold() {
.getCurrentContainerThreshold(); .getCurrentContainerThreshold();
} }
@VisibleForTesting
public HealthyPipelineChillModeRule getHealthyPipelineChillModeRule() {
return (HealthyPipelineChillModeRule)
exitRules.get(HEALTHY_PIPELINE_EXIT_RULE);
}
} }

View File

@ -103,6 +103,14 @@ public final class SCMEvents {
public static final TypedEvent<PipelineReportFromDatanode> PIPELINE_REPORT = public static final TypedEvent<PipelineReportFromDatanode> PIPELINE_REPORT =
new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report"); new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
/**
* PipelineReport processed by pipeline report handler. This event is
* received by HealthyPipelineChillModeRule.
*/
public static final TypedEvent<PipelineReportFromDatanode>
PROCESSED_PIPELINE_REPORT = new TypedEvent<>(
PipelineReportFromDatanode.class, "Processed_Pipeline_Report");
/** /**
* PipelineActions are sent by Datanode. This event is received by * PipelineActions are sent by Datanode. This event is received by
* SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated. * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.

View File

@ -20,11 +20,14 @@
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport; .StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventHandler;
@ -33,6 +36,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Objects;
/** /**
* Handles Pipeline Reports from datanode. * Handles Pipeline Reports from datanode.
@ -44,12 +48,21 @@ public class PipelineReportHandler implements
.getLogger(PipelineReportHandler.class); .getLogger(PipelineReportHandler.class);
private final PipelineManager pipelineManager; private final PipelineManager pipelineManager;
private final Configuration conf; private final Configuration conf;
private final SCMChillModeManager scmChillModeManager;
private final boolean pipelineAvailabilityCheck;
public PipelineReportHandler(PipelineManager pipelineManager, public PipelineReportHandler(SCMChillModeManager scmChillModeManager,
PipelineManager pipelineManager,
Configuration conf) { Configuration conf) {
Preconditions.checkNotNull(pipelineManager); Preconditions.checkNotNull(pipelineManager);
Objects.requireNonNull(scmChillModeManager);
this.scmChillModeManager = scmChillModeManager;
this.pipelineManager = pipelineManager; this.pipelineManager = pipelineManager;
this.conf = conf; this.conf = conf;
this.pipelineAvailabilityCheck = conf.getBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK,
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT);
} }
@Override @Override
@ -70,6 +83,11 @@ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
report, dn, e); report, dn, e);
} }
} }
if (pipelineAvailabilityCheck && scmChillModeManager.getInChillMode()) {
publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
pipelineReportFromDatanode);
}
} }
private void processPipelineReport(PipelineReport report, DatanodeDetails dn) private void processPipelineReport(PipelineReport report, DatanodeDetails dn)

View File

@ -289,7 +289,7 @@ public StorageContainerManager(OzoneConfiguration conf,
NodeReportHandler nodeReportHandler = NodeReportHandler nodeReportHandler =
new NodeReportHandler(scmNodeManager); new NodeReportHandler(scmNodeManager);
PipelineReportHandler pipelineReportHandler = PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(pipelineManager, conf); new PipelineReportHandler(scmChillModeManager, pipelineManager, conf);
CommandStatusReportHandler cmdStatusReportHandler = CommandStatusReportHandler cmdStatusReportHandler =
new CommandStatusReportHandler(); new CommandStatusReportHandler();

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.chillmode;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* This class tests HealthyPipelineChillMode rule.
*/
public class TestHealthyPipelineChillModeRule {
@Test
public void testHealthyPipelineChillModeRuleWithNoPipelines()
throws Exception {
String storageDir = GenericTestUtils.getTempPath(
TestHealthyPipelineChillModeRule.class.getName() + UUID.randomUUID());
try {
EventQueue eventQueue = new EventQueue();
List<ContainerInfo> containers = new ArrayList<>();
containers.addAll(HddsTestUtils.getContainerInfo(1));
OzoneConfiguration config = new OzoneConfiguration();
MockNodeManager nodeManager = new MockNodeManager(true, 0);
config.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
// enable pipeline check
config.setBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true);
PipelineManager pipelineManager = new SCMPipelineManager(config,
nodeManager, eventQueue);
SCMChillModeManager scmChillModeManager = new SCMChillModeManager(
config, containers, pipelineManager, eventQueue);
HealthyPipelineChillModeRule healthyPipelineChillModeRule =
scmChillModeManager.getHealthyPipelineChillModeRule();
// This should be immediately satisfied, as no pipelines are there yet.
Assert.assertTrue(healthyPipelineChillModeRule.validate());
} finally {
FileUtil.fullyDelete(new File(storageDir));
}
}
@Test
public void testHealthyPipelineChillModeRuleWithPipelines() throws Exception {
String storageDir = GenericTestUtils.getTempPath(
TestHealthyPipelineChillModeRule.class.getName() + UUID.randomUUID());
try {
EventQueue eventQueue = new EventQueue();
List<ContainerInfo> containers = new ArrayList<>();
containers.addAll(HddsTestUtils.getContainerInfo(1));
OzoneConfiguration config = new OzoneConfiguration();
// In Mock Node Manager, first 8 nodes are healthy, next 2 nodes are
// stale and last one is dead, and this repeats. So for a 12 node, 9
// healthy, 2 stale and one dead.
MockNodeManager nodeManager = new MockNodeManager(true, 12);
config.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
// enable pipeline check
config.setBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true);
PipelineManager pipelineManager = new SCMPipelineManager(config,
nodeManager, eventQueue);
// Create 3 pipelines
Pipeline pipeline1 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Pipeline pipeline2 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Pipeline pipeline3 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
SCMChillModeManager scmChillModeManager = new SCMChillModeManager(
config, containers, pipelineManager, eventQueue);
HealthyPipelineChillModeRule healthyPipelineChillModeRule =
scmChillModeManager.getHealthyPipelineChillModeRule();
// No datanodes have sent pipelinereport from datanode
Assert.assertFalse(healthyPipelineChillModeRule.validate());
// Fire pipeline report from all datanodes in first pipeline, as here we
// have 3 pipelines, 10% is 0.3, when doing ceil it is 1. So, we should
// validate should return true after fire pipeline event
//Here testing with out pipelinereport handler, so not moving created
// pipelines to allocated state, as pipelines changing to healthy is
// handled by pipeline report handler. So, leaving pipeline's in pipeline
// manager in open state for test case simplicity.
firePipelineEvent(pipeline1, eventQueue);
GenericTestUtils.waitFor(() -> healthyPipelineChillModeRule.validate(),
1000, 5000);
} finally {
FileUtil.fullyDelete(new File(storageDir));
}
}
private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
.newBuilder();
reportBuilder.addPipelineReport(PipelineReport.newBuilder()
.setPipelineID(pipeline.getId().getProtobuf()));
// Here no need to fire event from 3 nodes, as already pipeline is in
// open state, but doing it.
eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
pipeline.getNodes().get(0), reportBuilder.build()));
}
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerManager;
@ -40,6 +41,7 @@
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -192,11 +194,15 @@ public void testPipelineCloseWithPipelineReport() throws IOException {
for (DatanodeDetails dn : pipeline.getNodes()) { for (DatanodeDetails dn : pipeline.getNodes()) {
PipelineReportFromDatanode pipelineReport = PipelineReportFromDatanode pipelineReport =
TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
EventQueue eventQueue = new EventQueue();
SCMChillModeManager scmChillModeManager =
new SCMChillModeManager(new OzoneConfiguration(),
new ArrayList<>(), pipelineManager, eventQueue);
PipelineReportHandler pipelineReportHandler = PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(pipelineManager, conf); new PipelineReportHandler(scmChillModeManager, pipelineManager, conf);
// on receiving pipeline report for the pipeline, pipeline report handler // on receiving pipeline report for the pipeline, pipeline report handler
// should destroy the pipeline for the dn // should destroy the pipeline for the dn
pipelineReportHandler.onMessage(pipelineReport, new EventQueue()); pipelineReportHandler.onMessage(pipelineReport, eventQueue);
} }
OzoneContainer ozoneContainer = OzoneContainer ozoneContainer =

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
@ -37,6 +38,7 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -131,8 +133,12 @@ public void testRemovePipeline() throws IOException {
@Test @Test
public void testPipelineReport() throws IOException { public void testPipelineReport() throws IOException {
EventQueue eventQueue = new EventQueue();
PipelineManager pipelineManager = PipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue()); new SCMPipelineManager(conf, nodeManager, eventQueue);
SCMChillModeManager scmChillModeManager =
new SCMChillModeManager(new OzoneConfiguration(),
new ArrayList<>(), pipelineManager, eventQueue);
// create a pipeline in allocated state with no dns yet reported // create a pipeline in allocated state with no dns yet reported
Pipeline pipeline = pipelineManager Pipeline pipeline = pipelineManager
@ -145,7 +151,7 @@ public void testPipelineReport() throws IOException {
// get pipeline report from each dn in the pipeline // get pipeline report from each dn in the pipeline
PipelineReportHandler pipelineReportHandler = PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(pipelineManager, conf); new PipelineReportHandler(scmChillModeManager, pipelineManager, conf);
for (DatanodeDetails dn: pipeline.getNodes()) { for (DatanodeDetails dn: pipeline.getNodes()) {
PipelineReportFromDatanode pipelineReportFromDatanode = PipelineReportFromDatanode pipelineReportFromDatanode =
TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());