diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/RatisManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/RatisManager.java new file mode 100644 index 0000000000..ab168c727e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/RatisManager.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.ratis; + + +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; + +import java.io.IOException; +import java.util.List; + +/** + * Manage Ratis clusters. + */ +public interface RatisManager { + /** + * Create a new Ratis cluster with the given clusterId and datanodes. + */ + void createRatisCluster(String clusterId, List datanodes) + throws IOException; + + /** + * Close the Ratis cluster with the given clusterId. + */ + void closeRatisCluster(String clusterId) throws IOException; + + /** + * @return the datanode list of the Ratis cluster with the given clusterId. + */ + List getDatanodes(String clusterId) throws IOException; + + /** + * Update the datanode list of the Ratis cluster with the given clusterId. + */ + void updateDatanodes(String clusterId, List newDatanodes) + throws IOException; + + static RatisManager newRatisManager(OzoneConfiguration conf) { + final String rpc = conf.get( + OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + return new RatisManagerImpl(rpc); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/RatisManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/RatisManagerImpl.java new file mode 100644 index 0000000000..c3560b6066 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/RatisManagerImpl.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.ratis; + + +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.util.CheckedRunnable; +import org.apache.ratis.util.CheckedSupplier; +import org.apache.ratis.util.LifeCycle; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * Implementation of {@link RatisManager}. + */ +public class RatisManagerImpl implements RatisManager { + static final RaftPeer[] EMPTY_RARTPEER_ARRAY = {}; + + static final class RatisCluster { + private final String clusterId; + private final LifeCycle state; + private List datanodes; + + private RatisCluster(String clusterId, List datanodes) { + this.clusterId = clusterId; + this.state = new LifeCycle(toString()); + this.datanodes = Collections.unmodifiableList(new ArrayList<>(datanodes)); + } + + synchronized List getDatanodes() { + return datanodes; + } + + synchronized void setDatanodes( + CheckedSupplier, IOException> update) + throws IOException { + state.assertCurrentState(LifeCycle.State.RUNNING); + datanodes = Collections.unmodifiableList(update.get()); + } + + synchronized void init(CheckedRunnable init) + throws IOException { + state.startAndTransition(() -> init.run()); + } + + synchronized void close(CheckedRunnable close) + throws IOException { + state.checkStateAndClose(() -> close.run()); + } + + @Override + public String toString() { + return getClass().getSimpleName() + ":" + clusterId; + } + } + + static final class RatisInfo { + private final RaftPeer peer; + + private RatisInfo(DatanodeID datanode) { + this.peer = RatisHelper.toRaftPeer(datanode); + } + + RaftPeer getPeer() { + return peer; + } + } + + private final RpcType rpcType; + private final Map clusters = new ConcurrentHashMap<>(); + private final Map infos = new ConcurrentHashMap<>(); + + RatisManagerImpl(String rpc) { + rpcType = SupportedRpcType.valueOfIgnoreCase(rpc); + } + + private RaftPeer getRaftPeer(DatanodeID datanode) { + return infos.computeIfAbsent(datanode, RatisInfo::new).getPeer(); + } + + @Override + public void createRatisCluster(String clusterId, List datanodes) + throws IOException { + final RatisCluster cluster = new RatisCluster(clusterId, datanodes); + final RatisCluster returned = clusters.putIfAbsent(clusterId, cluster); + if (returned != null) { + throw new IOException("Cluster " + clusterId + " already exists."); + } + + final RaftPeer[] newPeers = datanodes.stream().map(this::getRaftPeer) + .toArray(RaftPeer[]::new); + cluster.init(() -> reinitialize(datanodes, newPeers)); + } + + private void reinitialize(List datanodes, RaftPeer[] newPeers) + throws IOException { + if (datanodes.isEmpty()) { + return; + } + + IOException exception = null; + for (DatanodeID d : datanodes) { + try { + reinitialize(d, newPeers); + } catch (IOException ioe) { + if (exception == null) { + exception = new IOException( + "Failed to reinitialize some of the RaftPeer(s)", ioe); + } else { + exception.addSuppressed(ioe); + } + } + } + if (exception != null) { + throw exception; + } + } + + private void reinitialize(DatanodeID datanode, RaftPeer[] newPeers) + throws IOException { + final RaftPeer p = getRaftPeer(datanode); + try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) { + client.reinitialize(newPeers, p.getId()); + } catch (IOException ioe) { + throw new IOException("Failed to reinitialize RaftPeer " + p + + "(datanode=" + datanode + ")", ioe); + } + } + + @Override + public void closeRatisCluster(String clusterId) throws IOException { + final RatisCluster c = clusters.get(clusterId); + if (c == null) { + throw new IOException("Cluster " + clusterId + " not found."); + } + c.close(() -> reinitialize(c.getDatanodes(), EMPTY_RARTPEER_ARRAY)); + } + + @Override + public List getDatanodes(String clusterId) throws IOException { + return clusters.get(clusterId).getDatanodes(); + } + + @Override + public void updateDatanodes(String clusterId, List newDNs) + throws IOException { + final RatisCluster c = clusters.get(clusterId); + c.setDatanodes(() -> { + final List oldDNs = c.getDatanodes(); + final RaftPeer[] newPeers = newDNs.stream().map(this::getRaftPeer) + .toArray(RaftPeer[]::new); + try (RaftClient client = newRaftClient(oldDNs)) { + client.setConfiguration(newPeers); + } + + final List notInOld = newDNs.stream().filter(oldDNs::contains) + .collect(Collectors.toList()); + reinitialize(notInOld, newPeers); + return newDNs; + }); + } + + private RaftClient newRaftClient(List datanodes) + throws IOException { + final List peers = datanodes.stream().map(this::getRaftPeer) + .collect(Collectors.toList()); + return RatisHelper.newRaftClient(rpcType, peers.get(0).getId(), peers); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/package-info.java new file mode 100644 index 0000000000..27fd32b851 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.scm.ratis; + +/** + * This package contains classes related to Apache Ratis for SCM. + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java index 3adb881da1..f77e731d45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java @@ -18,23 +18,23 @@ package org.apache.hadoop.ozone.container.ozoneimpl; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.RatisTestHelper; import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.scm.ratis.RatisManager; import org.apache.hadoop.ozone.web.utils.OzoneUtils; -import org.apache.ratis.RatisHelper; import org.apache.hadoop.scm.XceiverClientRatis; import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.util.CheckedBiConsumer; import org.apache.ratis.util.CollectionUtils; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -93,25 +93,29 @@ public class TestOzoneContainerRatis { .setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL) .numDataNodes(numNodes) .build(); - cluster.waitOzoneReady(); - - final String containerName = OzoneUtils.getRequestID(); - final List datanodes = cluster.getDataNodes(); - final Pipeline pipeline = ContainerTestHelper.createPipeline(containerName, - CollectionUtils.as(datanodes, DataNode::getDatanodeId)); - - LOG.info("pipeline=" + pipeline); - // Create Ratis cluster - final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline); - for(RaftPeer p : peers) { - final RaftClient client = RatisHelper.newRaftClient(rpc, p); - client.reinitialize(peers, p.getId()); - } - - LOG.info("reinitialize done"); - final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis( - pipeline, conf); try { + cluster.waitOzoneReady(); + + final String containerName = OzoneUtils.getRequestID(); + final List datanodes = cluster.getDataNodes(); + final Pipeline pipeline = ContainerTestHelper.createPipeline( + containerName, + CollectionUtils.as(datanodes, DataNode::getDatanodeId)); + LOG.info("pipeline=" + pipeline); + + // Create Ratis cluster + final String ratisId = "ratis1"; + final RatisManager manager = RatisManager.newRatisManager(conf); + manager.createRatisCluster(ratisId, pipeline.getMachines()); + LOG.info("Created RatisCluster " + ratisId); + + // check Ratis cluster members + final List dns = manager.getDatanodes(ratisId); + Assert.assertEquals(pipeline.getMachines(), dns); + + // run test + final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis( + pipeline, conf); test.accept(containerName, client); } finally { cluster.shutdown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestRatisManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestRatisManager.java new file mode 100644 index 0000000000..d53cbfba06 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestRatisManager.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.ozoneimpl; + +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.RatisTestHelper; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.scm.ratis.RatisManager; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +/** + * Tests ozone containers with Apache Ratis. + */ +public class TestRatisManager { + private static final Logger LOG = LoggerFactory.getLogger( + TestRatisManager.class); + + static OzoneConfiguration newOzoneConfiguration() { + final OzoneConfiguration conf = new OzoneConfiguration(); + ContainerTestHelper.setOzoneLocalStorageRoot( + TestRatisManager.class, conf); + return conf; + } + + + /** Set the timeout for every test. */ + @Rule + public Timeout testTimeout = new Timeout(200_000); + + @Test + public void testTestRatisManagerGrpc() throws Exception { + runTestRatisManager(SupportedRpcType.GRPC); + } + + @Test + public void testTestRatisManagerNetty() throws Exception { + runTestRatisManager(SupportedRpcType.NETTY); + } + + private static void runTestRatisManager(RpcType rpc) throws Exception { + LOG.info("runTestRatisManager, rpc=" + rpc); + + // create Ozone clusters + final OzoneConfiguration conf = newOzoneConfiguration(); + RatisTestHelper.initRatisConf(rpc, conf); + final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf) + .setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL) + .numDataNodes(5) + .build(); + try { + cluster.waitOzoneReady(); + + final List datanodes = cluster.getDataNodes(); + final List allIds = datanodes.stream() + .map(DataNode::getDatanodeId).collect(Collectors.toList()); + + final RatisManager manager = RatisManager.newRatisManager(conf); + + final int[] idIndex = {3, 4, 5}; + for (int i = 0; i < idIndex.length; i++) { + final int previous = i == 0 ? 0 : idIndex[i - 1]; + final List subIds = allIds.subList(previous, idIndex[i]); + + // Create Ratis cluster + final String ratisId = "ratis" + i; + manager.createRatisCluster(ratisId, subIds); + LOG.info("Created RatisCluster " + ratisId); + + // check Ratis cluster members + final List dns = manager.getDatanodes(ratisId); + Assert.assertEquals(subIds, dns); + } + + // randomly close two of the clusters + final int chosen = ThreadLocalRandom.current().nextInt(idIndex.length); + LOG.info("chosen = " + chosen); + + for (int i = 0; i < idIndex.length; i++) { + if (i != chosen) { + final String ratisId = "ratis" + i; + manager.closeRatisCluster(ratisId); + } + } + + // update datanodes + final String ratisId = "ratis" + chosen; + manager.updateDatanodes(ratisId, allIds); + + // check Ratis cluster members + final List dns = manager.getDatanodes(ratisId); + Assert.assertEquals(allIds, dns); + } finally { + cluster.shutdown(); + } + } + +}