HDDS-233. Update ozone to latest ratis snapshot build(0.3.0-50588bd-SNAPSHOT). Contributed by Shashikant Banerjee.

This commit is contained in:
Mukul Kumar Singh 2018-09-13 19:00:55 +05:30
parent f4bda5e8e9
commit c9e0b69ab3
10 changed files with 130 additions and 106 deletions

View File

@ -20,6 +20,8 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.shaded.com.google.protobuf
.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
@ -35,15 +37,17 @@
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.util.CheckedBiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@ -65,50 +69,48 @@ public static XceiverClientRatis newXceiverClientRatis(
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests);
SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
retryPolicy);
}
private final Pipeline pipeline;
private final RpcType rpcType;
private final AtomicReference<RaftClient> client = new AtomicReference<>();
private final int maxOutstandingRequests;
private final RetryPolicy retryPolicy;
/**
* Constructs a client.
*/
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
int maxOutStandingChunks) {
int maxOutStandingChunks, RetryPolicy retryPolicy) {
super();
this.pipeline = pipeline;
this.rpcType = rpcType;
this.maxOutstandingRequests = maxOutStandingChunks;
this.retryPolicy = retryPolicy;
}
/**
* {@inheritDoc}
*/
public void createPipeline()
throws IOException {
RaftGroupId groupId = pipeline.getId().getRaftGroupID();
RaftGroup group = RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
LOG.debug("initializing pipeline:{} with nodes:{}",
pipeline.getId(), group.getPeers());
reinitialize(pipeline.getMachines(), RatisHelper.emptyRaftGroup(), group);
public void createPipeline() throws IOException {
final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
callRatisRpc(pipeline.getMachines(),
(raftClient, peer) -> raftClient.groupAdd(group, peer.getId()));
}
/**
* {@inheritDoc}
*/
public void destroyPipeline()
throws IOException {
RaftGroupId groupId = pipeline.getId().getRaftGroupID();
RaftGroup currentGroup =
RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
LOG.debug("destroying pipeline:{} with nodes:{}",
pipeline.getId(), currentGroup.getPeers());
reinitialize(pipeline.getMachines(), currentGroup,
RatisHelper.emptyRaftGroup());
public void destroyPipeline() throws IOException {
final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
callRatisRpc(pipeline.getMachines(), (raftClient, peer) -> raftClient
.groupRemove(group.getGroupId(), peer.getId()));
}
/**
@ -121,51 +123,28 @@ public HddsProtos.ReplicationType getPipelineType() {
return HddsProtos.ReplicationType.RATIS;
}
private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup oldGroup,
RaftGroup newGroup) throws IOException {
private void callRatisRpc(List<DatanodeDetails> datanodes,
CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc)
throws IOException {
if (datanodes.isEmpty()) {
return;
}
IOException exception = null;
for (DatanodeDetails d : datanodes) {
try {
reinitialize(d, oldGroup, newGroup);
final List<IOException> exceptions =
Collections.synchronizedList(new ArrayList<>());
datanodes.parallelStream().forEach(d -> {
final RaftPeer p = RatisHelper.toRaftPeer(d);
try (RaftClient client = RatisHelper
.newRaftClient(rpcType, p, retryPolicy)) {
rpc.accept(client, p);
} catch (IOException ioe) {
if (exception == null) {
exception = new IOException(
"Failed to reinitialize some of the RaftPeer(s)", ioe);
} else {
exception.addSuppressed(ioe);
}
exceptions.add(
new IOException("Failed invoke Ratis rpc " + rpc + " for " + d,
ioe));
}
}
if (exception != null) {
throw exception;
}
}
/**
* Adds a new peers to the Ratis Ring.
*
* @param datanode - new datanode
* @param oldGroup - previous Raft group
* @param newGroup - new Raft group
* @throws IOException - on Failure.
*/
private void reinitialize(DatanodeDetails datanode, RaftGroup oldGroup,
RaftGroup newGroup)
throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(datanode);
try (RaftClient client = oldGroup == RatisHelper.emptyRaftGroup() ?
RatisHelper.newRaftClient(rpcType, p) :
RatisHelper.newRaftClient(rpcType, p, oldGroup)) {
client.reinitialize(newGroup, p.getId());
} catch (IOException ioe) {
LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ",
p, datanode, ioe);
throw new IOException("Failed to reinitialize RaftPeer " + p
+ "(datanode=" + datanode + ")", ioe);
});
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}
@ -183,7 +162,7 @@ public void connect() throws Exception {
// maxOutstandingRequests so as to set the upper bound on max no of async
// requests to be handled by raft client
if (!client.compareAndSet(null,
RatisHelper.newRaftClient(rpcType, getPipeline()))) {
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy))) {
throw new IllegalStateException("Client is already connected.");
}
}

View File

@ -80,6 +80,12 @@ public final class ScmConfigKeys {
public static final TimeDuration
DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT =
TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
public static final String
DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY =
"dfs.ratis.leader.election.minimum.timeout.duration";
public static final TimeDuration
DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
TimeDuration.valueOf(1, TimeUnit.SECONDS);
public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
"dfs.ratis.server.failure.duration";

View File

@ -242,6 +242,12 @@ public final class OzoneConfigKeys {
public static final TimeDuration
DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT =
ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT;
public static final String
DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY =
ScmConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY;
public static final TimeDuration
DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
ScmConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT;
public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY;

View File

@ -18,6 +18,7 @@
package org.apache.ratis;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -28,10 +29,13 @@
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,6 +45,7 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -116,30 +121,34 @@ static RaftGroup newRaftGroup(RaftGroupId groupId,
}
static RaftGroup newRaftGroup(Pipeline pipeline) {
return newRaftGroup(toRaftPeers(pipeline));
return new RaftGroup(pipeline.getId().getRaftGroupID(),
toRaftPeers(pipeline));
}
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) {
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
RetryPolicy retryPolicy) {
return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()),
newRaftGroup(pipeline.getId().getRaftGroupID(),
pipeline.getMachines()));
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) {
return newRaftClient(rpcType, leader.getId(),
newRaftGroup(new ArrayList<>(Arrays.asList(leader))));
newRaftGroup(pipeline.getId().getRaftGroupID(), pipeline.getMachines()),
retryPolicy);
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RaftGroup group) {
return newRaftClient(rpcType, leader.getId(), group);
RetryPolicy retryPolicy) {
return newRaftClient(rpcType, leader.getId(),
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy);
}
static RaftClient newRaftClient(
RpcType rpcType, RaftPeerId leader, RaftGroup group) {
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RaftGroup group, RetryPolicy retryPolicy) {
return newRaftClient(rpcType, leader.getId(), group, retryPolicy);
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
RaftGroup group, RetryPolicy retryPolicy) {
LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
final RaftProperties properties = new RaftProperties();
RaftConfigKeys.Rpc.setType(properties, rpcType);
GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE));
@ -147,6 +156,22 @@ static RaftClient newRaftClient(
.setRaftGroup(group)
.setLeaderId(leader)
.setProperties(properties)
.setRetryPolicy(retryPolicy)
.build();
}
static RetryPolicy createRetryPolicy(Configuration conf) {
int maxRetryCount =
conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
OZONE_CLIENT_MAX_RETRIES_DEFAULT);
long retryInterval = conf.getTimeDuration(OzoneConfigKeys.
OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys.
OZONE_CLIENT_RETRY_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS.MILLISECONDS);
TimeDuration sleepDuration =
TimeDuration.valueOf(retryInterval, TimeUnit.MILLISECONDS);
RetryPolicy retryPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
return retryPolicy;
}
}

View File

@ -163,6 +163,14 @@
<tag>OZONE, RATIS, MANAGEMENT</tag>
<description>The timeout duration for ratis server request.</description>
</property>
<property>
<name>dfs.ratis.leader.election.minimum.timeout.duration</name>
<value>1s</value>
<tag>OZONE, RATIS, MANAGEMENT</tag>
<description>The minimum timeout duration for ratis leader election.
Default is 1s.
</description>
</property>
<property>
<name>dfs.ratis.server.failure.duration</name>
<value>120s</value>

View File

@ -182,18 +182,30 @@ private RaftProperties newRaftProperties(Configuration conf,
RaftServerConfigKeys.Rpc
.setRequestTimeout(properties, serverRequestTimeout);
// Set the ratis leader election timeout
TimeUnit leaderElectionMinTimeoutUnit =
OzoneConfigKeys.
DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
.getUnit();
duration = conf.getTimeDuration(
OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
OzoneConfigKeys.
DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
.getDuration(), leaderElectionMinTimeoutUnit);
final TimeDuration leaderElectionMinTimeout =
TimeDuration.valueOf(duration, leaderElectionMinTimeoutUnit);
RaftServerConfigKeys.Rpc
.setTimeoutMin(properties, leaderElectionMinTimeout);
long leaderElectionMaxTimeout =
leaderElectionMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200;
RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
// Enable batch append on raft server
RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
// Set the maximum cache segments
RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
// Set the ratis leader election timeout
RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
TimeDuration.valueOf(800, TimeUnit.MILLISECONDS));
RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS));
// set the node failure timeout
timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
.getUnit();

View File

@ -20,10 +20,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.slf4j.Logger;
@ -104,4 +108,13 @@ static MiniOzoneCluster newMiniOzoneCluster(
cluster.waitForClusterToBeReady();
return cluster;
}
static void initXceiverServerRatis(
RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(dd);
final OzoneConfiguration conf = new OzoneConfiguration();
final RaftClient client =
RatisHelper.newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf));
client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId());
}
}

View File

@ -46,12 +46,7 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.rpc.RpcType;
import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.util.CheckedBiConsumer;
import java.util.function.BiConsumer;
@ -77,7 +72,7 @@ public void testContainerStateMachineMetrics() throws Exception {
(pipeline, conf) -> RatisTestHelper.initRatisConf(GRPC, conf),
XceiverClientRatis::newXceiverClientRatis,
TestCSMMetrics::newXceiverServerRatis,
(dn, p) -> initXceiverServerRatis(GRPC, dn, p));
(dn, p) -> RatisTestHelper.initXceiverServerRatis(GRPC, dn, p));
}
static void runContainerStateMachineMetrics(
@ -160,15 +155,6 @@ static XceiverServerRatis newXceiverServerRatis(
null);
}
static void initXceiverServerRatis(
RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(dd);
final RaftClient client = RatisHelper.newRaftClient(rpc, p);
RaftGroupId groupId = pipeline.getId().getRaftGroupID();
client.reinitialize(RatisHelper.newRaftGroup(groupId,
pipeline.getMachines()), p.getId());
}
private static class TestContainerDispatcher implements ContainerDispatcher {
/**
* Dispatches commands to container layer.

View File

@ -46,9 +46,6 @@
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.util.CheckedBiConsumer;
import org.junit.Assert;
@ -142,21 +139,13 @@ static XceiverServerRatis newXceiverServerRatis(
.newXceiverServerRatis(dn, conf, dispatcher, null);
}
static void initXceiverServerRatis(
RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(dd);
final RaftClient client = RatisHelper.newRaftClient(rpc, p);
client.reinitialize(RatisHelper.newRaftGroup(pipeline), p.getId());
}
static void runTestClientServerRatis(RpcType rpc, int numNodes)
throws Exception {
runTestClientServer(numNodes,
(pipeline, conf) -> RatisTestHelper.initRatisConf(rpc, conf),
XceiverClientRatis::newXceiverClientRatis,
TestContainerServer::newXceiverServerRatis,
(dn, p) -> initXceiverServerRatis(rpc, dn, p));
(dn, p) -> RatisTestHelper.initXceiverServerRatis(rpc, dn, p));
}
static void runTestClientServer(

View File

@ -101,7 +101,7 @@
<ldap-api.version>1.0.0-M33</ldap-api.version>
<!-- Apache Ratis version -->
<ratis.version>0.3.0-e6fd494-SNAPSHOT</ratis.version>
<ratis.version>0.3.0-50588bd-SNAPSHOT</ratis.version>
<jcache.version>1.0-alpha-1</jcache.version>
<ehcache.version>3.3.1</ehcache.version>
<hikari.version>2.4.12</hikari.version>