From 5a9140690aba295ba1226a3190b52f34347a8372 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Tue, 22 May 2018 16:51:43 -0700 Subject: [PATCH] HDDS-49. Standalone protocol should use grpc in place of netty. Contributed by Mukul Kumar Singh. --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 217 ++++++++++++++++++ .../hadoop/hdds/scm/XceiverClientManager.java | 21 +- .../hadoop/hdds/scm/XceiverClientMetrics.java | 8 +- .../dev-support/findbugsExcludeFile.xml | 3 + hadoop-hdds/common/pom.xml | 17 ++ .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 + .../proto/DatanodeContainerProtocol.proto | 7 + .../src/main/resources/ozone-default.xml | 9 + .../common/helpers/ContainerMetrics.java | 14 +- .../transport/server/GrpcXceiverService.java | 82 +++++++ .../transport/server/XceiverServerGrpc.java | 105 +++++++++ .../container/ozoneimpl/OzoneContainer.java | 11 +- .../hadoop/ozone/MiniOzoneClusterImpl.java | 10 +- .../ozone/scm/TestXceiverClientManager.java | 67 ++++-- hadoop-project/pom.xml | 1 + 15 files changed, 540 insertions(+), 36 deletions(-) create mode 100644 hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java new file mode 100644 index 0000000000..84790e8e71 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; +import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.util.Time; +import org.apache.ratis.shaded.io.grpc.ManagedChannel; +import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * A Client for the storageContainer protocol. + */ +public class XceiverClientGrpc extends XceiverClientSpi { + static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class); + private final Pipeline pipeline; + private final Configuration config; + private XceiverClientProtocolServiceStub asyncStub; + private XceiverClientMetrics metrics; + private ManagedChannel channel; + private final Semaphore semaphore; + + /** + * Constructs a client that can communicate with the Container framework on + * data nodes. + * + * @param pipeline - Pipeline that defines the machines. + * @param config -- Ozone Config + */ + public XceiverClientGrpc(Pipeline pipeline, Configuration config) { + super(); + Preconditions.checkNotNull(pipeline); + Preconditions.checkNotNull(config); + this.pipeline = pipeline; + this.config = config; + this.semaphore = + new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); + this.metrics = XceiverClientManager.getXceiverClientMetrics(); + } + + @Override + public void connect() throws Exception { + DatanodeDetails leader = this.pipeline.getLeader(); + + // read port from the data node, on failure use default configured + // port. + int port = leader.getContainerPort(); + if (port == 0) { + port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); + } + LOG.debug("Connecting to server Port : " + leader.getIpAddress()); + channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port) + .usePlaintext(true) + .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) + .build(); + asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel); + } + + /** + * Returns if the xceiver client connects to a server. + * + * @return True if the connection is alive, false otherwise. + */ + @VisibleForTesting + public boolean isConnected() { + return !channel.isTerminated() && !channel.isShutdown(); + } + + @Override + public void close() { + channel.shutdownNow(); + try { + channel.awaitTermination(60, TimeUnit.MINUTES); + } catch (Exception e) { + LOG.error("Unexpected exception while waiting for channel termination", + e); + } + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request) throws IOException { + try { + return sendCommandAsync(request).get(); + } catch (ExecutionException | InterruptedException e) { + /** + * In case the grpc channel handler throws an exception, + * the exception thrown will be wrapped within {@link ExecutionException}. + * Unwarpping here so that original exception gets passed + * to to the client. + */ + if (e instanceof ExecutionException) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } + } + throw new IOException( + "Unexpected exception during execution:" + e.getMessage()); + } + } + + /** + * Sends a given command to server gets a waitable future back. + * + * @param request Request + * @return Response to the command + * @throws IOException + */ + @Override + public CompletableFuture + sendCommandAsync(ContainerCommandRequestProto request) + throws IOException, ExecutionException, InterruptedException { + final CompletableFuture replyFuture = + new CompletableFuture<>(); + semaphore.acquire(); + long requestTime = Time.monotonicNowNanos(); + metrics.incrPendingContainerOpsMetrics(request.getCmdType()); + // create a new grpc stream for each non-async call. + final StreamObserver requestObserver = + asyncStub.send(new StreamObserver() { + @Override + public void onNext(ContainerCommandResponseProto value) { + replyFuture.complete(value); + metrics.decrPendingContainerOpsMetrics(request.getCmdType()); + metrics.addContainerOpsLatency(request.getCmdType(), + Time.monotonicNowNanos() - requestTime); + semaphore.release(); + } + @Override + public void onError(Throwable t) { + replyFuture.completeExceptionally(t); + metrics.decrPendingContainerOpsMetrics(request.getCmdType()); + metrics.addContainerOpsLatency(request.getCmdType(), + Time.monotonicNowNanos() - requestTime); + semaphore.release(); + } + + @Override + public void onCompleted() { + if (!replyFuture.isDone()) { + replyFuture.completeExceptionally( + new IOException("Stream completed but no reply for request " + + request)); + } + } + }); + requestObserver.onNext(request); + requestObserver.onCompleted(); + return replyFuture; + } + + /** + * Create a pipeline. + * + * @param pipelineID - Name of the pipeline. + * @param datanodes - Datanodes + */ + @Override + public void createPipeline(String pipelineID, List datanodes) + throws IOException { + // For stand alone pipeline, there is no notion called setup pipeline. + return; + } + + /** + * Returns pipeline Type. + * + * @return - Stand Alone as the type. + */ + @Override + public HddsProtos.ReplicationType getPipelineType() { + return HddsProtos.ReplicationType.STAND_ALONE; + } +} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index dcaa57621d..89197971ee 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -41,8 +41,6 @@ .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos - .ReplicationType.RATIS; /** * XceiverClientManager is responsible for the lifecycle of XceiverClient @@ -62,6 +60,7 @@ public class XceiverClientManager implements Closeable { private final Configuration conf; private final Cache clientCache; private final boolean useRatis; + private final boolean useGrpc; private static XceiverClientMetrics metrics; /** @@ -79,6 +78,8 @@ public XceiverClientManager(Configuration conf) { this.useRatis = conf.getBoolean( ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); + this.useGrpc = conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT); this.conf = conf; this.clientCache = CacheBuilder.newBuilder() .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS) @@ -146,9 +147,19 @@ private XceiverClientSpi getClient(Pipeline pipeline, long containerID) new Callable() { @Override public XceiverClientSpi call() throws Exception { - XceiverClientSpi client = pipeline.getType() == RATIS ? - XceiverClientRatis.newXceiverClientRatis(pipeline, conf) - : new XceiverClient(pipeline, conf); + XceiverClientSpi client = null; + switch (pipeline.getType()) { + case RATIS: + client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf); + break; + case STAND_ALONE: + client = useGrpc ? new XceiverClientGrpc(pipeline, conf) : + new XceiverClient(pipeline, conf); + break; + case CHAINED: + default: + throw new IOException ("not implemented" + pipeline.getType()); + } client.connect(); return client; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java index fbc348cd6a..a4304009ad 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java @@ -49,13 +49,13 @@ public XceiverClientMetrics() { this.containerOpsLatency = new MutableRate[numEnumEntries]; for (int i = 0; i < numEnumEntries; i++) { pendingOpsArray[i] = registry.newCounter( - "numPending" + ContainerProtos.Type.valueOf(i + 1), - "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops", + "numPending" + ContainerProtos.Type.forNumber(i + 1), + "number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops", (long) 0); containerOpsLatency[i] = registry.newRate( - ContainerProtos.Type.valueOf(i + 1) + "Latency", - "latency of " + ContainerProtos.Type.valueOf(i + 1) + ContainerProtos.Type.forNumber(i + 1) + "Latency", + "latency of " + ContainerProtos.Type.forNumber(i + 1) + " ops"); } } diff --git a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml index 3571a8929e..daf6fec79a 100644 --- a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml @@ -18,4 +18,7 @@ + + + diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml index 6310df1a68..a8a634c0a2 100644 --- a/hadoop-hdds/common/pom.xml +++ b/hadoop-hdds/common/pom.xml @@ -61,6 +61,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> ratis-grpc org.apache.ratis + + com.google.errorprone + error_prone_annotations + 2.2.0 + true + org.rocksdb @@ -108,7 +114,15 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> compile test-compile + compile-custom + test-compile-custom + + grpc-java + + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + @@ -122,6 +136,9 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 29ccf308b1..85407e65ce 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -49,6 +49,10 @@ public final class ScmConfigKeys { = "dfs.container.ratis.enabled"; public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT = false; + public static final String DFS_CONTAINER_GRPC_ENABLED_KEY + = "dfs.container.grpc.enabled"; + public static final boolean DFS_CONTAINER_GRPC_ENABLED_DEFAULT + = false; public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY = "dfs.container.ratis.rpc.type"; public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index 95b7cbba1b..1138297892 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -24,6 +24,7 @@ // This file contains protocol buffers that are used to transfer data // to and from the datanode. +syntax = "proto2"; option java_package = "org.apache.hadoop.hdds.protocol.datanode.proto"; option java_outer_classname = "ContainerProtos"; option java_generate_equals_and_hash = true; @@ -418,3 +419,9 @@ message CopyContainerResponseProto { repeated bytes data = 5; optional int64 checksum = 6; } + +service XceiverClientProtocolService { + // A client-to-datanode RPC to send container commands + rpc send(stream ContainerCommandRequestProto) returns + (stream ContainerCommandResponseProto) {} +} \ No newline at end of file diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index e0aca67605..7a91610c65 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -71,6 +71,15 @@ the replication pipeline supported by ozone. + + dfs.container.grpc.enabled + false + OZONE, MANAGEMENT, PIPELINE, RATIS + Ozone supports different kinds of replication pipelines + protocols. grpc is one of the replication pipeline protocol supported by + ozone. + + dfs.container.ratis.ipc 9858 diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java index 4300b2da42..714db598d7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java @@ -63,20 +63,20 @@ public ContainerMetrics(int[] intervals) { this.registry = new MetricsRegistry("StorageContainerMetrics"); for (int i = 0; i < numEnumEntries; i++) { numOpsArray[i] = registry.newCounter( - "num" + ContainerProtos.Type.valueOf(i + 1), - "number of " + ContainerProtos.Type.valueOf(i + 1) + " ops", + "num" + ContainerProtos.Type.forNumber(i + 1), + "number of " + ContainerProtos.Type.forNumber(i + 1) + " ops", (long) 0); opsBytesArray[i] = registry.newCounter( - "bytes" + ContainerProtos.Type.valueOf(i + 1), - "bytes used by " + ContainerProtos.Type.valueOf(i + 1) + "op", + "bytes" + ContainerProtos.Type.forNumber(i + 1), + "bytes used by " + ContainerProtos.Type.forNumber(i + 1) + "op", (long) 0); opsLatency[i] = registry.newRate( - "latency" + ContainerProtos.Type.valueOf(i + 1), - ContainerProtos.Type.valueOf(i + 1) + " op"); + "latency" + ContainerProtos.Type.forNumber(i + 1), + ContainerProtos.Type.forNumber(i + 1) + " op"); for (int j = 0; j < len; j++) { int interval = intervals[j]; - String quantileName = ContainerProtos.Type.valueOf(i + 1) + "Nanos" + String quantileName = ContainerProtos.Type.forNumber(i + 1) + "Nanos" + interval + "s"; opsLatQuantiles[i][j] = registry.newQuantiles(quantileName, "latency of Container ops", "ops", "latency", interval); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java new file mode 100644 index 0000000000..df6220cec7 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto + .XceiverClientProtocolServiceGrpc; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Grpc Service for handling Container Commands on datanode. + */ +public class GrpcXceiverService extends + XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceImplBase { + public static final Logger + LOG = LoggerFactory.getLogger(GrpcXceiverService.class); + + private final ContainerDispatcher dispatcher; + + public GrpcXceiverService(ContainerDispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + @Override + public StreamObserver send( + StreamObserver responseObserver) { + return new StreamObserver() { + private final AtomicBoolean isClosed = new AtomicBoolean(false); + + @Override + public void onNext(ContainerCommandRequestProto request) { + try { + ContainerCommandResponseProto resp = dispatcher.dispatch(request); + responseObserver.onNext(resp); + } catch (Throwable e) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} got exception when processing" + + " ContainerCommandRequestProto {}: {}", request, e); + } + responseObserver.onError(e); + } + } + + @Override + public void onError(Throwable t) { + // for now we just log a msg + LOG.info("{}: ContainerCommand send on error. Exception: {}", t); + } + + @Override + public void onCompleted() { + if (isClosed.compareAndSet(false, true)) { + LOG.info("{}: ContainerCommand send completed"); + responseObserver.onCompleted(); + } + } + }; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java new file mode 100644 index 0000000000..30a2f875f0 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.ratis.shaded.io.grpc.Server; +import org.apache.ratis.shaded.io.grpc.ServerBuilder; +import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketAddress; + +/** + * Creates a Grpc server endpoint that acts as the communication layer for + * Ozone containers. + */ +public final class XceiverServerGrpc implements XceiverServerSpi { + private static final Logger + LOG = LoggerFactory.getLogger(XceiverServerGrpc.class); + private int port; + private Server server; + + /** + * Constructs a Grpc server class. + * + * @param conf - Configuration + */ + public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf, + ContainerDispatcher dispatcher) { + Preconditions.checkNotNull(conf); + + this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); + // Get an available port on current node and + // use that as the container port + if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) { + try (ServerSocket socket = new ServerSocket()) { + socket.setReuseAddress(true); + SocketAddress address = new InetSocketAddress(0); + socket.bind(address); + this.port = socket.getLocalPort(); + LOG.info("Found a free port for the server : {}", this.port); + } catch (IOException e) { + LOG.error("Unable find a random free port for the server, " + + "fallback to use default port {}", this.port, e); + } + } + datanodeDetails.setContainerPort(port); + server = ((NettyServerBuilder) ServerBuilder.forPort(port)) + .maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) + .addService(new GrpcXceiverService(dispatcher)) + .build(); + } + + @Override + public int getIPCPort() { + return this.port; + } + + /** + * Returns the Replication type supported by this end-point. + * + * @return enum -- {Stand_Alone, Ratis, Grpc, Chained} + */ + @Override + public HddsProtos.ReplicationType getServerType() { + return HddsProtos.ReplicationType.STAND_ALONE; + } + + @Override + public void start() throws IOException { + server.start(); + } + + @Override + public void stop() { + server.shutdown(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 1fc79d738c..b497cdc6d0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -39,6 +40,8 @@ import org.apache.hadoop.ozone.container.common.statemachine.background .BlockDeletingService; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; +import org.apache.hadoop.ozone.container.common.transport.server + .XceiverServerGrpc; import org.apache.hadoop.ozone.container.common.transport.server .XceiverServerSpi; import org.apache.hadoop.ozone.container.common.transport.server.ratis @@ -121,8 +124,14 @@ public OzoneContainer( this.dispatcher = new Dispatcher(manager, this.ozoneConfig); + boolean useGrpc = this.ozoneConfig.getBoolean( + ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT); server = new XceiverServerSpi[]{ - new XceiverServer(datanodeDetails, this.ozoneConfig, this.dispatcher), + useGrpc ? new XceiverServerGrpc(datanodeDetails, + this.ozoneConfig, this.dispatcher) : + new XceiverServer(datanodeDetails, + this.ozoneConfig, this.dispatcher), XceiverServerRatis .newXceiverServerRatis(datanodeDetails, this.ozoneConfig, dispatcher) }; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 08d71760cb..993681511d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -220,13 +220,13 @@ public void restartHddsDatanode(int i) { datanodeService.stop(); datanodeService.join(); // ensure same ports are used across restarts. - Configuration conf = datanodeService.getConf(); + Configuration config = datanodeService.getConf(); int currentPort = datanodeService.getDatanodeDetails().getContainerPort(); - conf.setInt(DFS_CONTAINER_IPC_PORT, currentPort); - conf.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false); + config.setInt(DFS_CONTAINER_IPC_PORT, currentPort); + config.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false); int ratisPort = datanodeService.getDatanodeDetails().getRatisPort(); - conf.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort); - conf.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false); + config.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort); + config.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false); datanodeService.start(null); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java index 07ad6ef610..77e4e1bcfd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java @@ -19,6 +19,7 @@ import com.google.common.cache.Cache; import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -30,13 +31,17 @@ .StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.junit.Assert; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import static org.apache.hadoop.hdds.scm .ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY; @@ -44,19 +49,32 @@ /** * Test for XceiverClientManager caching and eviction. */ +@RunWith(Parameterized.class) public class TestXceiverClientManager { private static OzoneConfiguration config; private static MiniOzoneCluster cluster; private static StorageContainerLocationProtocolClientSideTranslatorPB storageContainerLocationClient; private static String containerOwner = "OZONE"; + private static boolean shouldUseGrpc; + + @Parameterized.Parameters + public static Collection withGrpc() { + return Arrays.asList(new Object[][] {{false}, {true}}); + } + + public TestXceiverClientManager(boolean useGrpc) { + shouldUseGrpc = useGrpc; + } @Rule public ExpectedException exception = ExpectedException.none(); - @BeforeClass - public static void init() throws Exception { + @Before + public void init() throws Exception { config = new OzoneConfiguration(); + config.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + shouldUseGrpc); cluster = MiniOzoneCluster.newBuilder(config) .setNumDatanodes(3) .build(); @@ -65,8 +83,8 @@ public static void init() throws Exception { .getStorageContainerLocationClient(); } - @AfterClass - public static void shutdown() { + @After + public void shutdown() { if (cluster != null) { cluster.shutdown(); } @@ -76,6 +94,8 @@ public static void shutdown() { @Test public void testCaching() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); + conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + shouldUseGrpc); XceiverClientManager clientManager = new XceiverClientManager(conf); ContainerInfo container1 = storageContainerLocationClient @@ -106,6 +126,8 @@ public void testCaching() throws IOException { public void testFreeByReference() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); + conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + shouldUseGrpc); XceiverClientManager clientManager = new XceiverClientManager(conf); Cache cache = clientManager.getClientCache(); @@ -140,10 +162,18 @@ public void testFreeByReference() throws IOException { // After releasing the client, this connection should be closed // and any container operations should fail clientManager.releaseClient(client1); - exception.expect(IOException.class); - exception.expectMessage("This channel is not connected."); - ContainerProtocolCalls.createContainer(client1, - container1.getContainerID(), traceID1); + + String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" : + "This channel is not connected."; + try { + ContainerProtocolCalls.createContainer(client1, + container1.getContainerID(), traceID1); + Assert.fail("Create container should throw exception on closed" + + "client"); + } catch (Exception e) { + Assert.assertEquals(e.getClass(), IOException.class); + Assert.assertTrue(e.getMessage().contains(expectedMessage)); + } clientManager.releaseClient(client2); } @@ -151,6 +181,8 @@ public void testFreeByReference() throws IOException { public void testFreeByEviction() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); + conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + shouldUseGrpc); XceiverClientManager clientManager = new XceiverClientManager(conf); Cache cache = clientManager.getClientCache(); @@ -181,10 +213,17 @@ public void testFreeByEviction() throws IOException { // Any container operation should now fail String traceID2 = "trace" + RandomStringUtils.randomNumeric(4); - exception.expect(IOException.class); - exception.expectMessage("This channel is not connected."); - ContainerProtocolCalls.createContainer(client1, - container1.getContainerID(), traceID2); + String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" : + "This channel is not connected."; + try { + ContainerProtocolCalls.createContainer(client1, + container1.getContainerID(), traceID2); + Assert.fail("Create container should throw exception on closed" + + "client"); + } catch (Exception e) { + Assert.assertEquals(e.getClass(), IOException.class); + Assert.assertTrue(e.getMessage().contains(expectedMessage)); + } clientManager.releaseClient(client2); } } diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index a916108f66..73c3f5b0d6 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -107,6 +107,7 @@ 0.5.1 3.5.0 + 1.10.0 1.5.0.Final