diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 89349761bf..cec688c1a8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -61,4 +61,8 @@ public Pipeline create(ReplicationType type, ReplicationFactor factor,
       List<DatanodeDetails> nodes) {
     return providers.get(type).create(factor, nodes);
   }
+
+  public void shutdown() {
+    providers.values().forEach(provider -> provider.shutdown());
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index bb16533751..a0ce216267 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -33,4 +33,5 @@ public interface PipelineProvider {
 
   Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
 
+  void shutdown();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index df21420be1..d3b02e6253 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -24,17 +24,39 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -42,10 +64,28 @@
  */
 public class RatisPipelineProvider implements PipelineProvider {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RatisPipelineProvider.class);
+
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
+
+  // Set parallelism to 3, as Ratis currently creates 1- and 3-node pipelines.
+  private final int parallelismForPool = 3;
+
+  private final ForkJoinPool.ForkJoinWorkerThreadFactory factory =
+      (pool -> {
+        final ForkJoinWorkerThread worker = ForkJoinPool.
+            defaultForkJoinWorkerThreadFactory.newThread(pool);
+        worker.setName("RATISCREATEPIPELINE" + worker.getPoolIndex());
+        return worker;
+      });
+
+  private final ForkJoinPool forkJoinPool = new ForkJoinPool(
+      parallelismForPool, factory, null, false);
+
+
   RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager,
       Configuration conf) {
     this.nodeManager = nodeManager;
@@ -53,6 +93,7 @@ public class RatisPipelineProvider implements PipelineProvider {
     this.conf = conf;
   }
 
+
   /**
    * Create pluggable container placement policy implementation instance.
    *
@@ -133,7 +174,81 @@ public Pipeline create(ReplicationFactor factor,
         .build();
   }
 
+
+  @Override
+  public void shutdown() {
+    forkJoinPool.shutdownNow();
+    try {
+      forkJoinPool.awaitTermination(60, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Unexpected exception occurred during shutdown of "
+          + "RatisPipelineProvider", e);
+    }
+  }
+
   protected void initializePipeline(Pipeline pipeline) throws IOException {
-    RatisPipelineUtils.createPipeline(pipeline, conf);
+    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
+    LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
+    callRatisRpc(pipeline.getNodes(),
+        (raftClient, peer) -> {
+          RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
+          if (reply == null || !reply.isSuccess()) {
+            String msg = "Pipeline initialization failed for pipeline:"
+                + pipeline.getId() + " node:" + peer.getId();
+            LOG.error(msg);
+            throw new IOException(msg);
+          }
+        });
+  }
+
+  private void callRatisRpc(List<DatanodeDetails> datanodes,
+      CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc)
+      throws IOException {
+    if (datanodes.isEmpty()) {
+      return;
+    }
+
+    final String rpcType = conf
+        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
+    final List<IOException> exceptions =
+        Collections.synchronizedList(new ArrayList<>());
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(conf);
+    final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
+        SecurityConfig(conf));
+    final TimeDuration requestTimeout =
+        RatisHelper.getClientRequestTimeout(conf);
+    try {
+      forkJoinPool.submit(() -> {
+        datanodes.parallelStream().forEach(d -> {
+          final RaftPeer p = RatisHelper.toRaftPeer(d);
+          try (RaftClient client = RatisHelper
+              .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
+                  retryPolicy, maxOutstandingRequests, tlsConfig,
+                  requestTimeout)) {
+            rpc.accept(client, p);
+          } catch (IOException ioe) {
+            String errMsg =
+                "Failed to invoke Ratis rpc " + rpc + " for " + d.getUuid();
+            LOG.error(errMsg, ioe);
+            exceptions.add(new IOException(errMsg, ioe));
+          }
+        });
+      }).get();
+    } catch (ExecutionException | RejectedExecutionException ex) {
+      LOG.error(ex.getClass().getName() + " exception occurred during "
+          + "createPipeline", ex);
+      throw new IOException(ex.getClass().getName() + " exception occurred "
+          + "during createPipeline", ex);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupt exception occurred during "
+          + "createPipeline", ex);
+    }
+    if (!exceptions.isEmpty()) {
+      throw MultipleIOException.createIOException(exceptions);
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 0af34fb856..6d2f08b9ca 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -17,66 +17,37 @@
  */
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.io.MultipleIOException;
 import org.apache.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.function.CheckedBiConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 
 /**
  * Utility class for Ratis pipelines. Contains methods to create and destroy
  * ratis pipelines.
  */
-final class RatisPipelineUtils {
+public final class RatisPipelineUtils {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(RatisPipelineUtils.class);
 
   private RatisPipelineUtils() {
   }
-
-  /**
-   * Sends ratis command to create pipeline on all the datanodes.
-   *
-   * @param pipeline - Pipeline to be created
-   * @param ozoneConf - Ozone Confinuration
-   * @throws IOException if creation fails
-   */
-  public static void createPipeline(Pipeline pipeline, Configuration ozoneConf)
-      throws IOException {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
-    callRatisRpc(pipeline.getNodes(), ozoneConf,
-        (raftClient, peer) -> {
-          RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
-          if (reply == null || !reply.isSuccess()) {
-            String msg = "Pipeline initialization failed for pipeline:"
-                + pipeline.getId() + " node:" + peer.getId();
-            LOG.error(msg);
-            throw new IOException(msg);
-          }
-        });
-  }
-
   /**
    * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
    * the datanodes.
@@ -125,42 +96,4 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
     client
         .groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId());
   }
-
-  private static void callRatisRpc(List<DatanodeDetails> datanodes,
-      Configuration ozoneConf,
-      CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc)
-      throws IOException {
-    if (datanodes.isEmpty()) {
-      return;
-    }
-
-    final String rpcType = ozoneConf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
-    final List<IOException> exceptions =
-        Collections.synchronizedList(new ArrayList<>());
-    final int maxOutstandingRequests =
-        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
-    final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
-        SecurityConfig(ozoneConf));
-    final TimeDuration requestTimeout =
-        RatisHelper.getClientRequestTimeout(ozoneConf);
-    datanodes.parallelStream().forEach(d -> {
-      final RaftPeer p = RatisHelper.toRaftPeer(d);
-      try (RaftClient client = RatisHelper
-          .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-              retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) {
-        rpc.accept(client, p);
-      } catch (IOException ioe) {
-        String errMsg =
-            "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
-        LOG.error(errMsg, ioe);
-        exceptions.add(new IOException(errMsg, ioe));
-      }
-    });
-    if (!exceptions.isEmpty()) {
-      throw MultipleIOException.createIOException(exceptions);
-    }
-  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index c72a52886c..bce396b6a5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -87,7 +87,8 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
     this.lock = new ReentrantReadWriteLock();
     this.conf = conf;
     this.stateManager = new PipelineStateManager(conf);
-    this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, conf);
+    this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
+        conf);
     // TODO: See if thread priority needs to be set for these threads
     scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
     this.backgroundPipelineCreator =
@@ -419,5 +420,7 @@ public void close() throws IOException {
     if(metrics != null) {
       metrics.unRegister();
     }
+    // shutdown pipeline provider.
+    pipelineFactory.shutdown();
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 3e42df3326..ab98dfa3ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -72,4 +72,9 @@ public Pipeline create(ReplicationFactor factor,
         .setNodes(nodes)
         .build();
   }
+
+  @Override
+  public void shutdown() {
+    // Do nothing.
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 2282804691..32784a31de 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -37,4 +37,9 @@ public MockRatisPipelineProvider(NodeManager nodeManager,
   protected void initializePipeline(Pipeline pipeline) throws IOException {
     // do nothing as the datanodes do not exists
   }
+
+  @Override
+  public void shutdown() {
+    // Do nothing.
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
similarity index 92%
rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java
rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index b653e7a2b9..9fd8aae0f0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -21,7 +21,6 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -40,7 +39,7 @@
 /**
  * Tests for RatisPipelineUtils.
  */
-public class TestRatisPipelineUtils {
+public class TestRatisPipelineCreateAndDestroy {
 
   private static MiniOzoneCluster cluster;
   private OzoneConfiguration conf = new OzoneConfiguration();
@@ -98,11 +97,13 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
 
     // try creating another pipeline now
     try {
-      RatisPipelineUtils.createPipeline(pipelines.get(0), conf);
+      pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.THREE);
       Assert.fail("pipeline creation should fail after shutting down pipeline");
     } catch (IOException ioe) {
-      // in case the pipeline creation fails, MultipleIOException is thrown
-      Assert.assertTrue(ioe instanceof MultipleIOException);
+      // Since all datanodes are shut down, they move to the stale state and
+      // there are no longer sufficient datanodes to create the pipeline.
+      Assert.assertTrue(ioe instanceof InsufficientDatanodesException);
     }
 
     // make sure pipelines is destroyed
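A note on the ForkJoinPool pattern introduced in callRatisRpc above: a parallel stream runs its tasks on the ForkJoinPool whose thread invokes the terminal operation, which by default is the JVM-wide common pool that SCM can neither bound nor shut down. Submitting the forEach through the provider's own pool pins the work there, which is what lets RatisPipelineProvider#shutdown() cancel in-flight pipeline creation. Below is a minimal, self-contained sketch of that pattern; the class and variable names are illustrative only and are not part of this patch.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

public final class ParallelStreamOnDedicatedPool {
  public static void main(String[] args)
      throws InterruptedException, ExecutionException {
    // Bounded pool, analogous to forkJoinPool in RatisPipelineProvider.
    final ForkJoinPool pool = new ForkJoinPool(3);
    final List<String> datanodes = Arrays.asList("dn1", "dn2", "dn3");
    try {
      // The parallel stream executes on the pool that invokes the terminal
      // operation, so submit(...) pins the per-datanode work to 'pool'.
      pool.submit(() ->
          datanodes.parallelStream().forEach(d ->
              System.out.println(
                  Thread.currentThread().getName() + " handles " + d))
      ).get(); // propagates task failures, like the .get() in callRatisRpc
    } finally {
      pool.shutdownNow(); // mirrors RatisPipelineProvider#shutdown()
    }
  }
}

This relies on long-standing (though not formally specified) ForkJoinPool behavior; the patch bounds the pool's parallelism at 3 to match the one- and three-node pipelines Ratis currently creates.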