HDDS-1187. Healthy pipeline Chill Mode rule to consider only pipelines with replication factor three.
This commit is contained in:
parent
0d61facd37
commit
eae8819fd2
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||||
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
|
||||||
@ -34,6 +35,9 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class defining Chill mode exit criteria for Pipelines.
|
* Class defining Chill mode exit criteria for Pipelines.
|
||||||
*
|
*
|
||||||
@ -45,12 +49,14 @@ public class HealthyPipelineChillModeRule
|
|||||||
implements ChillModeExitRule<PipelineReportFromDatanode>,
|
implements ChillModeExitRule<PipelineReportFromDatanode>,
|
||||||
EventHandler<PipelineReportFromDatanode> {
|
EventHandler<PipelineReportFromDatanode> {
|
||||||
|
|
||||||
private static final Logger LOG =
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(HealthyPipelineChillModeRule.class);
|
LoggerFactory.getLogger(HealthyPipelineChillModeRule.class);
|
||||||
private final PipelineManager pipelineManager;
|
private final PipelineManager pipelineManager;
|
||||||
private final SCMChillModeManager chillModeManager;
|
private final SCMChillModeManager chillModeManager;
|
||||||
private final int healthyPipelineThresholdCount;
|
private final int healthyPipelineThresholdCount;
|
||||||
private int currentHealthyPipelineCount = 0;
|
private int currentHealthyPipelineCount = 0;
|
||||||
|
private final Set<DatanodeDetails> processedDatanodeDetails =
|
||||||
|
new HashSet<>();
|
||||||
|
|
||||||
HealthyPipelineChillModeRule(PipelineManager pipelineManager,
|
HealthyPipelineChillModeRule(PipelineManager pipelineManager,
|
||||||
SCMChillModeManager manager, Configuration configuration) {
|
SCMChillModeManager manager, Configuration configuration) {
|
||||||
@ -71,7 +77,7 @@ public class HealthyPipelineChillModeRule
|
|||||||
// On a fresh installed cluster, there will be zero pipelines in the SCM
|
// On a fresh installed cluster, there will be zero pipelines in the SCM
|
||||||
// pipeline DB.
|
// pipeline DB.
|
||||||
healthyPipelineThresholdCount =
|
healthyPipelineThresholdCount =
|
||||||
(int) Math.ceil((healthyPipelinesPercent / 100) * pipelineCount);
|
(int) Math.ceil(healthyPipelinesPercent * pipelineCount);
|
||||||
|
|
||||||
LOG.info(" Total pipeline count is {}, healthy pipeline " +
|
LOG.info(" Total pipeline count is {}, healthy pipeline " +
|
||||||
"threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
|
"threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
|
||||||
@ -101,7 +107,8 @@ public void process(PipelineReportFromDatanode pipelineReportFromDatanode) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
|
if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
|
||||||
|
pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
|
||||||
// If the pipeline is in open state, it means all 3 datanodes have reported
|
// If the pipeline is in open state, it means all 3 datanodes have reported
|
||||||
// for this pipeline.
|
// for this pipeline.
|
||||||
currentHealthyPipelineCount++;
|
currentHealthyPipelineCount++;
|
||||||
@ -125,14 +132,26 @@ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process pipeline report from datanode
|
|
||||||
process(pipelineReportFromDatanode);
|
|
||||||
|
|
||||||
if (chillModeManager.getInChillMode()) {
|
// When SCM is in chill mode for a long time, an already registered
|
||||||
SCMChillModeManager.getLogger().info(
|
// datanode can send pipeline report again, then pipeline handler fires
|
||||||
"SCM in chill mode. Healthy pipelines reported count is {}, " +
|
// processed report event, we should not consider this pipeline report
|
||||||
"required healthy pipeline reported count is {}",
|
// from datanode again during threshold calculation.
|
||||||
currentHealthyPipelineCount, healthyPipelineThresholdCount);
|
DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
|
||||||
|
if (!processedDatanodeDetails.contains(
|
||||||
|
pipelineReportFromDatanode.getDatanodeDetails())) {
|
||||||
|
|
||||||
|
// Process pipeline report from datanode
|
||||||
|
process(pipelineReportFromDatanode);
|
||||||
|
|
||||||
|
if (chillModeManager.getInChillMode()) {
|
||||||
|
SCMChillModeManager.getLogger().info(
|
||||||
|
"SCM in chill mode. Healthy pipelines reported count is {}, " +
|
||||||
|
"required healthy pipeline reported count is {}",
|
||||||
|
currentHealthyPipelineCount, healthyPipelineThresholdCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
processedDatanodeDetails.add(dnDetails);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (validate()) {
|
if (validate()) {
|
||||||
|
@ -38,6 +38,7 @@
|
|||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -153,6 +154,82 @@ public void testHealthyPipelineChillModeRuleWithPipelines() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHealthyPipelineChillModeRuleWithMixedPipelines()
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
String storageDir = GenericTestUtils.getTempPath(
|
||||||
|
TestHealthyPipelineChillModeRule.class.getName() + UUID.randomUUID());
|
||||||
|
|
||||||
|
try {
|
||||||
|
EventQueue eventQueue = new EventQueue();
|
||||||
|
List<ContainerInfo> containers = new ArrayList<>();
|
||||||
|
containers.addAll(HddsTestUtils.getContainerInfo(1));
|
||||||
|
|
||||||
|
OzoneConfiguration config = new OzoneConfiguration();
|
||||||
|
|
||||||
|
// In Mock Node Manager, first 8 nodes are healthy, next 2 nodes are
|
||||||
|
// stale and last one is dead, and this repeats. So for 12 nodes, 9 are
|
||||||
|
// healthy, 2 are stale and one is dead.
|
||||||
|
MockNodeManager nodeManager = new MockNodeManager(true, 12);
|
||||||
|
config.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
|
||||||
|
// enable pipeline check
|
||||||
|
config.setBoolean(
|
||||||
|
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true);
|
||||||
|
|
||||||
|
|
||||||
|
PipelineManager pipelineManager = new SCMPipelineManager(config,
|
||||||
|
nodeManager, eventQueue);
|
||||||
|
|
||||||
|
// Create 3 pipelines
|
||||||
|
Pipeline pipeline1 =
|
||||||
|
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
|
||||||
|
HddsProtos.ReplicationFactor.ONE);
|
||||||
|
Pipeline pipeline2 =
|
||||||
|
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
|
||||||
|
HddsProtos.ReplicationFactor.THREE);
|
||||||
|
Pipeline pipeline3 =
|
||||||
|
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
|
||||||
|
HddsProtos.ReplicationFactor.THREE);
|
||||||
|
|
||||||
|
|
||||||
|
SCMChillModeManager scmChillModeManager = new SCMChillModeManager(
|
||||||
|
config, containers, pipelineManager, eventQueue);
|
||||||
|
|
||||||
|
HealthyPipelineChillModeRule healthyPipelineChillModeRule =
|
||||||
|
scmChillModeManager.getHealthyPipelineChillModeRule();
|
||||||
|
|
||||||
|
|
||||||
|
// No datanode has sent a pipeline report yet
|
||||||
|
Assert.assertFalse(healthyPipelineChillModeRule.validate());
|
||||||
|
|
||||||
|
|
||||||
|
GenericTestUtils.LogCapturer logCapturer =
|
||||||
|
GenericTestUtils.LogCapturer.captureLogs(LoggerFactory.getLogger(
|
||||||
|
SCMChillModeManager.class));
|
||||||
|
|
||||||
|
// fire event with pipeline report with ratis type and factor 1
|
||||||
|
// pipeline, validate() should return false
|
||||||
|
firePipelineEvent(pipeline1, eventQueue);
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
|
||||||
|
"reported count is 0"),
|
||||||
|
1000, 5000);
|
||||||
|
Assert.assertFalse(healthyPipelineChillModeRule.validate());
|
||||||
|
|
||||||
|
firePipelineEvent(pipeline2, eventQueue);
|
||||||
|
firePipelineEvent(pipeline3, eventQueue);
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(() -> healthyPipelineChillModeRule.validate(),
|
||||||
|
1000, 5000);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
FileUtil.fullyDelete(new File(storageDir));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
|
private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
|
||||||
PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
|
PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
|
||||||
.newBuilder();
|
.newBuilder();
|
||||||
|
@ -237,7 +237,7 @@ public void testChillModePipelineExitRule() throws Exception {
|
|||||||
String storageDir = GenericTestUtils.getTempPath(
|
String storageDir = GenericTestUtils.getTempPath(
|
||||||
TestSCMChillModeManager.class.getName() + UUID.randomUUID());
|
TestSCMChillModeManager.class.getName() + UUID.randomUUID());
|
||||||
try{
|
try{
|
||||||
MockNodeManager nodeManager = new MockNodeManager(true, 1);
|
MockNodeManager nodeManager = new MockNodeManager(true, 3);
|
||||||
config.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
|
config.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
|
||||||
// enable pipeline check
|
// enable pipeline check
|
||||||
config.setBoolean(
|
config.setBoolean(
|
||||||
@ -245,6 +245,15 @@ public void testChillModePipelineExitRule() throws Exception {
|
|||||||
|
|
||||||
PipelineManager pipelineManager = new SCMPipelineManager(config,
|
PipelineManager pipelineManager = new SCMPipelineManager(config,
|
||||||
nodeManager, queue);
|
nodeManager, queue);
|
||||||
|
|
||||||
|
Pipeline pipeline = pipelineManager.createPipeline(
|
||||||
|
HddsProtos.ReplicationType.RATIS,
|
||||||
|
HddsProtos.ReplicationFactor.THREE);
|
||||||
|
PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
|
||||||
|
.newBuilder();
|
||||||
|
reportBuilder.addPipelineReport(PipelineReport.newBuilder()
|
||||||
|
.setPipelineID(pipeline.getId().getProtobuf()));
|
||||||
|
|
||||||
scmChillModeManager = new SCMChillModeManager(
|
scmChillModeManager = new SCMChillModeManager(
|
||||||
config, containers, pipelineManager, queue);
|
config, containers, pipelineManager, queue);
|
||||||
queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
|
queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
|
||||||
@ -254,17 +263,10 @@ public void testChillModePipelineExitRule() throws Exception {
|
|||||||
HddsTestUtils.createNodeRegistrationContainerReport(containers));
|
HddsTestUtils.createNodeRegistrationContainerReport(containers));
|
||||||
assertTrue(scmChillModeManager.getInChillMode());
|
assertTrue(scmChillModeManager.getInChillMode());
|
||||||
|
|
||||||
// simulate a pipeline report to trigger the rule check
|
// Trigger the processed pipeline report event
|
||||||
Pipeline pipeline = pipelineManager.createPipeline(
|
queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
|
||||||
HddsProtos.ReplicationType.STAND_ALONE,
|
new PipelineReportFromDatanode(pipeline.getNodes().get(0),
|
||||||
HddsProtos.ReplicationFactor.ONE);
|
reportBuilder.build()));
|
||||||
PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
|
|
||||||
.newBuilder();
|
|
||||||
reportBuilder.addPipelineReport(PipelineReport.newBuilder()
|
|
||||||
.setPipelineID(pipeline.getId().getProtobuf()));
|
|
||||||
|
|
||||||
queue.fireEvent(SCMEvents.PIPELINE_REPORT, new PipelineReportFromDatanode(
|
|
||||||
pipeline.getNodes().get(0), reportBuilder.build()));
|
|
||||||
|
|
||||||
GenericTestUtils.waitFor(() -> {
|
GenericTestUtils.waitFor(() -> {
|
||||||
return !scmChillModeManager.getInChillMode();
|
return !scmChillModeManager.getInChillMode();
|
||||||
|
Loading…
Reference in New Issue
Block a user