From ff64d3571660ace3fb266ee47bea181cebfee8d9 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Fri, 7 Sep 2018 10:35:45 -0700 Subject: [PATCH] HDDS-351. Add chill mode state to SCM. Contributed by Ajay Kumar. --- .../apache/hadoop/hdds/HddsConfigKeys.java | 6 + .../src/main/resources/ozone-default.xml | 9 + .../hdds/scm/container/ContainerMapping.java | 5 + .../hadoop/hdds/scm/events/SCMEvents.java | 10 + .../hdds/scm/server/SCMChillModeManager.java | 198 ++++++++++++++++++ .../scm/server/SCMDatanodeProtocolServer.java | 19 ++ .../scm/server/StorageContainerManager.java | 17 +- .../apache/hadoop/hdds/scm/HddsTestUtils.java | 85 ++++++++ .../scm/server/TestSCMChillModeManager.java | 115 ++++++++++ .../apache/hadoop/ozone/MiniOzoneCluster.java | 23 +- .../hadoop/ozone/MiniOzoneClusterImpl.java | 97 ++++++--- .../ozone/TestStorageContainerManager.java | 188 ++++++++++++----- 12 files changed, 694 insertions(+), 78 deletions(-) create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMChillModeManager.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java index 4dc7e0a51c..98efbf8dbe 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -75,4 +75,10 @@ private HddsConfigKeys() { "hdds.container.close.threshold"; public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f; + // % of containers which should have at least one reported replica + // before SCM comes out of chill mode. + public static final String HDDS_SCM_CHILLMODE_THRESHOLD_PCT = + "hdds.scm.chillmode.threshold.pct"; + public static final double HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT = 0.99; + } diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 778d641fb1..be19e90172 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1112,6 +1112,15 @@ + + hdds.scm.chillmode.threshold.pct + 0.99 + HDDS,SCM,OPERATION + % of containers which should have at least one + reported replica before SCM comes out of chill mode. + + + hdds.container.action.max.limit 20 diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index 3554339774..5678205a3e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -210,6 +211,10 @@ public ContainerWithPipeline getContainerWithPipeline(long containerID) // For close containers create pipeline from datanodes with replicas Set dnWithReplicas = containerStateManager .getContainerReplicas(contInfo.containerID()); + if (dnWithReplicas.size() == 0) { + throw new SCMException("Can't create a pipeline for container with " + + "no replica.", ResultCodes.NO_REPLICA_FOUND); + } pipeline = new Pipeline(dnWithReplicas.iterator().next().getUuidString(), contInfo.getState(), ReplicationType.STAND_ALONE, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 03df8eb0ae..6985834f26 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -45,6 +45,7 @@ .ReplicationCompleted; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport; import org.apache.hadoop.hdds.server.events.Event; import org.apache.hadoop.hdds.server.events.TypedEvent; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; @@ -60,6 +61,15 @@ public final class SCMEvents { */ public static final TypedEvent NODE_REPORT = new TypedEvent<>(NodeReportFromDatanode.class, "Node_Report"); + + /** + * Event generated on DataNode registration. + */ + public static final TypedEvent + NODE_REGISTRATION_CONT_REPORT = new TypedEvent<>( + NodeRegistrationContainerReport.class, + "Node_Registration_Container_Report"); + /** * ContainerReports are send out by Datanodes. This report is received by * SCMDatanodeHeartbeatDispatcher and Container_Report Event diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java new file mode 100644 index 0000000000..d2786372f5 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.server; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer + .NodeRegistrationContainerReport; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * StorageContainerManager enters chill mode on startup to allow system to + * reach a stable state before becoming fully functional. SCM will wait + * for certain resources to be reported before coming out of chill mode. + * + * ChillModeExitRule defines format to define new rules which must be satisfied + * to exit Chill mode. + * ContainerChillModeRule defines the only exit criteria right now. + * On every new datanode registration event this class adds replicas + * for reported containers and validates if cutoff threshold for + * containers is meet. + */ +public class SCMChillModeManager implements + EventHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(SCMChillModeManager.class); + private AtomicBoolean inChillMode = new AtomicBoolean(true); + private AtomicLong containerWithMinReplicas = new AtomicLong(0); + private Map exitRules = new HashMap(1); + private Configuration config; + private static final String CONT_EXIT_RULE = "ContainerChillModeRule"; + + SCMChillModeManager(Configuration conf, List allContainers) { + this.config = conf; + exitRules + .put(CONT_EXIT_RULE, new ContainerChillModeRule(config, allContainers)); + } + + private void validateChillModeExitRules(EventPublisher eventQueue) { + for (ChillModeExitRule exitRule : exitRules.values()) { + if (!exitRule.validate()) { + return; + } + } + exitChillMode(eventQueue); + } + + private void exitChillMode(EventPublisher eventQueue) { + LOG.info("SCM exiting chill mode."); + setInChillMode(false); + // Emit event to ReplicationManager to start replication. + eventQueue.fireEvent(SCMEvents.START_REPLICATION, true); + + // TODO: Remove handler registration as there is no need to listen to + // register events anymore. + + for (ChillModeExitRule e : exitRules.values()) { + e.cleanup(); + } + } + + @Override + public void onMessage( + NodeRegistrationContainerReport nodeRegistrationContainerReport, + EventPublisher publisher) { + if (getInChillMode()) { + exitRules.get(CONT_EXIT_RULE).process(nodeRegistrationContainerReport); + validateChillModeExitRules(publisher); + } + } + + public boolean getInChillMode() { + return inChillMode.get(); + } + + public void setInChillMode(boolean inChillMode) { + this.inChillMode.set(inChillMode); + } + + /** + * Interface for defining chill mode exit rules. + * + * @param + */ + public interface ChillModeExitRule { + + boolean validate(); + + void process(T report); + + void cleanup(); + } + + /** + * Class defining Chill mode exit criteria for Containers. + */ + public class ContainerChillModeRule implements + ChillModeExitRule { + + // Required cutoff % for containers with at least 1 reported replica. + private double chillModeCutoff; + // Containers read from scm db. + private Map containerMap; + private double maxContainer; + + public ContainerChillModeRule(Configuration conf, + List containers) { + chillModeCutoff = conf + .getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT, + HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT); + containerMap = new ConcurrentHashMap<>(); + if(containers != null) { + containers.forEach(c -> { + if (c != null) { + containerMap.put(c.getContainerID(), c); + } + }); + maxContainer = containers.size(); + } + } + + @Override + public boolean validate() { + if (maxContainer == 0) { + return true; + } + return getCurrentContainerThreshold() >= chillModeCutoff; + } + + @VisibleForTesting + public double getCurrentContainerThreshold() { + return (containerWithMinReplicas.doubleValue() / maxContainer); + } + + @Override + public void process(NodeRegistrationContainerReport reportsProto) { + if (maxContainer == 0) { + // No container to check. + return; + } + + reportsProto.getReport().getReportsList().forEach(c -> { + if (containerMap.containsKey(c.getContainerID())) { + if(containerMap.remove(c.getContainerID()) != null) { + containerWithMinReplicas.getAndAdd(1); + } + } + }); + + LOG.info("SCM in chill mode. {} % containers have at least one reported " + + "replica.", (containerWithMinReplicas.get() / maxContainer) * 100); + } + + @Override + public void cleanup() { + containerMap.clear(); + } + } + + @VisibleForTesting + public static Logger getLogger() { + return LOG; + } + + @VisibleForTesting + public double getCurrentContainerThreshold() { + return ((ContainerChillModeRule) exitRules.get(CONT_EXIT_RULE)) + .getCurrentContainerThreshold(); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 92158039cd..8a09dc899d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -73,6 +73,8 @@ import org.apache.hadoop.hdds.scm.HddsServerUtil; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -120,6 +122,7 @@ public class SCMDatanodeProtocolServer implements private final StorageContainerManager scm; private final InetSocketAddress datanodeRpcAddress; private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher; + private final EventPublisher eventPublisher; public SCMDatanodeProtocolServer(final OzoneConfiguration conf, StorageContainerManager scm, EventPublisher eventPublisher) @@ -129,6 +132,7 @@ public SCMDatanodeProtocolServer(final OzoneConfiguration conf, Preconditions.checkNotNull(eventPublisher, "EventPublisher cannot be null"); this.scm = scm; + this.eventPublisher = eventPublisher; final int handlerCount = conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT); @@ -197,6 +201,9 @@ public SCMRegisteredResponseProto register( == SCMRegisteredResponseProto.ErrorCode.success) { scm.getScmContainerManager().processContainerReports(datanodeDetails, containerReportsProto, true); + eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT, + new NodeRegistrationContainerReport(datanodeDetails, + containerReportsProto)); } return getRegisteredResponse(registeredCommand); } @@ -305,4 +312,16 @@ public void stop() { IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager()); } + /** + * Wrapper class for events with the datanode origin. + */ + public static class NodeRegistrationContainerReport extends + ReportFromDatanode { + + public NodeRegistrationContainerReport(DatanodeDetails datanodeDetails, + ContainerReportsProto report) { + super(datanodeDetails, report); + } + } + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index b84f399eec..f505430032 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -176,6 +176,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private final LeaseManager commandWatcherLeaseManager; private final ReplicationActivityStatus replicationStatus; + private final SCMChillModeManager scmChillModeManager; /** * Creates a new StorageContainerManager. Configuration will be updated @@ -231,7 +232,8 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { ContainerReportHandler containerReportHandler = new ContainerReportHandler(scmContainerManager, node2ContainerMap, replicationStatus); - + scmChillModeManager = new SCMChillModeManager(conf, + getScmContainerManager().getStateManager().getAllContainers()); PipelineActionEventHandler pipelineActionEventHandler = new PipelineActionEventHandler(); @@ -253,6 +255,8 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionEventHandler); eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler); + eventQueue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT, + scmChillModeManager); long watcherTimeout = conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, @@ -619,9 +623,9 @@ public void start() throws IOException { "server", getDatanodeProtocolServer().getDatanodeRpcAddress())); getDatanodeProtocolServer().start(); - replicationStatus.start(); httpServer.start(); scmBlockManager.start(); + replicationStatus.start(); replicationManager.start(); setStartTime(); } @@ -809,6 +813,15 @@ public Map getContainerReport() { return id2StatMap; } + public boolean isInChillMode() { + return scmChillModeManager.getInChillMode(); + } + + @VisibleForTesting + public double getCurrentContainerThreshold() { + return scmChillModeManager.getCurrentContainerThreshold(); + } + /** * Startup options. */ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java new file mode 100644 index 0000000000..50d1eedbbe --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer + .NodeRegistrationContainerReport; + +/** + * Stateless helper functions for Hdds tests. + */ +public final class HddsTestUtils { + + private HddsTestUtils() { + } + + /** + * Create Command Status report object. + * + * @param numOfContainers number of containers to be included in report. + * @return CommandStatusReportsProto + */ + public static NodeRegistrationContainerReport + createNodeRegistrationContainerReport(int numOfContainers) { + return new NodeRegistrationContainerReport( + TestUtils.randomDatanodeDetails(), + TestUtils.getRandomContainerReports(numOfContainers)); + } + + /** + * Create NodeRegistrationContainerReport object. + * + * @param dnContainers List of containers to be included in report + * @return NodeRegistrationContainerReport + */ + public static NodeRegistrationContainerReport + createNodeRegistrationContainerReport(List dnContainers) { + List + containers = new ArrayList<>(); + dnContainers.forEach(c -> { + containers.add(TestUtils.getRandomContainerInfo(c.getContainerID())); + }); + return new NodeRegistrationContainerReport( + TestUtils.randomDatanodeDetails(), + TestUtils.getContainerReports(containers)); + } + + /** + * Creates list of ContainerInfo. + * + * @param numContainers number of ContainerInfo to be included in list. + * @return List + */ + public static List getContainerInfo(int numContainers) { + List containerInfoList = new ArrayList<>(); + for (int i = 0; i < numContainers; i++) { + ContainerInfo.Builder builder = new ContainerInfo.Builder(); + containerInfoList.add(builder + .setContainerID(RandomUtils.nextLong()) + .build()); + } + return containerInfoList; + } + +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMChillModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMChillModeManager.java new file mode 100644 index 0000000000..e98a9ae1a3 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMChillModeManager.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.server; + +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +/** Test class for SCMChillModeManager. + */ +public class TestSCMChillModeManager { + + private static EventQueue queue; + private SCMChillModeManager scmChillModeManager; + private static Configuration config; + private List containers; + + @Rule + public Timeout timeout = new Timeout(1000 * 20); + + @BeforeClass + public static void setUp() { + queue = new EventQueue(); + config = new OzoneConfiguration(); + } + + @Test + public void testChillModeState() throws Exception { + // Test 1: test for 0 containers + testChillMode(0); + + // Test 2: test for 20 containers + testChillMode(20); + } + + @Test + public void testChillModeStateWithNullContainers() { + new SCMChillModeManager(config, null); + } + + private void testChillMode(int numContainers) throws Exception { + containers = new ArrayList<>(); + containers.addAll(HddsTestUtils.getContainerInfo(numContainers)); + scmChillModeManager = new SCMChillModeManager(config, containers); + queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT, + scmChillModeManager); + assertTrue(scmChillModeManager.getInChillMode()); + queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT, + HddsTestUtils.createNodeRegistrationContainerReport(containers)); + GenericTestUtils.waitFor(() -> { + return !scmChillModeManager.getInChillMode(); + }, 100, 1000 * 5); + } + + @Test + public void testChillModeExitRule() throws Exception { + containers = new ArrayList<>(); + containers.addAll(HddsTestUtils.getContainerInfo(25 * 4)); + scmChillModeManager = new SCMChillModeManager(config, containers); + queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT, + scmChillModeManager); + assertTrue(scmChillModeManager.getInChillMode()); + + testContainerThreshold(containers.subList(0, 25), 0.25); + assertTrue(scmChillModeManager.getInChillMode()); + testContainerThreshold(containers.subList(25, 50), 0.50); + assertTrue(scmChillModeManager.getInChillMode()); + testContainerThreshold(containers.subList(50, 75), 0.75); + assertTrue(scmChillModeManager.getInChillMode()); + testContainerThreshold(containers.subList(75, 100), 1.0); + + GenericTestUtils.waitFor(() -> { + return !scmChillModeManager.getInChillMode(); + }, 100, 1000 * 5); + } + + private void testContainerThreshold(List dnContainers, + double expectedThreshold) + throws Exception { + queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT, + HddsTestUtils.createNodeRegistrationContainerReport(dnContainers)); + GenericTestUtils.waitFor(() -> { + double threshold = scmChillModeManager.getCurrentContainerThreshold(); + return threshold == expectedThreshold; + }, 100, 2000 * 9); + } + +} \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index e11cf9bdcf..3cba83905e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -178,10 +178,25 @@ void restartHddsDatanode(DatanodeDetails dn) throws InterruptedException, void shutdownHddsDatanode(DatanodeDetails dn) throws IOException; /** - * Shutdown the MiniOzoneCluster. + * Shutdown the MiniOzoneCluster and delete the storage dirs. */ void shutdown(); + /** + * Stop the MiniOzoneCluster without any cleanup. + */ + void stop(); + + /** + * Start Scm. + */ + void startScm() throws IOException; + + /** + * Start DataNodes. + */ + void startHddsDatanodes(); + /** * Builder class for MiniOzoneCluster. */ @@ -209,6 +224,7 @@ abstract class Builder { protected int numOfOmHandlers = 20; protected int numOfScmHandlers = 20; protected int numOfDatanodes = 1; + protected boolean startDataNodes = true; protected Builder(OzoneConfiguration conf) { this.conf = conf; @@ -229,6 +245,11 @@ public Builder setClusterId(String id) { return this; } + public Builder setStartDataNodes(boolean startDataNodes) { + this.startDataNodes = startDataNodes; + return this; + } + /** * Sets the SCM id. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 7b9bb0e808..c2169a351a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.rest.OzoneException; +import org.apache.hadoop.ozone.common.Storage.StorageState; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.hdds.scm.server.SCMStorage; @@ -276,32 +277,54 @@ public void shutdown() { File baseDir = new File(GenericTestUtils.getTempPath( MiniOzoneClusterImpl.class.getSimpleName() + "-" + scm.getClientProtocolServer().getScmInfo().getClusterId())); + stop(); FileUtils.deleteDirectory(baseDir); - - if (ozoneManager != null) { - LOG.info("Shutting down the OzoneManager"); - ozoneManager.stop(); - ozoneManager.join(); - } - - if (scm != null) { - LOG.info("Shutting down the StorageContainerManager"); - scm.stop(); - scm.join(); - } - - if (!hddsDatanodes.isEmpty()) { - LOG.info("Shutting down the HddsDatanodes"); - for (HddsDatanodeService hddsDatanode : hddsDatanodes) { - hddsDatanode.stop(); - hddsDatanode.join(); - } - } } catch (IOException e) { LOG.error("Exception while shutting down the cluster.", e); } } + @Override + public void stop() { + LOG.info("Stopping the Mini Ozone Cluster"); + if (ozoneManager != null) { + LOG.info("Stopping the OzoneManager"); + ozoneManager.stop(); + ozoneManager.join(); + } + + if (scm != null) { + LOG.info("Stopping the StorageContainerManager"); + scm.stop(); + scm.join(); + } + + if (!hddsDatanodes.isEmpty()) { + LOG.info("Shutting the HddsDatanodes"); + for (HddsDatanodeService hddsDatanode : hddsDatanodes) { + hddsDatanode.stop(); + hddsDatanode.join(); + } + } + } + + /** + * Start Scm. + */ + @Override + public void startScm() throws IOException { + scm.start(); + } + + /** + * Start DataNodes. + */ + @Override + public void startHddsDatanodes() { + hddsDatanodes.forEach((datanode) -> datanode.start(null)); + } + + /** * Builder for configuring the MiniOzoneCluster to run. */ @@ -324,9 +347,13 @@ public MiniOzoneCluster build() throws IOException { scm.start(); OzoneManager om = createOM(); om.start(); - List hddsDatanodes = createHddsDatanodes(scm); - hddsDatanodes.forEach((datanode) -> datanode.start(null)); - return new MiniOzoneClusterImpl(conf, om, scm, hddsDatanodes); + final List hddsDatanodes = createHddsDatanodes(scm); + MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm, + hddsDatanodes); + if (startDataNodes) { + cluster.startHddsDatanodes(); + } + return cluster; } /** @@ -352,13 +379,30 @@ private void initializeConfiguration() throws IOException { private StorageContainerManager createSCM() throws IOException { configureSCM(); SCMStorage scmStore = new SCMStorage(conf); + initializeScmStorage(scmStore); + return StorageContainerManager.createSCM(null, conf); + } + + private void initializeScmStorage(SCMStorage scmStore) throws IOException { + if (scmStore.getState() == StorageState.INITIALIZED) { + return; + } scmStore.setClusterId(clusterId); if (!scmId.isPresent()) { scmId = Optional.of(UUID.randomUUID().toString()); } scmStore.setScmId(scmId.get()); scmStore.initialize(); - return StorageContainerManager.createSCM(null, conf); + } + + private void initializeOmStorage(OMStorage omStorage) throws IOException{ + if (omStorage.getState() == StorageState.INITIALIZED) { + return; + } + omStorage.setClusterId(clusterId); + omStorage.setScmId(scmId.get()); + omStorage.setOmId(omId.orElse(UUID.randomUUID().toString())); + omStorage.initialize(); } /** @@ -371,10 +415,7 @@ private StorageContainerManager createSCM() throws IOException { private OzoneManager createOM() throws IOException { configureOM(); OMStorage omStore = new OMStorage(conf); - omStore.setClusterId(clusterId); - omStore.setScmId(scmId.get()); - omStore.setOmId(omId.orElse(UUID.randomUUID().toString())); - omStore.initialize(); + initializeOmStorage(omStore); return OzoneManager.createOm(null, conf); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index 8762c0e9eb..1364d77e9f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -17,59 +17,63 @@ */ package org.apache.hadoop.ozone; -import java.io.IOException; - -import org.apache.commons.lang3.RandomUtils; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; -import org.apache.hadoop.hdds.scm.server.SCMStorage; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.scm.server.StorageContainerManager; -import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption; -import org.apache.hadoop.hdds.scm.block.DeletedBlockLog; -import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.ScmInfo; -import org.junit.Rule; -import org.junit.Assert; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; -import java.util.HashSet; -import java.util.Set; -import java.util.Map; -import java.util.Collections; -import java.util.UUID; -import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import com.google.common.util.concurrent.AtomicDouble; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; - +import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLog; +import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService; +import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.server.SCMChillModeManager; +import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; +import org.apache.hadoop.hdds.scm.server.SCMStorage; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; import org.mockito.Mockito; -import org.apache.hadoop.test.GenericTestUtils; - -import static org.apache.hadoop.hdds - .HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; -import static org.junit.Assert.fail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test class that exercises the StorageContainerManager. @@ -78,6 +82,8 @@ public class TestStorageContainerManager { private static XceiverClientManager xceiverClientManager = new XceiverClientManager( new OzoneConfiguration()); + private static final Logger LOG = LoggerFactory + .getLogger(TestStorageContainerManager.class); /** * Set the timeout for every test. */ @@ -457,4 +463,92 @@ public void testScmInfo() throws Exception { Assert.assertEquals(clusterId, scmInfo.getClusterId()); Assert.assertEquals(scmId, scmInfo.getScmId()); } + + @Test + public void testSCMChillMode() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf) + .setHbInterval(1000) + .setNumDatanodes(3) + .setStartDataNodes(false) + .setHbProcessorInterval(500); + MiniOzoneClusterImpl cluster = (MiniOzoneClusterImpl) builder.build(); + // Test1: Test chill mode when there are no containers in system. + assertTrue(cluster.getStorageContainerManager().isInChillMode()); + cluster.startHddsDatanodes(); + cluster.waitForClusterToBeReady(); + assertFalse(cluster.getStorageContainerManager().isInChillMode()); + + // Test2: Test chill mode when containers are there in system. + // Create {numKeys} random names keys. + TestStorageContainerManagerHelper helper = + new TestStorageContainerManagerHelper(cluster, conf); + Map keyLocations = helper.createKeys(100*2, 4096); + final List containers = cluster.getStorageContainerManager() + .getScmContainerManager().getStateManager().getAllContainers(); + GenericTestUtils.waitFor(() -> { + return containers.size() > 10; + }, 100, 1000); + + // Removing some container to keep them open. + containers.remove(0); + containers.remove(1); + containers.remove(2); + containers.remove(3); + + // Close remaining containers + ContainerMapping mapping = (ContainerMapping) cluster + .getStorageContainerManager().getScmContainerManager(); + containers.forEach(c -> { + try { + mapping.updateContainerState(c.getContainerID(), + HddsProtos.LifeCycleEvent.FINALIZE); + mapping.updateContainerState(c.getContainerID(), + LifeCycleEvent.CLOSE); + } catch (IOException e) { + LOG.info("Failed to change state of open containers.", e); + } + }); + cluster.stop(); + + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer + .captureLogs(SCMChillModeManager.getLogger()); + logCapturer.clearOutput(); + AtomicReference miniCluster = new AtomicReference<>(); + new Thread(() -> { + try { + miniCluster.set(builder.setStartDataNodes(false).build()); + } catch (IOException e) { + fail("failed"); + } + }).start(); + + StorageContainerManager scm; + GenericTestUtils.waitFor(() -> { + return miniCluster.get() != null; + }, 100, 1000 * 3); + + scm = miniCluster.get().getStorageContainerManager(); + assertTrue(scm.isInChillMode()); + assertFalse(logCapturer.getOutput().contains("SCM exiting chill mode.")); + assertTrue(scm.getCurrentContainerThreshold() == 0); + AtomicDouble curThreshold = new AtomicDouble(); + AtomicDouble lastReportedThreshold = new AtomicDouble(); + for(HddsDatanodeService dn:miniCluster.get().getHddsDatanodes()){ + dn.start(null); + GenericTestUtils.waitFor(() -> { + curThreshold.set(scm.getCurrentContainerThreshold()); + return curThreshold.get() > lastReportedThreshold.get(); + }, 100, 1000 * 5); + lastReportedThreshold.set(curThreshold.get()); + } + double chillModeCutoff = conf + .getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT, + HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT); + assertTrue(scm.getCurrentContainerThreshold() >= chillModeCutoff); + assertTrue(logCapturer.getOutput().contains("SCM exiting chill mode.")); + assertFalse(scm.isInChillMode()); + cluster.shutdown(); + } + }