diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 499f94d2ba..f0db7b5f41 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -20,6 +20,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.shaded.com.google.protobuf .InvalidProtocolBufferException; import org.apache.hadoop.conf.Configuration; @@ -35,15 +37,17 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.util.CheckedBiConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -65,50 +69,48 @@ public static XceiverClientRatis newXceiverClientRatis( ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); final int maxOutstandingRequests = HddsClientUtils.getMaxOutstandingRequests(ozoneConf); + final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); return new XceiverClientRatis(pipeline, - SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests); + SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests, + retryPolicy); } private final Pipeline pipeline; private final RpcType rpcType; private final AtomicReference client = new AtomicReference<>(); private final int maxOutstandingRequests; + private final RetryPolicy retryPolicy; /** * Constructs a client. */ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, - int maxOutStandingChunks) { + int maxOutStandingChunks, RetryPolicy retryPolicy) { super(); this.pipeline = pipeline; this.rpcType = rpcType; this.maxOutstandingRequests = maxOutStandingChunks; + this.retryPolicy = retryPolicy; } /** * {@inheritDoc} */ - public void createPipeline() - throws IOException { - RaftGroupId groupId = pipeline.getId().getRaftGroupID(); - RaftGroup group = RatisHelper.newRaftGroup(groupId, pipeline.getMachines()); - LOG.debug("initializing pipeline:{} with nodes:{}", - pipeline.getId(), group.getPeers()); - reinitialize(pipeline.getMachines(), RatisHelper.emptyRaftGroup(), group); + public void createPipeline() throws IOException { + final RaftGroup group = RatisHelper.newRaftGroup(pipeline); + LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); + callRatisRpc(pipeline.getMachines(), + (raftClient, peer) -> raftClient.groupAdd(group, peer.getId())); } /** * {@inheritDoc} */ - public void destroyPipeline() - throws IOException { - RaftGroupId groupId = pipeline.getId().getRaftGroupID(); - RaftGroup currentGroup = - RatisHelper.newRaftGroup(groupId, pipeline.getMachines()); - LOG.debug("destroying pipeline:{} with nodes:{}", - pipeline.getId(), currentGroup.getPeers()); - reinitialize(pipeline.getMachines(), currentGroup, - RatisHelper.emptyRaftGroup()); + public void destroyPipeline() throws IOException { + final RaftGroup group = RatisHelper.newRaftGroup(pipeline); + LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group); + callRatisRpc(pipeline.getMachines(), (raftClient, peer) -> raftClient + .groupRemove(group.getGroupId(), peer.getId())); } /** @@ -121,51 +123,28 @@ public HddsProtos.ReplicationType getPipelineType() { return HddsProtos.ReplicationType.RATIS; } - private void reinitialize(List datanodes, RaftGroup oldGroup, - RaftGroup newGroup) throws IOException { + private void callRatisRpc(List datanodes, + CheckedBiConsumer rpc) + throws IOException { if (datanodes.isEmpty()) { return; } - IOException exception = null; - for (DatanodeDetails d : datanodes) { - try { - reinitialize(d, oldGroup, newGroup); + final List exceptions = + Collections.synchronizedList(new ArrayList<>()); + datanodes.parallelStream().forEach(d -> { + final RaftPeer p = RatisHelper.toRaftPeer(d); + try (RaftClient client = RatisHelper + .newRaftClient(rpcType, p, retryPolicy)) { + rpc.accept(client, p); } catch (IOException ioe) { - if (exception == null) { - exception = new IOException( - "Failed to reinitialize some of the RaftPeer(s)", ioe); - } else { - exception.addSuppressed(ioe); - } + exceptions.add( + new IOException("Failed invoke Ratis rpc " + rpc + " for " + d, + ioe)); } - } - if (exception != null) { - throw exception; - } - } - - /** - * Adds a new peers to the Ratis Ring. - * - * @param datanode - new datanode - * @param oldGroup - previous Raft group - * @param newGroup - new Raft group - * @throws IOException - on Failure. - */ - private void reinitialize(DatanodeDetails datanode, RaftGroup oldGroup, - RaftGroup newGroup) - throws IOException { - final RaftPeer p = RatisHelper.toRaftPeer(datanode); - try (RaftClient client = oldGroup == RatisHelper.emptyRaftGroup() ? - RatisHelper.newRaftClient(rpcType, p) : - RatisHelper.newRaftClient(rpcType, p, oldGroup)) { - client.reinitialize(newGroup, p.getId()); - } catch (IOException ioe) { - LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ", - p, datanode, ioe); - throw new IOException("Failed to reinitialize RaftPeer " + p - + "(datanode=" + datanode + ")", ioe); + }); + if (!exceptions.isEmpty()) { + throw MultipleIOException.createIOException(exceptions); } } @@ -183,7 +162,7 @@ public void connect() throws Exception { // maxOutstandingRequests so as to set the upper bound on max no of async // requests to be handled by raft client if (!client.compareAndSet(null, - RatisHelper.newRaftClient(rpcType, getPipeline()))) { + RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy))) { throw new IllegalStateException("Client is already connected."); } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 22ba71409d..5b25779006 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -80,6 +80,12 @@ public final class ScmConfigKeys { public static final TimeDuration DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); + public static final String + DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY = + "dfs.ratis.leader.election.minimum.timeout.duration"; + public static final TimeDuration + DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT = + TimeDuration.valueOf(1, TimeUnit.SECONDS); public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY = "dfs.ratis.server.failure.duration"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index f07d59955d..54ec139225 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -242,6 +242,12 @@ public final class OzoneConfigKeys { public static final TimeDuration DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT = ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT; + public static final String + DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY = + ScmConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY; + public static final TimeDuration + DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT = + ScmConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT; public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY = ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY; diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java index 48fdd6477d..d851992c42 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java @@ -18,6 +18,7 @@ package org.apache.ratis; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -28,10 +29,13 @@ import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +45,7 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -116,30 +121,34 @@ static RaftGroup newRaftGroup(RaftGroupId groupId, } static RaftGroup newRaftGroup(Pipeline pipeline) { - return newRaftGroup(toRaftPeers(pipeline)); + return new RaftGroup(pipeline.getId().getRaftGroupID(), + toRaftPeers(pipeline)); } - static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) { + static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline, + RetryPolicy retryPolicy) { return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()), - newRaftGroup(pipeline.getId().getRaftGroupID(), - pipeline.getMachines())); - } - - static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) { - return newRaftClient(rpcType, leader.getId(), - newRaftGroup(new ArrayList<>(Arrays.asList(leader)))); + newRaftGroup(pipeline.getId().getRaftGroupID(), pipeline.getMachines()), + retryPolicy); } static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, - RaftGroup group) { - return newRaftClient(rpcType, leader.getId(), group); + RetryPolicy retryPolicy) { + return newRaftClient(rpcType, leader.getId(), + newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy); } - static RaftClient newRaftClient( - RpcType rpcType, RaftPeerId leader, RaftGroup group) { + static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, + RaftGroup group, RetryPolicy retryPolicy) { + return newRaftClient(rpcType, leader.getId(), group, retryPolicy); + } + + static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader, + RaftGroup group, RetryPolicy retryPolicy) { LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group); final RaftProperties properties = new RaftProperties(); RaftConfigKeys.Rpc.setType(properties, rpcType); + GrpcConfigKeys.setMessageSizeMax(properties, SizeInBytes.valueOf(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)); @@ -147,6 +156,22 @@ static RaftClient newRaftClient( .setRaftGroup(group) .setLeaderId(leader) .setProperties(properties) + .setRetryPolicy(retryPolicy) .build(); } + + static RetryPolicy createRetryPolicy(Configuration conf) { + int maxRetryCount = + conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys. + OZONE_CLIENT_MAX_RETRIES_DEFAULT); + long retryInterval = conf.getTimeDuration(OzoneConfigKeys. + OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys. + OZONE_CLIENT_RETRY_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS.MILLISECONDS); + TimeDuration sleepDuration = + TimeDuration.valueOf(retryInterval, TimeUnit.MILLISECONDS); + RetryPolicy retryPolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration); + return retryPolicy; + } } diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 6afc8708e9..e160f25720 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -163,6 +163,14 @@ OZONE, RATIS, MANAGEMENT The timeout duration for ratis server request. + + dfs.ratis.leader.election.minimum.timeout.duration + 1s + OZONE, RATIS, MANAGEMENT + The minimum timeout duration for ratis leader election. + Default is 1s. + + dfs.ratis.server.failure.duration 120s diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 38d826b3e2..a57997d189 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -182,18 +182,30 @@ private RaftProperties newRaftProperties(Configuration conf, RaftServerConfigKeys.Rpc .setRequestTimeout(properties, serverRequestTimeout); + // Set the ratis leader election timeout + TimeUnit leaderElectionMinTimeoutUnit = + OzoneConfigKeys. + DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT + .getUnit(); + duration = conf.getTimeDuration( + OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, + OzoneConfigKeys. + DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT + .getDuration(), leaderElectionMinTimeoutUnit); + final TimeDuration leaderElectionMinTimeout = + TimeDuration.valueOf(duration, leaderElectionMinTimeoutUnit); + RaftServerConfigKeys.Rpc + .setTimeoutMin(properties, leaderElectionMinTimeout); + long leaderElectionMaxTimeout = + leaderElectionMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200; + RaftServerConfigKeys.Rpc.setTimeoutMax(properties, + TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS)); // Enable batch append on raft server RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true); // Set the maximum cache segments RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2); - // Set the ratis leader election timeout - RaftServerConfigKeys.Rpc.setTimeoutMin(properties, - TimeDuration.valueOf(800, TimeUnit.MILLISECONDS)); - RaftServerConfigKeys.Rpc.setTimeoutMax(properties, - TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS)); - // set the node failure timeout timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT .getUnit(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java index e27de3d335..1cb2cda87d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java @@ -20,10 +20,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.client.rpc.RpcClient; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.ozone.client.rest.OzoneException; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.slf4j.Logger; @@ -104,4 +108,13 @@ static MiniOzoneCluster newMiniOzoneCluster( cluster.waitForClusterToBeReady(); return cluster; } + + static void initXceiverServerRatis( + RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException { + final RaftPeer p = RatisHelper.toRaftPeer(dd); + final OzoneConfiguration conf = new OzoneConfiguration(); + final RaftClient client = + RatisHelper.newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf)); + client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId()); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java index b53e683326..2c94f3bbda 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java @@ -46,12 +46,7 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.ratis.RatisHelper; -import org.apache.ratis.rpc.RpcType; import static org.apache.ratis.rpc.SupportedRpcType.GRPC; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.util.CheckedBiConsumer; import java.util.function.BiConsumer; @@ -77,7 +72,7 @@ public void testContainerStateMachineMetrics() throws Exception { (pipeline, conf) -> RatisTestHelper.initRatisConf(GRPC, conf), XceiverClientRatis::newXceiverClientRatis, TestCSMMetrics::newXceiverServerRatis, - (dn, p) -> initXceiverServerRatis(GRPC, dn, p)); + (dn, p) -> RatisTestHelper.initXceiverServerRatis(GRPC, dn, p)); } static void runContainerStateMachineMetrics( @@ -160,15 +155,6 @@ static XceiverServerRatis newXceiverServerRatis( null); } - static void initXceiverServerRatis( - RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException { - final RaftPeer p = RatisHelper.toRaftPeer(dd); - final RaftClient client = RatisHelper.newRaftClient(rpc, p); - RaftGroupId groupId = pipeline.getId().getRaftGroupID(); - client.reinitialize(RatisHelper.newRaftGroup(groupId, - pipeline.getMachines()), p.getId()); - } - private static class TestContainerDispatcher implements ContainerDispatcher { /** * Dispatches commands to container layer. diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java index 3abc8f861d..b89814e4f3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java @@ -46,9 +46,6 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.ratis.RatisHelper; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.util.CheckedBiConsumer; import org.junit.Assert; @@ -142,21 +139,13 @@ static XceiverServerRatis newXceiverServerRatis( .newXceiverServerRatis(dn, conf, dispatcher, null); } - static void initXceiverServerRatis( - RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException { - final RaftPeer p = RatisHelper.toRaftPeer(dd); - final RaftClient client = RatisHelper.newRaftClient(rpc, p); - client.reinitialize(RatisHelper.newRaftGroup(pipeline), p.getId()); - } - - static void runTestClientServerRatis(RpcType rpc, int numNodes) throws Exception { runTestClientServer(numNodes, (pipeline, conf) -> RatisTestHelper.initRatisConf(rpc, conf), XceiverClientRatis::newXceiverClientRatis, TestContainerServer::newXceiverServerRatis, - (dn, p) -> initXceiverServerRatis(rpc, dn, p)); + (dn, p) -> RatisTestHelper.initXceiverServerRatis(rpc, dn, p)); } static void runTestClientServer( diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 2fdf0688df..20a27232be 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -101,7 +101,7 @@ 1.0.0-M33 - 0.3.0-e6fd494-SNAPSHOT + 0.3.0-50588bd-SNAPSHOT 1.0-alpha-1 3.3.1 2.4.12