From ba4d5a52a8be68ae826f6e6f9d4edbaba9131e85 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Tue, 23 Jan 2018 09:28:03 -0800 Subject: [PATCH] HDFS-12986. Ozone: Update ozone to latest ratis snapshot build (0.1.1-alpha-0f7169d-SNAPSHOT). Contributed by Lokesh Jain --- .../hadoop-client-runtime/pom.xml | 1 + .../apache/hadoop/scm/XceiverClientRatis.java | 39 +++++++++++++++---- .../server/ratis/XceiverServerRatis.java | 20 +++++----- 3 files changed, 42 insertions(+), 18 deletions(-) diff --git a/hadoop-client-modules/hadoop-client-runtime/pom.xml b/hadoop-client-modules/hadoop-client-runtime/pom.xml index 363adf5b02..532fae960b 100644 --- a/hadoop-client-modules/hadoop-client-runtime/pom.xml +++ b/hadoop-client-modules/hadoop-client-runtime/pom.xml @@ -157,6 +157,7 @@ com.google.code.findbugs:jsr305 + io.dropwizard.metrics:metrics-core diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java index 12ee328f1f..3bc70ed723 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java @@ -19,6 +19,7 @@ package org.apache.hadoop.scm; import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; @@ -42,6 +43,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -68,7 +70,9 @@ public static XceiverClientRatis newXceiverClientRatis( private final AtomicReference client = new AtomicReference<>(); private final int maxOutstandingRequests; - /** Constructs a client. */ + /** + * Constructs a client. + */ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, int maxOutStandingChunks) { super(); @@ -78,7 +82,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, } /** - * {@inheritDoc} + * {@inheritDoc} */ public void createPipeline(String clusterId, List datanodes) throws IOException { @@ -90,6 +94,7 @@ public void createPipeline(String clusterId, List datanodes) /** * Returns Ratis as pipeline Type. + * * @return - Ratis */ @Override @@ -97,8 +102,7 @@ public OzoneProtos.ReplicationType getPipelineType() { return OzoneProtos.ReplicationType.RATIS; } - private void reinitialize( - List datanodes, RaftGroup group) + private void reinitialize(List datanodes, RaftGroup group) throws IOException { if (datanodes.isEmpty()) { return; @@ -124,8 +128,9 @@ private void reinitialize( /** * Adds a new peers to the Ratis Ring. + * * @param datanode - new datanode - * @param group - Raft group + * @param group - Raft group * @throws IOException - on Failure. */ private void reinitialize(DatanodeID datanode, RaftGroup group) @@ -141,8 +146,6 @@ private void reinitialize(DatanodeID datanode, RaftGroup group) } } - - @Override public Pipeline getPipeline() { return pipeline; @@ -216,6 +219,16 @@ private RaftClientReply sendRequest(ContainerCommandRequestProto request) return reply; } + private CompletableFuture sendRequestAsync( + ContainerCommandRequestProto request) throws IOException { + boolean isReadOnlyRequest = isReadOnly(request); + ByteString byteString = + ShadedProtoUtil.asShadedByteString(request.toByteArray()); + LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request); + return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) : + getClient().sendAsync(() -> byteString); + } + @Override public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request) throws IOException { @@ -236,6 +249,16 @@ public ContainerCommandResponseProto sendCommand( public CompletableFuture sendCommandAsync( ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException { - throw new IOException("Not implemented"); + return sendRequestAsync(request).whenComplete((reply, e) -> + LOG.debug("received reply {} for request: {} exception: {}", request, + reply, e)) + .thenApply(reply -> { + try { + return ContainerCommandResponseProto.parseFrom( + ShadedProtoUtil.asByteString(reply.getMessage().getContent())); + } catch (InvalidProtocolBufferException e) { + throw new CompletionException(e); + } + }); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index ff52341ed7..d0ff094e4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -38,6 +38,7 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,18 +116,17 @@ private static RaftProperties newRaftProperties( RaftServerConfigKeys.setStorageDir(properties, new File(storageDir)); RaftConfigKeys.Rpc.setType(properties, rpc); - //TODO: change these configs to setter after RATIS-154 - properties.setInt("raft.server.log.segment.cache.num.max", 2); - properties.setInt("raft.grpc.message.size.max", - scmChunkSize + raftSegmentPreallocatedSize); - properties.setInt("raft.server.rpc.timeout.min", 800); - properties.setInt("raft.server.rpc.timeout.max", 1000); + RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2); + GrpcConfigKeys.setMessageSizeMax(properties, + SizeInBytes.valueOf(scmChunkSize + raftSegmentPreallocatedSize)); + RaftServerConfigKeys.Rpc.setTimeoutMin(properties, + TimeDuration.valueOf(800, TimeUnit.MILLISECONDS)); + RaftServerConfigKeys.Rpc.setTimeoutMax(properties, + TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS)); if (rpc == SupportedRpcType.GRPC) { GrpcConfigKeys.Server.setPort(properties, port); - } else { - if (rpc == SupportedRpcType.NETTY) { - NettyConfigKeys.Server.setPort(properties, port); - } + } else if (rpc == SupportedRpcType.NETTY) { + NettyConfigKeys.Server.setPort(properties, port); } return properties; }