From 999da98d67c0a5e662802a65bc1c5167808f270c Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Fri, 4 Jan 2019 15:25:26 -0800 Subject: [PATCH] HDDS-915. Submit client request to OM Ratis server. Contributed by Hanisha Koneru. --- .../src/main/resources/ozone-default.xml | 24 ++- .../java/org/apache/hadoop/ozone/OmUtils.java | 46 ++++- .../apache/hadoop/ozone/om/OMConfigKeys.java | 23 ++- .../apache/hadoop/ozone/om/OzoneManager.java | 69 +++++-- .../ozone/om/exceptions/OMException.java | 3 +- .../hadoop/ozone/om/ratis/OMRatisHelper.java | 121 +++++++++++ .../om/ratis/OzoneManagerRatisClient.java | 194 ++++++++++++++++++ .../om/ratis/OzoneManagerRatisServer.java | 79 +++++-- .../om/ratis/OzoneManagerStateMachine.java | 90 ++++++++ ...ManagerProtocolServerSideTranslatorPB.java | 30 ++- .../om/ratis/TestOzoneManagerRatisServer.java | 86 +++++++- 11 files changed, 713 insertions(+), 52 deletions(-) create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index ad6745ba11..e0f9201e70 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1543,12 +1543,34 @@ - ozone.om.ratis.client.request.timeout + ozone.om.ratis.client.request.timeout.duration 3s OZONE, OM, RATIS, MANAGEMENT The timeout duration for OM Ratis client request. + + ozone.om.ratis.client.request.max.retries + 180 + OZONE, OM, RATIS, MANAGEMENT + Number of retries for OM client request. + + + ozone.om.ratis.client.request.retry.interval + 100ms + OZONE, OM, RATIS, MANAGEMENT + Interval between successive retries for a OM client request. + + + + + ozone.om.leader.election.minimum.timeout.duration + 1s + OZONE, OM, RATIS, MANAGEMENT + The minimum timeout duration for OM ratis leader election. + Default is 1s. + + ozone.acl.authorizer.class diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java index 92806485c4..b93529bdaa 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone; import java.io.File; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collection; import java.util.Optional; @@ -28,6 +29,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +46,7 @@ * communication. */ public final class OmUtils { - private static final Logger LOG = LoggerFactory.getLogger(OmUtils.class); + public static final Logger LOG = LoggerFactory.getLogger(OmUtils.class); private OmUtils() { } @@ -133,4 +135,46 @@ public static File getOmDbDir(Configuration conf) { OMConfigKeys.OZONE_OM_DB_DIRS, HddsConfigKeys.OZONE_METADATA_DIRS); return ServerUtils.getOzoneMetaDirPath(conf); } + + /** + * Checks if the OM request is read only or not. + * @param omRequest OMRequest proto + * @return True if its readOnly, false otherwise. + */ + public static boolean isReadOnly( + OzoneManagerProtocolProtos.OMRequest omRequest) { + OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType(); + switch (cmdType) { + case CheckVolumeAccess: + case InfoVolume: + case ListVolume: + case InfoBucket: + case ListBuckets: + case LookupKey: + case ListKeys: + case InfoS3Bucket: + case ListS3Buckets: + case ServiceList: + return true; + case CreateVolume: + case SetVolumeProperty: + case DeleteVolume: + case CreateBucket: + case SetBucketProperty: + case DeleteBucket: + case CreateKey: + case RenameKey: + case DeleteKey: + case CommitKey: + case AllocateBlock: + case CreateS3Bucket: + case DeleteS3Bucket: + case InitiateMultiPartUpload: + case CommitMultiPartUpload: + return false; + default: + LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType); + return false; + } + } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 82eca4e838..3cc4434bf5 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -153,9 +153,26 @@ private OMConfigKeys() { = TimeDuration.valueOf(1, TimeUnit.SECONDS); // OM Ratis client configurations - public static final String OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_KEY - = "ozone.om.ratis.client.request.timeout"; + public static final String OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY + = "ozone.om.ratis.client.request.timeout.duration"; public static final TimeDuration - OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DEFAULT + OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); + public static final String OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY + = "ozone.om.ratis.client.request.max.retries"; + public static final int OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT + = 180; + public static final String OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY + = "ozone.om.ratis.client.request.retry.interval"; + public static final TimeDuration + OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT + = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + + // OM Ratis Leader Election configurations + public static final String + OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY = + "ozone.om.leader.election.minimum.timeout.duration"; + public static final TimeDuration + OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT = + TimeDuration.valueOf(1, TimeUnit.SECONDS); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index a98e38811f..4fc0813e2f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -68,9 +68,12 @@ import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType; @@ -134,7 +137,9 @@ .OZONE_OM_METRICS_SAVE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys .OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT; -import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService; +import static org.apache.hadoop.ozone.protocol.proto + .OzoneManagerProtocolProtos.OzoneManagerService + .newReflectiveBlockingService; import static org.apache.hadoop.util.ExitUtil.terminate; /** @@ -156,7 +161,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private final OzoneConfiguration configuration; private RPC.Server omRpcServer; private InetSocketAddress omRpcAddress; + private String omId; private OzoneManagerRatisServer omRatisServer; + private OzoneManagerRatisClient omRatisClient; private final OMMetadataManager metadataManager; private final VolumeManager volumeManager; private final BucketManager bucketManager; @@ -185,6 +192,7 @@ private OzoneManager(OzoneConfiguration conf) throws IOException { Preconditions.checkNotNull(conf); configuration = conf; omStorage = new OMStorage(conf); + omId = omStorage.getOmId(); scmBlockClient = getScmBlockClient(configuration); scmContainerClient = getScmContainerClient(configuration); if (omStorage.getState() != StorageState.INITIALIZED) { @@ -584,8 +592,32 @@ public void start() throws IOException { InetSocketAddress omNodeRpcAddr = getOmAddress(configuration); int handlerCount = configuration.getInt(OZONE_OM_HANDLER_COUNT_KEY, OZONE_OM_HANDLER_COUNT_DEFAULT); + + // This is a temporary check. Once fully implemented, all OM state change + // should go through Ratis - either standalone (for non-HA) or replicated + // (for HA). + boolean omRatisEnabled = configuration.getBoolean( + OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, + OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); + if (omRatisEnabled) { + omRatisServer = OzoneManagerRatisServer.newOMRatisServer( + omId, omRpcAddress.getAddress(), configuration); + omRatisServer.start(); + + LOG.info("OzoneManager Ratis server started at port {}", + omRatisServer.getServerPort()); + + omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient( + omId, omRatisServer.getRaftGroup(), configuration); + omRatisClient.connect(); + } else { + omRatisServer = null; + omRatisClient = null; + } + BlockingService omService = newReflectiveBlockingService( - new OzoneManagerProtocolServerSideTranslatorPB(this)); + new OzoneManagerProtocolServerSideTranslatorPB( + this, omRatisClient, omRatisEnabled)); omRpcServer = startRpcServer(configuration, omNodeRpcAddr, OzoneManagerProtocolPB.class, omService, handlerCount); @@ -596,23 +628,6 @@ public void start() throws IOException { LOG.info(buildRpcServerStartMessage("OzoneManager RPC server", omRpcAddress)); - boolean omRatisEnabled = configuration.getBoolean( - OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, - OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); - // This is a temporary check. Once fully implemented, all OM state change - // should go through Ratis, either standalone (for non-HA) or replicated - // (for HA). - if (omRatisEnabled) { - omRatisServer = OzoneManagerRatisServer.newOMRatisServer( - omStorage.getOmId(), configuration); - omRatisServer.start(); - - LOG.info("OzoneManager Ratis server started at port {}", - omRatisServer.getServerPort()); - } else { - omRatisServer = null; - } - DefaultMetricsSystem.initialize("OzoneManager"); metadataManager.start(configuration); @@ -687,6 +702,22 @@ public void join() { } } + /** + * Validates that the incoming OM request has required parameters. + * TODO: Add more validation checks before writing the request to Ratis log. + * @param omRequest client request to OM + * @throws OMException thrown if required parameters are set to null. + */ + public void validateRequest(OMRequest omRequest) throws OMException { + Type cmdType = omRequest.getCmdType(); + if (cmdType == null) { + throw new OMException("CmdType is null", ResultCodes.INVALID_REQUEST); + } + if (omRequest.getClientId() == null) { + throw new OMException("ClientId is null", ResultCodes.INVALID_REQUEST); + } + } + /** * Creates a volume. * diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java index bdf2fb50bf..6d93a78705 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java @@ -118,6 +118,7 @@ public enum ResultCodes { S3_BUCKET_NOT_FOUND, INITIATE_MULTIPART_UPLOAD_FAILED, NO_SUCH_MULTIPART_UPLOAD, - UPLOAD_PART_FAILED; + UPLOAD_PART_FAILED, + INVALID_REQUEST; } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java new file mode 100644 index 0000000000..73ee517b2c --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.ratis; + +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.retry.RetryPolicy; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.SizeInBytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Ratis helper methods for OM Ratis server and client. + */ +public class OMRatisHelper { + private static final Logger LOG = LoggerFactory.getLogger( + OMRatisHelper.class); + + private OMRatisHelper() { + } + + /** + * Creates a new RaftClient object. + * @param rpcType Replication Type + * @param omId OM id of the client + * @param group RaftGroup + * @param retryPolicy Retry policy + * @return RaftClient object + */ + static RaftClient newRaftClient(RpcType rpcType, String omId, RaftGroup + group, RetryPolicy retryPolicy, Configuration conf) { + LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, omId, group); + final RaftProperties properties = new RaftProperties(); + RaftConfigKeys.Rpc.setType(properties, rpcType); + + final int raftSegmentPreallocatedSize = (int) conf.getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, + OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT, + StorageUnit.BYTES); + GrpcConfigKeys.setMessageSizeMax( + properties, SizeInBytes.valueOf(raftSegmentPreallocatedSize)); + + return RaftClient.newBuilder() + .setRaftGroup(group) + .setLeaderId(getRaftPeerId(omId)) + .setProperties(properties) + .setRetryPolicy(retryPolicy) + .build(); + } + + static RaftPeerId getRaftPeerId(String omId) { + return RaftPeerId.valueOf(omId); + } + + static ByteString convertRequestToByteString(OMRequest request) { + byte[] requestBytes = request.toByteArray(); + return ByteString.copyFrom(requestBytes); + } + + static OMRequest convertByteStringToOMRequest(ByteString byteString) + throws InvalidProtocolBufferException { + byte[] bytes = byteString.toByteArray(); + return OMRequest.parseFrom(bytes); + } + + static ByteString convertResponseToByteString(OMResponse response) { + byte[] requestBytes = response.toByteArray(); + return ByteString.copyFrom(requestBytes); + } + + static OMResponse convertByteStringToOMResponse(ByteString byteString) + throws InvalidProtocolBufferException { + byte[] bytes = byteString.toByteArray(); + return OMResponse.parseFrom(bytes); + } + + static OMResponse getErrorResponse(Type cmdType, Exception e) { + return OMResponse.newBuilder() + .setCmdType(cmdType) + .setSuccess(false) + .setMessage(e.getMessage()) + .build(); + } + + static CompletableFuture completeExceptionally(Exception e) { + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java new file mode 100644 index 0000000000..c18437c8bc --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.ratis; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftRetryFailureException; +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.retry.RetryPolicy; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * OM Ratis client to interact with OM Ratis server endpoint. + */ +public final class OzoneManagerRatisClient implements Closeable { + static final Logger LOG = LoggerFactory.getLogger( + OzoneManagerRatisClient.class); + + private final RaftGroup raftGroup; + private final String omID; + private final RpcType rpcType; + private final AtomicReference client = new AtomicReference<>(); + private final RetryPolicy retryPolicy; + private final Configuration conf; + + private OzoneManagerRatisClient(String omId, RaftGroup raftGroup, + RpcType rpcType, RetryPolicy retryPolicy, + Configuration config) { + this.raftGroup = raftGroup; + this.omID = omId; + this.rpcType = rpcType; + this.retryPolicy = retryPolicy; + this.conf = config; + } + + public static OzoneManagerRatisClient newOzoneManagerRatisClient( + String omId, RaftGroup raftGroup, Configuration conf) { + final String rpcType = conf.get( + OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY, + OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT); + + final int maxRetryCount = conf.getInt( + OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, + OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT); + final long retryInterval = conf.getTimeDuration( + OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, + OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT + .toIntExact(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + final TimeDuration sleepDuration = TimeDuration.valueOf( + retryInterval, TimeUnit.MILLISECONDS); + final RetryPolicy retryPolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration); + + return new OzoneManagerRatisClient(omId, raftGroup, + SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, conf); + } + + public void connect() { + LOG.debug("Connecting to OM Ratis Server GroupId:{} OM:{}", + raftGroup.getGroupId().getUuid().toString(), omID); + + // TODO : XceiverClient ratis should pass the config value of + // maxOutstandingRequests so as to set the upper bound on max no of async + // requests to be handled by raft client + + if (!client.compareAndSet(null, OMRatisHelper.newRaftClient( + rpcType, omID, raftGroup, retryPolicy, conf))) { + throw new IllegalStateException("Client is already connected."); + } + } + + @Override + public void close() { + final RaftClient c = client.getAndSet(null); + if (c != null) { + closeRaftClient(c); + } + } + + private void closeRaftClient(RaftClient raftClient) { + try { + raftClient.close(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + private RaftClient getClient() { + return Objects.requireNonNull(client.get(), "client is null"); + } + + /** + * Sends a given request to server and gets the reply back. + * @param request Request + * @return Response to the command + */ + public OMResponse sendCommand(OMRequest request) { + try { + CompletableFuture reply = sendCommandAsync(request); + return reply.get(); + } catch (ExecutionException | InterruptedException e) { + LOG.error("Failed to execute command: " + request, e); + return OMRatisHelper.getErrorResponse(request.getCmdType(), e); + } + } + + /** + * Sends a given command to server gets a waitable future back. + * + * @param request Request + * @return Response to the command + */ + private CompletableFuture sendCommandAsync(OMRequest request) { + CompletableFuture raftClientReply = + sendRequestAsync(request); + + CompletableFuture omRatisResponse = + raftClientReply.whenComplete((reply, e) -> LOG.debug( + "received reply {} for request: cmdType={} traceID={} " + + "exception: {}", reply, request.getCmdType(), + request.getTraceID(), e)) + .thenApply(reply -> { + try { + // we need to handle RaftRetryFailure Exception + RaftRetryFailureException raftRetryFailureException = + reply.getRetryFailureException(); + if (raftRetryFailureException != null) { + throw new CompletionException(raftRetryFailureException); + } + OMResponse response = OMRatisHelper + .convertByteStringToOMResponse(reply.getMessage() + .getContent()); + return response; + } catch (InvalidProtocolBufferException e) { + throw new CompletionException(e); + } + }); + return omRatisResponse; + } + + /** + * Submits {@link RaftClient#sendReadOnlyAsync(Message)} request to Ratis + * server if the request is readOnly. Otherwise, submits + * {@link RaftClient#sendAsync(Message)} request. + * @param request OMRequest + * @return RaftClient response + */ + private CompletableFuture sendRequestAsync( + OMRequest request) { + boolean isReadOnlyRequest = OmUtils.isReadOnly(request); + ByteString byteString = OMRatisHelper.convertRequestToByteString(request); + LOG.debug("sendOMRequestAsync {} {}", isReadOnlyRequest, request); + return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) : + getClient().sendAsync(() -> byteString); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index 6d9801b8c2..f28f2ce529 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -22,12 +22,14 @@ import com.google.common.base.Strings; import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.SocketAddress; import java.util.Collections; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; @@ -38,7 +40,9 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.netty.NettyConfigKeys; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; @@ -59,23 +63,43 @@ public final class OzoneManagerRatisServer { .getLogger(OzoneManagerRatisServer.class); private final int port; + private final InetSocketAddress omRatisAddress; private final RaftServer server; + private final RaftGroupId raftGroupId; + private final RaftGroup raftGroup; + private final RaftPeerId raftPeerId; - private OzoneManagerRatisServer(String omId, int port, Configuration conf) - throws IOException { + private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); + + private static long nextCallId() { + return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; + } + + private OzoneManagerRatisServer(String omId, InetAddress addr, int port, + Configuration conf) throws IOException { Objects.requireNonNull(omId, "omId == null"); this.port = port; + this.omRatisAddress = new InetSocketAddress(addr.getHostAddress(), port); RaftProperties serverProperties = newRaftProperties(conf); + // TODO: When implementing replicated OM ratis servers, RaftGroupID + // should be the same across all the OMs. Add all the OM servers as Raft + // Peers. + this.raftGroupId = RaftGroupId.randomId(); + this.raftPeerId = RaftPeerId.getRaftPeerId(omId); + + RaftPeer raftPeer = new RaftPeer(raftPeerId, omRatisAddress); + this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeer); this.server = RaftServer.newBuilder() - .setServerId(RaftPeerId.valueOf(omId)) + .setServerId(this.raftPeerId) + .setGroup(this.raftGroup) .setProperties(serverProperties) - .setStateMachineRegistry(this::getStateMachine) + .setStateMachine(getStateMachine(this.raftGroupId)) .build(); } public static OzoneManagerRatisServer newOMRatisServer(String omId, - Configuration ozoneConf) throws IOException { + InetAddress omAddress, Configuration ozoneConf) throws IOException { int localPort = ozoneConf.getInt( OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT); @@ -96,7 +120,11 @@ public static OzoneManagerRatisServer newOMRatisServer(String omId, + "fallback to use default port {}", localPort, e); } } - return new OzoneManagerRatisServer(omId, localPort, ozoneConf); + return new OzoneManagerRatisServer(omId, omAddress, localPort, ozoneConf); + } + + public RaftGroup getRaftGroup() { + return this.raftGroup; } /** @@ -104,7 +132,7 @@ public static OzoneManagerRatisServer newOMRatisServer(String omId, * TODO: Implement a state machine on OM. */ private BaseStateMachine getStateMachine(RaftGroupId gid) { - return new BaseStateMachine(); + return new OzoneManagerStateMachine(null); } public void start() throws IOException { @@ -163,8 +191,8 @@ private RaftProperties newRaftProperties(Configuration conf) { OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, StorageUnit.BYTES); - RaftServerConfigKeys.Log.Appender - .setBufferElementLimit(properties, logAppenderQueueNumElements); + RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties, + logAppenderQueueNumElements); RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties, SizeInBytes.valueOf(logAppenderQueueByteLimit)); RaftServerConfigKeys.Log.setPreallocatedSize(properties, @@ -197,8 +225,8 @@ private RaftProperties newRaftProperties(Configuration conf) { .getDuration(), retryCacheTimeoutUnit); final TimeDuration retryCacheTimeout = TimeDuration.valueOf( retryCacheTimeoutDuration, retryCacheTimeoutUnit); - RaftServerConfigKeys.RetryCache - .setExpiryTime(properties, retryCacheTimeout); + RaftServerConfigKeys.RetryCache.setExpiryTime(properties, + retryCacheTimeout); // Set the server min and max timeout TimeUnit serverMinTimeoutUnit = @@ -222,11 +250,11 @@ private RaftProperties newRaftProperties(Configuration conf) { RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2); // Set the client request timeout - TimeUnit clientRequestTimeoutUnit = - OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DEFAULT.getUnit(); + TimeUnit clientRequestTimeoutUnit = OMConfigKeys + .OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT .getUnit(); long clientRequestTimeoutDuration = conf.getTimeDuration( - OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_KEY, - OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DEFAULT + OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY, + OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT .getDuration(), clientRequestTimeoutUnit); final TimeDuration clientRequestTimeout = TimeDuration.valueOf( clientRequestTimeoutDuration, clientRequestTimeoutUnit); @@ -243,10 +271,24 @@ private RaftProperties newRaftProperties(Configuration conf) { /** * TODO: set following ratis leader election related configs when * replicated ratis server is implemented. - * 1. leader election timeout - * 2. node failure timeout - * 3. + * 1. node failure timeout */ + // Set the ratis leader election timeout + TimeUnit leaderElectionMinTimeoutUnit = + OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT + .getUnit(); + long leaderElectionMinTimeoutduration = conf.getTimeDuration( + OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, + OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT + .getDuration(), leaderElectionMinTimeoutUnit); + final TimeDuration leaderElectionMinTimeout = TimeDuration.valueOf( + leaderElectionMinTimeoutduration, leaderElectionMinTimeoutUnit); + RaftServerConfigKeys.Rpc.setTimeoutMin(properties, + leaderElectionMinTimeout); + long leaderElectionMaxTimeout = leaderElectionMinTimeout.toLong( + TimeUnit.MILLISECONDS) + 200; + RaftServerConfigKeys.Rpc.setTimeoutMax(properties, + TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS)); /** * TODO: when ratis snapshots are implemented, set snapshot threshold and @@ -276,5 +318,4 @@ public static String getOMRatisDirectory(Configuration conf) { } return storageDir; } - } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java new file mode 100644 index 0000000000..5ea0b49530 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.ratis; + +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.ozone.container.common.transport.server.ratis + .ContainerStateMachine; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The OM StateMachine is the state machine for OM Ratis server. It is + * responsible for applying ratis committed transactions to + * {@link OzoneManager}. + */ +public class OzoneManagerStateMachine extends BaseStateMachine { + + static final Logger LOG = + LoggerFactory.getLogger(ContainerStateMachine.class); + private final SimpleStateMachineStorage storage = + new SimpleStateMachineStorage(); + private final OzoneManager ozoneManager; + + public OzoneManagerStateMachine(OzoneManager om) { + // OzoneManager is required when implementing StateMachine + this.ozoneManager = om; + } + + /** + * Initializes the State Machine with the given server, group and storage. + * TODO: Load the latest snapshot from the file system. + */ + @Override + public void initialize( + RaftServer server, RaftGroupId id, RaftStorage raftStorage) + throws IOException { + super.initialize(server, id, raftStorage); + storage.init(raftStorage); + } + + /* + * Apply a committed log entry to the state machine. This function + * currently returns a dummy message. + * TODO: Apply transaction to OM state machine + */ + @Override + public CompletableFuture applyTransaction(TransactionContext trx) { + String errorMessage; + ByteString logData = trx.getStateMachineLogEntry().getLogData(); + try { + OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(logData); + LOG.debug("Received request: cmdType={} traceID={} ", + omRequest.getCmdType(), omRequest.getTraceID()); + errorMessage = "Dummy response from Ratis server for command type: " + + omRequest.getCmdType(); + } catch (InvalidProtocolBufferException e) { + errorMessage = e.getMessage(); + } + + // TODO: When State Machine is implemented, send the actual response back + return OMRatisHelper.completeExceptionally(new IOException(errorMessage)); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 5e33dbf799..33453ac892 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .AllocateBlockRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -160,6 +161,8 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements private static final Logger LOG = LoggerFactory .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class); private final OzoneManagerProtocol impl; + private final OzoneManagerRatisClient omRatisClient; + private final boolean isRatisEnabled; /** * Constructs an instance of the server handler. @@ -167,8 +170,11 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements * @param impl OzoneManagerProtocolPB */ public OzoneManagerProtocolServerSideTranslatorPB( - OzoneManagerProtocol impl) { + OzoneManagerProtocol impl, OzoneManagerRatisClient ratisClient, + boolean enableRatis) { this.impl = impl; + this.omRatisClient = ratisClient; + this.isRatisEnabled = enableRatis; } /** @@ -179,10 +185,29 @@ public OzoneManagerProtocolServerSideTranslatorPB( @Override public OMResponse submitRequest(RpcController controller, OMRequest request) throws ServiceException { - Type cmdType = request.getCmdType(); + if (isRatisEnabled) { + return submitRequestToRatis(request); + } else { + return submitRequestToOM(request); + } + } + /** + * Submits request to OM's Ratis server. + */ + private OMResponse submitRequestToRatis(OMRequest request) { + return omRatisClient.sendCommand(request); + } + + /** + * Submits request directly to OM. + */ + private OMResponse submitRequestToOM(OMRequest request) + throws ServiceException { + Type cmdType = request.getCmdType(); OMResponse.Builder responseBuilder = OMResponse.newBuilder() .setCmdType(cmdType); + switch (cmdType) { case CreateVolume: CreateVolumeResponse createVolumeResponse = createVolume( @@ -318,7 +343,6 @@ public OMResponse submitRequest(RpcController controller, } return responseBuilder.build(); } - // Convert and exception to corresponding status code private Status exceptionToResponseStatus(IOException ex) { if (ex instanceof OMException) { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java index 564deb2e67..d8915ae09f 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java @@ -18,35 +18,58 @@ package org.apache.hadoop.ozone.om.ratis; +import java.net.InetAddress; import java.nio.file.Path; import java.nio.file.Paths; import java.util.UUID; -import org.apache.hadoop.conf.Configuration; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.test.GenericTestUtils; import org.apache.ratis.util.LifeCycle; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertFalse; /** * Test OM Ratis server. */ public class TestOzoneManagerRatisServer { - private Configuration conf; + private OzoneConfiguration conf; private OzoneManagerRatisServer omRatisServer; + private OzoneManagerRatisClient omRatisClient; private String omID; + private String clientId = UUID.randomUUID().toString(); + private static final long LEADER_ELECTION_TIMEOUT = 500L; @Before - public void init() { + public void init() throws Exception { conf = new OzoneConfiguration(); omID = UUID.randomUUID().toString(); final String path = GenericTestUtils.getTempPath(omID); Path metaDirPath = Paths.get(path, "om-meta"); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString()); + conf.setTimeDuration( + OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, + LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS); + omRatisServer = OzoneManagerRatisServer.newOMRatisServer(omID, + InetAddress.getLocalHost(), conf); + omRatisServer.start(); + omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(omID, + omRatisServer.getRaftGroup(), conf); + omRatisClient.connect(); } @After @@ -54,6 +77,9 @@ public void shutdown() { if (omRatisServer != null) { omRatisServer.stop(); } + if (omRatisClient != null) { + omRatisClient.close(); + } } /** @@ -61,9 +87,59 @@ public void shutdown() { */ @Test public void testStartOMRatisServer() throws Exception { - omRatisServer = OzoneManagerRatisServer.newOMRatisServer(omID, conf); - omRatisServer.start(); Assert.assertEquals("Ratis Server should be in running state", LifeCycle.State.RUNNING, omRatisServer.getServerState()); } + + /** + * Submit any request to OM Ratis server and check that the dummy response + * message is received. + * TODO: Once state machine is implemented, submitting a request to Ratis + * server should result in a valid response. + */ + @Test + public void testSubmitRatisRequest() throws Exception { + // Wait for leader election + Thread.sleep(LEADER_ELECTION_TIMEOUT * 2); + + OMRequest request = OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume) + .setClientId(clientId) + .build(); + + OMResponse response = omRatisClient.sendCommand(request); + + // Since the state machine is not implemented yet, we should get the + // configured dummy message from Ratis. + Assert.assertEquals(false, response.getSuccess()); + Assert.assertTrue(response.getMessage().contains("Dummy response from " + + "Ratis server for command type: " + + OzoneManagerProtocolProtos.Type.CreateVolume)); + Assert.assertEquals(false, response.hasCreateVolumeResponse()); + } + + /** + * Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are + * categorized in {@link OmUtils#isReadOnly(OMRequest)}. + */ + @Test + public void testIsReadOnlyCapturesAllCmdTypeEnums() throws Exception { + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer + .captureLogs(LoggerFactory.getLogger(OmUtils.class)); + String clientId = UUID.randomUUID().toString(); + OzoneManagerProtocolProtos.Type[] cmdTypes = + OzoneManagerProtocolProtos.Type.values(); + + for (OzoneManagerProtocolProtos.Type cmdtype : cmdTypes) { + OMRequest request = OMRequest.newBuilder() + .setCmdType(cmdtype) + .setClientId(clientId) + .build(); + OmUtils.isReadOnly(request); + assertFalse(cmdtype + "is not categorized in OmUtils#isReadyOnly", + logCapturer.getOutput().contains("CmdType " + cmdtype +" is not " + + "categorized as readOnly or not.")); + logCapturer.clearOutput(); + } + } }