diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java
new file mode 100644
index 0000000000..1171dbff64
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import org.apache.ratis.util.function.CheckedRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class encapsulates a ScheduledExecutorService.
+ */
+public class Scheduler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(Scheduler.class);
+
+  private ScheduledExecutorService scheduler;
+
+  private volatile boolean isClosed;
+
+  private String threadName;
+
+  /**
+   * Creates a ScheduledExecutorService based on input arguments.
+   * @param threadName - thread name
+   * @param isDaemon - if true, the threads in the scheduler are started as
+   *                 daemon threads
+   * @param numCoreThreads - number of core threads to maintain in the
+   *                       scheduler
+   */
+  public Scheduler(String threadName, boolean isDaemon, int numCoreThreads) {
+    scheduler = Executors.newScheduledThreadPool(numCoreThreads, r -> {
+      Thread t = new Thread(r);
+      t.setName(threadName);
+      t.setDaemon(isDaemon);
+      return t;
+    });
+    this.threadName = threadName;
+    isClosed = false;
+  }
+
+  public void schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
+    scheduler.schedule(runnable, delay, timeUnit);
+  }
+
+  public void schedule(CheckedRunnable runnable, long delay,
+      TimeUnit timeUnit, Logger logger, String errMsg) {
+    scheduler.schedule(() -> {
+      try {
+        runnable.run();
+      } catch (Throwable throwable) {
+        logger.error(errMsg, throwable);
+      }
+    }, delay, timeUnit);
+  }
+
+  public void scheduleWithFixedDelay(Runnable runnable, long initialDelay,
+      long fixedDelay, TimeUnit timeUnit) {
+    scheduler
+        .scheduleWithFixedDelay(runnable, initialDelay, fixedDelay, timeUnit);
+  }
+
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Closes the scheduler for further task submission. Any pending tasks that
+   * have not started yet are cancelled; for tasks that are already executing,
+   * the scheduler waits up to 60 seconds for completion.
+   */
+  public void close() {
+    isClosed = true;
+    scheduler.shutdownNow();
+    try {
+      scheduler.awaitTermination(60, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status, and pass e as the throwable argument
+      // (no "{}" placeholder) so the full stack trace is logged.
+      Thread.currentThread().interrupt();
+      LOG.info(threadName + " interrupted while waiting for task completion",
+          e);
+    }
+    scheduler = null;
+  }
+}
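For context in review, here is a minimal usage sketch of the Scheduler API added above. The constructor and method signatures are the ones from the patch; the SchedulerDemo class, the task bodies, and the chosen delays are illustrative assumptions, not part of the change.

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.utils.Scheduler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /** Illustrative only; not part of this patch. */
    public class SchedulerDemo {
      private static final Logger LOG =
          LoggerFactory.getLogger(SchedulerDemo.class);

      public static void main(String[] args) throws InterruptedException {
        Scheduler scheduler = new Scheduler("SchedulerDemoThread", false, 1);

        // Plain Runnable overload: fires once after the delay; exceptions
        // are not handled by the Scheduler.
        scheduler.schedule(() -> LOG.info("one-shot task"), 100,
            TimeUnit.MILLISECONDS);

        // CheckedRunnable overload: any Throwable thrown by the task is
        // caught and logged against the supplied logger and message.
        scheduler.schedule(() -> {
          throw new IOException("simulated failure");
        }, 200, TimeUnit.MILLISECONDS, LOG, "demo task failed");

        // Recurring task with a fixed delay between runs.
        scheduler.scheduleWithFixedDelay(() -> LOG.info("tick"), 300, 300,
            TimeUnit.MILLISECONDS);

        Thread.sleep(1000);
        // Cancels pending tasks; waits up to 60 seconds for running ones.
        scheduler.close();
      }
    }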
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index c06a3bd3a8..6774b7f03f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -54,4 +54,10 @@ public Pipeline create(ReplicationType type, ReplicationFactor factor,
       List<DatanodeDetails> nodes) {
     return providers.get(type).create(factor, nodes);
   }
+
+  public void close() {
+    for (PipelineProvider p : providers.values()) {
+      p.close();
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index 84b63757ea..610e78a690 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -32,4 +32,6 @@ public interface PipelineProvider {
   Pipeline create(ReplicationFactor factor) throws IOException;
 
   Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
+
+  void close();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index ad9d219d81..ea7d370096 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
+import org.apache.hadoop.utils.Scheduler;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -45,12 +46,18 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
+  private static Scheduler scheduler;
 
   RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager, Configuration conf) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
     this.conf = conf;
+    scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
+  }
+
+  static Scheduler getScheduler() {
+    return scheduler;
   }
 
   /**
@@ -135,4 +142,9 @@ public Pipeline create(ReplicationFactor factor,
   private void initializePipeline(Pipeline pipeline) throws IOException {
     RatisPipelineUtils.createPipeline(pipeline, conf);
   }
+
+  @Override
+  public void close() {
+    scheduler.close();
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 3029f70aae..201c034665 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -34,8 +34,6 @@
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.util.function.CheckedBiConsumer;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +50,6 @@
  */
 public final class RatisPipelineUtils {
 
-  private static TimeoutScheduler timeoutScheduler =
-      TimeoutScheduler.newInstance(1);
   private static AtomicBoolean isPipelineCreatorRunning =
       new AtomicBoolean(false);
 
@@ -127,12 +123,11 @@ public static void finalizeAndDestroyPipeline(PipelineManager pipelineManager,
         .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
             ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
             TimeUnit.MILLISECONDS);
-    TimeDuration timeoutDuration = TimeDuration
-        .valueOf(pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
-    timeoutScheduler.onTimeout(timeoutDuration,
-        () -> destroyPipeline(pipelineManager, pipeline, ozoneConf), LOG,
-        () -> String.format("Destroy pipeline failed for pipeline:%s with %s",
-            pipeline.getId(), group));
+    RatisPipelineProvider.getScheduler()
+        .schedule(() -> destroyPipeline(pipelineManager, pipeline, ozoneConf),
+            pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG,
+            String.format("Destroy pipeline failed for pipeline:%s with %s",
+                pipeline.getId(), group));
     } else {
       destroyPipeline(pipelineManager, pipeline, ozoneConf);
     }
@@ -213,22 +208,12 @@ public static void scheduleFixedIntervalPipelineCreator(
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
     // TODO: #CLUTIL We can start the job asap
-    TimeDuration timeDuration =
-        TimeDuration.valueOf(intervalInMillis, TimeUnit.MILLISECONDS);
-    timeoutScheduler.onTimeout(timeDuration,
-        () -> fixedIntervalPipelineCreator(pipelineManager, conf,
-            timeDuration), LOG,
-        () -> "FixedIntervalPipelineCreatorJob failed.");
-  }
-
-  private static void fixedIntervalPipelineCreator(
-      PipelineManager pipelineManager, Configuration conf,
-      TimeDuration timeDuration) {
-    timeoutScheduler.onTimeout(timeDuration,
-        () -> fixedIntervalPipelineCreator(pipelineManager, conf,
-            timeDuration), LOG,
-        () -> "FixedIntervalPipelineCreatorJob failed.");
-    triggerPipelineCreation(pipelineManager, conf, 0);
+    RatisPipelineProvider.getScheduler().scheduleWithFixedDelay(() -> {
+      if (!isPipelineCreatorRunning.compareAndSet(false, true)) {
+        return;
+      }
+      createPipelines(pipelineManager, conf);
+    }, intervalInMillis, intervalInMillis, TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -246,10 +231,9 @@ public static void triggerPipelineCreation(PipelineManager pipelineManager,
     if (!isPipelineCreatorRunning.compareAndSet(false, true)) {
       return;
     }
-    timeoutScheduler
-        .onTimeout(TimeDuration.valueOf(afterMillis, TimeUnit.MILLISECONDS),
-            () -> createPipelines(pipelineManager, conf), LOG,
-            () -> "PipelineCreation failed.");
+    RatisPipelineProvider.getScheduler()
+        .schedule(() -> createPipelines(pipelineManager, conf), afterMillis,
+            TimeUnit.MILLISECONDS);
   }
 
   private static void createPipelines(PipelineManager pipelineManager,
@@ -261,13 +245,18 @@ private static void createPipelines(PipelineManager pipelineManager,
 
     for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
         .values()) {
-      try {
-        pipelineManager.createPipeline(type, factor);
-      } catch (IOException ioe) {
-        break;
-      } catch (Throwable t) {
-        LOG.error("Error while creating pipelines {}", t);
-        break;
+      while (true) {
+        try {
+          if (RatisPipelineProvider.getScheduler().isClosed()) {
+            break;
+          }
+          pipelineManager.createPipeline(type, factor);
+        } catch (IOException ioe) {
+          break;
+        } catch (Throwable t) {
+          LOG.error("Error while creating pipelines", t);
+          break;
+        }
       }
     }
     isPipelineCreatorRunning.set(false);
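Both the fixed-interval job and triggerPipelineCreation above rely on the isPipelineCreatorRunning AtomicBoolean so that at most one creation pass runs at a time; createPipelines clears the flag when it finishes. A standalone sketch of that single-flight pattern follows (the class and task names are hypothetical; note the patch clears its flag at the end of createPipelines rather than in a finally block, which works there because every loop iteration catches Throwable):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.hadoop.utils.Scheduler;

    /** Illustrative only; not part of this patch. */
    public class SingleFlightJobDemo {
      private static final AtomicBoolean RUNNING = new AtomicBoolean(false);

      public static void main(String[] args) throws InterruptedException {
        Scheduler scheduler = new Scheduler("SingleFlightDemoThread", true, 1);
        scheduler.scheduleWithFixedDelay(() -> {
          // Skip this tick if a previous run is still in flight.
          if (!RUNNING.compareAndSet(false, true)) {
            return;
          }
          try {
            System.out.println("creating pipelines...");
          } finally {
            // Release the guard unconditionally so one failed run cannot
            // wedge every later tick.
            RUNNING.set(false);
          }
        }, 100, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(500);
        scheduler.close();
      }
    }

On a single-threaded scheduler, scheduleWithFixedDelay runs cannot overlap by contract; the guard matters because triggerPipelineCreation can also start a pass concurrently from another thread.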
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 176b3747d2..193d98f921 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -267,6 +267,10 @@ public void removePipeline(PipelineID pipelineID) throws IOException {
 
   @Override
   public void close() throws IOException {
+    if (pipelineFactory != null) {
+      pipelineFactory.close();
+    }
+
     if (pipelineStore != null) {
       pipelineStore.close();
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 3e42df3326..b92f17e552 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -72,4 +72,9 @@ public Pipeline create(ReplicationFactor factor,
         .setNodes(nodes)
         .build();
   }
+
+  @Override
+  public void close() {
+    // Nothing to do here.
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java
index abc30cdc4c..e56b888842 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.List;
@@ -51,6 +52,11 @@ public void init(int numDatanodes) throws Exception {
     pipelineManager = scm.getPipelineManager();
   }
 
+  @After
+  public void cleanup() {
+    // Guard against init() having failed before the cluster was created.
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
   @Test(timeout = 30000)
   public void testAutomaticPipelineCreationOnPipelineDestroy()
       throws Exception {
@@ -90,6 +96,6 @@ private void waitForPipelines(int numPipelines)
     GenericTestUtils.waitFor(() -> pipelineManager
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
-        .size() == numPipelines, 100, 10000);
+        .size() == numPipelines, 100, 20000);
   }
 }
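The last hunk doubles the pipeline wait from 10 to 20 seconds while keeping the 100 ms poll interval. For readers unfamiliar with the helper, a simplified stand-in for the GenericTestUtils.waitFor polling contract looks roughly like this (an illustrative sketch, not Hadoop's implementation):

    import java.util.concurrent.TimeoutException;
    import java.util.function.Supplier;

    /** Illustrative only; not Hadoop's GenericTestUtils. */
    public final class WaitForSketch {

      /** Polls check every intervalMillis until true or timeoutMillis elapses. */
      public static void waitFor(Supplier<Boolean> check, long intervalMillis,
          long timeoutMillis) throws TimeoutException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!check.get()) {
          if (System.currentTimeMillis() > deadline) {
            throw new TimeoutException(
                "Condition not met within " + timeoutMillis + " ms");
          }
          Thread.sleep(intervalMillis);
        }
      }

      public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Becomes true after roughly 300 ms; mirrors waitFor(..., 100, 20000).
        waitFor(() -> System.currentTimeMillis() - start > 300, 100, 20000);
        System.out.println("condition met");
      }
    }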