From 9df6275954844a9617e20eb4924ee7898ca0df8e Mon Sep 17 00:00:00 2001 From: Mukul Kumar Singh Date: Mon, 1 Jul 2019 20:52:32 +0530 Subject: [PATCH] HDDS-1555. Disable install snapshot for ContainerStateMachine. Contributed by Siddharth Wagle. (#846) --- .../hadoop/hdds/scm/XceiverClientRatis.java | 72 ++++++++++--------- .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 7 +- .../apache/hadoop/ozone/OzoneConfigKeys.java | 4 ++ .../src/main/resources/ozone-default.xml | 8 +++ .../server/ratis/ContainerStateMachine.java | 28 +++++--- .../server/ratis/XceiverServerRatis.java | 51 ++++++++++--- hadoop-hdds/pom.xml | 2 +- .../om/ratis/OzoneManagerRatisClient.java | 57 +++++++-------- hadoop-ozone/pom.xml | 2 +- 9 files changed, 142 insertions(+), 89 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index efd82bce7b..4a90e489c9 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -18,52 +18,54 @@ package org.apache.hadoop.hdds.scm; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.OptionalLong; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.x509.SecurityConfig; - -import io.opentracing.Scope; -import io.opentracing.util.GlobalTracer; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.util.Time; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.client.RaftClient; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.GroupMismatchException; -import org.apache.ratis.protocol.RaftRetryFailureException; -import org.apache.ratis.retry.RetryPolicy; -import org.apache.ratis.thirdparty.com.google.protobuf - .InvalidProtocolBufferException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.tracing.TracingUtil; - -import 
org.apache.ratis.RatisHelper; -import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftException; +import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import io.opentracing.Scope; +import io.opentracing.util.GlobalTracer; /** * An abstract implementation of {@link XceiverClientSpi} using Ratis. @@ -309,10 +311,7 @@ public XceiverClientReply sendCommandAsync( Time.monotonicNowNanos() - requestTime); }).thenApply(reply -> { try { - // we need to handle RaftRetryFailure Exception - RaftRetryFailureException raftRetryFailureException = - reply.getRetryFailureException(); - if (raftRetryFailureException != null) { + if (!reply.isSuccess()) { // in case of raft retry failure, the raft client is // not able to connect to the leader hence the pipeline // can not be used but this instance of RaftClient will close @@ -324,7 +323,10 @@ public XceiverClientReply sendCommandAsync( // to SCM as in this case, it is the raft client which is not // able to connect to leader in the pipeline, though the // pipeline can still be functional. - throw new CompletionException(raftRetryFailureException); + RaftException exception = reply.getException(); + Preconditions.checkNotNull(exception, "Raft reply failure but " + + "no exception propagated."); + throw new CompletionException(exception); } ContainerCommandResponseProto response = ContainerCommandResponseProto diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index ae09c9d6d9..a98739900c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -107,6 +107,11 @@ public final class ScmConfigKeys { "dfs.container.ratis.log.appender.queue.byte-limit"; public static final String DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB"; + public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP = + "dfs.container.ratis.log.purge.gap"; + // TODO: Set to 1024 once RATIS issue around purge is fixed. 
+ public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT = + 1000000000; // expiry interval stateMachineData cache entry inside containerStateMachine public static final String DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL = @@ -146,7 +151,7 @@ public final class ScmConfigKeys { public static final String DFS_RATIS_SNAPSHOT_THRESHOLD_KEY = "dfs.ratis.snapshot.threshold"; - public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 10000; + public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 100000; public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY = "dfs.ratis.server.failure.duration"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 1463c43e83..b77cca35a8 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -322,6 +322,10 @@ public final class OzoneConfigKeys { public static final String DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT; + public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP = + ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP; + public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT = + ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT; public static final String DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY = ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY; public static final TimeDuration diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 427def917e..c10aa3353a 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -104,6 +104,14 @@ Byte limit for ratis leader's log appender queue. + + dfs.container.ratis.log.purge.gap + 1000000000 + OZONE, DEBUG, CONTAINER, RATIS + Purge gap between the last purged commit index + and the current index, when the leader decides to purge its log. 
+ + dfs.container.ratis.datanode.storage.dir diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 7a7baec300..f4a8008b74 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -28,12 +28,11 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; -import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.thirdparty.com.google.protobuf .InvalidProtocolBufferException; @@ -195,12 +194,12 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot) throws IOException { if (snapshot == null) { TermIndex empty = - TermIndex.newTermIndex(0, RaftServerConstants.INVALID_LOG_INDEX); + TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX); LOG.info( "The snapshot info is null." + "Setting the last applied index to:" + empty); setLastAppliedTermIndex(empty); - return RaftServerConstants.INVALID_LOG_INDEX; + return RaftLog.INVALID_LOG_INDEX; } final File snapshotFile = snapshot.getFile().getPath().toFile(); @@ -243,7 +242,7 @@ public void persistContainerSet(OutputStream out) throws IOException { public long takeSnapshot() throws IOException { TermIndex ti = getLastAppliedTermIndex(); LOG.info("Taking snapshot at termIndex:" + ti); - if (ti != null && ti.getIndex() != RaftServerConstants.INVALID_LOG_INDEX) { + if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) { final File snapshotFile = storage.getSnapshotFile(ti.getTerm(), ti.getIndex()); LOG.info("Taking a snapshot to file {}", snapshotFile); @@ -651,14 +650,13 @@ private void evictStateMachineCache() { } @Override - public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) { - ratisServer.handleNodeSlowness(group, roleInfoProto); + public void notifySlowness(RoleInfoProto roleInfoProto) { + ratisServer.handleNodeSlowness(gid, roleInfoProto); } @Override - public void notifyExtendedNoLeader(RaftGroup group, - RoleInfoProto roleInfoProto) { - ratisServer.handleNoLeader(group, roleInfoProto); + public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) { + ratisServer.handleNoLeader(gid, roleInfoProto); } @Override @@ -667,6 +665,16 @@ public void notifyNotLeader(Collection pendingEntries) evictStateMachineCache(); } + @Override + public CompletableFuture notifyInstallSnapshotFromLeader( + RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) { + ratisServer.handleInstallSnapshotFromLeader(gid, roleInfoProto, + firstTermIndexInLog); + final CompletableFuture future = new CompletableFuture<>(); + future.complete(firstTermIndexInLog); + return future; + } + @Override public void close() throws IOException { 
evictStateMachineCache(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 424281891b..246d58af20 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -57,7 +57,6 @@ import org.apache.ratis.protocol.NotLeaderException; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; @@ -66,6 +65,7 @@ import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -240,8 +240,9 @@ private RaftProperties newRaftProperties(Configuration conf) { OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT, OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT, StorageUnit.BYTES); - RaftServerConfigKeys.Log.setElementLimit(properties, logQueueNumElements); - RaftServerConfigKeys.Log.setByteLimit(properties, logQueueByteLimit); + RaftServerConfigKeys.Log.setQueueElementLimit( + properties, logQueueNumElements); + RaftServerConfigKeys.Log.setQueueByteLimit(properties, logQueueByteLimit); int numSyncRetries = conf.getInt( OzoneConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES, @@ -251,8 +252,17 @@ private RaftProperties newRaftProperties(Configuration conf) { numSyncRetries); // Enable the StateMachineCaching - RaftServerConfigKeys.Log.StateMachineData .setCachingEnabled(properties, true); + RaftServerConfigKeys.Log.StateMachineData.setCachingEnabled( + properties, true); + + RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties, + false); + + int purgeGap = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP, + OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT); + RaftServerConfigKeys.Log.setPurgeGap(properties, purgeGap); + return properties; } @@ -590,11 +600,32 @@ public List getPipelineIds() { return pipelineIDs; } - void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) { - handlePipelineFailure(group.getGroupId(), roleInfoProto); + void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) { + handlePipelineFailure(groupId, roleInfoProto); } - void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) { - handlePipelineFailure(group.getGroupId(), roleInfoProto); + void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) { + handlePipelineFailure(groupId, roleInfoProto); } -} \ No newline at end of file + + /** + * Because the snapshot contents cannot be used to actually catch up + * the follower, the pipeline is closed instead of installing the + * snapshot; such a follower would never be able to catch up. 
+ * + * @param groupId raft group information + * @param roleInfoProto information about the current node role and + * rpc delay information. + * @param firstTermIndexInLog After the snapshot installation is complete, + * return the last included term index in the snapshot. + */ + void handleInstallSnapshotFromLeader(RaftGroupId groupId, + RoleInfoProto roleInfoProto, + TermIndex firstTermIndexInLog) { + LOG.warn("Install snapshot notification received from Leader with " + + "termIndex: {}, terminating pipeline: {}", + firstTermIndexInLog, groupId); + handlePipelineFailure(groupId, roleInfoProto); + } +} diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml index 0e87c2cc8d..12ed0a3f78 100644 --- a/hadoop-hdds/pom.xml +++ b/hadoop-hdds/pom.xml @@ -47,7 +47,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> 0.5.0-SNAPSHOT - 0.4.0-fe2b15d-SNAPSHOT + 0.4.0-2337318-SNAPSHOT 1.60 diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java index cd99cd1fab..2cbef50cb0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java @@ -17,6 +17,8 @@ package org.apache.hadoop.ozone.om.ratis; +import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE; + import java.io.Closeable; import java.io.IOException; import java.util.concurrent.CompletableFuture; @@ -24,23 +26,18 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.ServiceException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftException; import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftRetryFailureException; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.retry.RetryPolicies; import org.apache.ratis.retry.RetryPolicy; @@ -51,7 +48,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE; +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.ServiceException; /** * OM Ratis client to interact with OM Ratis server endpoint. 
@@ -167,29 +166,25 @@ private CompletableFuture sendCommandAsync(OMRequest request) { CompletableFuture raftClientReply = sendRequestAsync(request); - CompletableFuture omRatisResponse = - raftClientReply.whenComplete((reply, e) -> LOG.debug( - "received reply {} for request: cmdType={} traceID={} " + - "exception: {}", reply, request.getCmdType(), - request.getTraceID(), e)) - .thenApply(reply -> { - try { - // we need to handle RaftRetryFailure Exception - RaftRetryFailureException raftRetryFailureException = - reply.getRetryFailureException(); - if (raftRetryFailureException != null) { - throw new CompletionException(raftRetryFailureException); - } + return raftClientReply.whenComplete((reply, e) -> LOG.debug( + "received reply {} for request: cmdType={} traceID={} " + + "exception: {}", reply, request.getCmdType(), + request.getTraceID(), e)) + .thenApply(reply -> { + try { + Preconditions.checkNotNull(reply); + if (!reply.isSuccess()) { + RaftException exception = reply.getException(); + Preconditions.checkNotNull(exception, "Raft reply failure " + + "but no exception propagated."); + throw new CompletionException(exception); + } + return OMRatisHelper.getOMResponseFromRaftClientReply(reply); - OMResponse response = OMRatisHelper - .getOMResponseFromRaftClientReply(reply); - - return response; - } catch (InvalidProtocolBufferException e) { - throw new CompletionException(e); - } - }); - return omRatisResponse; + } catch (InvalidProtocolBufferException e) { + throw new CompletionException(e); + } + }); } /** diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml index 9fa1c8b6d5..2356276089 100644 --- a/hadoop-ozone/pom.xml +++ b/hadoop-ozone/pom.xml @@ -29,7 +29,7 @@ 3.2.0 0.5.0-SNAPSHOT 0.5.0-SNAPSHOT - 0.4.0-fe2b15d-SNAPSHOT + 0.4.0-2337318-SNAPSHOT 1.60 Crater Lake ${ozone.version}
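
Note on the client-side change: both XceiverClientRatis#sendCommandAsync and OzoneManagerRatisClient now check RaftClientReply#isSuccess and propagate RaftClientReply#getException instead of looking only for a RaftRetryFailureException. The following is a minimal sketch of that pattern using only calls that appear in this patch; the helper class and method names are illustrative, not part of the change.

    import java.util.concurrent.CompletionException;
    import org.apache.ratis.protocol.RaftClientReply;
    import org.apache.ratis.protocol.RaftException;
    import com.google.common.base.Preconditions;

    // Hypothetical helper mirroring the reply handling adopted by this patch.
    final class RaftReplyValidator {
      static RaftClientReply validate(RaftClientReply reply) {
        Preconditions.checkNotNull(reply);
        if (!reply.isSuccess()) {
          // Any Raft failure (retry failure, group mismatch, etc.) now surfaces
          // through getException() rather than only getRetryFailureException().
          RaftException exception = reply.getException();
          Preconditions.checkNotNull(exception,
              "Raft reply failure but no exception propagated.");
          throw new CompletionException(exception);
        }
        return reply;
      }
    }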
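Note on the server-side wiring: the new dfs.container.ratis.log.purge.gap key and the disabled install-snapshot behaviour reach Ratis through XceiverServerRatis#newRaftProperties. The sketch below assumes the Ratis 0.4.0-2337318 API used in this patch (RaftServerConfigKeys.Log.setPurgeGap and Log.Appender.setInstallSnapshotEnabled, both called in the diff); the wrapping class is illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ozone.OzoneConfigKeys;
    import org.apache.ratis.conf.RaftProperties;
    import org.apache.ratis.server.RaftServerConfigKeys;

    // Illustrative only: how the new keys flow into the Ratis server properties.
    final class RatisLogPurgeExample {
      static RaftProperties buildLogProperties(Configuration conf) {
        RaftProperties properties = new RaftProperties();
        // Leaders no longer install snapshots on lagging followers; the
        // notification instead leads to the pipeline being closed.
        RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(
            properties, false);
        // dfs.container.ratis.log.purge.gap, default 1000000000 until the
        // Ratis purge issue noted in ScmConfigKeys is fixed.
        int purgeGap = conf.getInt(
            OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP,
            OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT);
        RaftServerConfigKeys.Log.setPurgeGap(properties, purgeGap);
        return properties;
      }
    }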