diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java deleted file mode 100644 index 5f2fe266e7..0000000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.ratis.shaded.io.netty.bootstrap.Bootstrap; -import org.apache.ratis.shaded.io.netty.channel.Channel; -import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup; -import org.apache.ratis.shaded.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.ratis.shaded.io.netty.channel.socket.nio.NioSocketChannel; -import org.apache.ratis.shaded.io.netty.handler.logging.LogLevel; -import org.apache.ratis.shaded.io.netty.handler.logging.LoggingHandler; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; - -/** - * A Client for the storageContainer protocol. - */ -public class XceiverClient extends XceiverClientSpi { - static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); - private final Pipeline pipeline; - private final Configuration config; - private Channel channel; - private Bootstrap b; - private EventLoopGroup group; - private final Semaphore semaphore; - private boolean closed = false; - - /** - * Constructs a client that can communicate with the Container framework on - * data nodes. - * - * @param pipeline - Pipeline that defines the machines. - * @param config -- Ozone Config - */ - public XceiverClient(Pipeline pipeline, Configuration config) { - super(); - Preconditions.checkNotNull(pipeline); - Preconditions.checkNotNull(config); - this.pipeline = pipeline; - this.config = config; - this.semaphore = - new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); - } - - @Override - public void connect() throws Exception { - if (closed) { - throw new IOException("This channel is not connected."); - } - - if (channel != null && channel.isActive()) { - throw new IOException("This client is already connected to a host."); - } - - group = new NioEventLoopGroup(); - b = new Bootstrap(); - b.group(group) - .channel(NioSocketChannel.class) - .handler(new LoggingHandler(LogLevel.INFO)) - .handler(new XceiverClientInitializer(this.pipeline, semaphore)); - DatanodeDetails leader = this.pipeline.getLeader(); - - // read port from the data node, on failure use default configured - // port. - int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); - if (port == 0) { - port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, - OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); - } - LOG.debug("Connecting to server Port : " + port); - channel = b.connect(leader.getHostName(), port).sync().channel(); - } - - public void reconnect() throws IOException { - try { - connect(); - if (channel == null || !channel.isActive()) { - throw new IOException("This channel is not connected."); - } - } catch (Exception e) { - LOG.error("Error while connecting: ", e); - throw new IOException(e); - } - } - - /** - * Returns if the exceiver client connects to a server. - * - * @return True if the connection is alive, false otherwise. - */ - @VisibleForTesting - public boolean isConnected() { - return channel.isActive(); - } - - @Override - public void close() { - closed = true; - if (group != null) { - group.shutdownGracefully().awaitUninterruptibly(); - } - } - - @Override - public Pipeline getPipeline() { - return pipeline; - } - - @Override - public ContainerProtos.ContainerCommandResponseProto sendCommand( - ContainerProtos.ContainerCommandRequestProto request) throws IOException { - try { - if ((channel == null) || (!channel.isActive())) { - reconnect(); - } - XceiverClientHandler handler = - channel.pipeline().get(XceiverClientHandler.class); - - return handler.sendCommand(request); - } catch (ExecutionException | InterruptedException e) { - /** - * In case the netty channel handler throws an exception, - * the exception thrown will be wrapped within {@link ExecutionException}. - * Unwarpping here so that original exception gets passed - * to to the client. - */ - if (e instanceof ExecutionException) { - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - throw (IOException) cause; - } - } - throw new IOException( - "Unexpected exception during execution:" + e.getMessage()); - } - } - - /** - * Sends a given command to server gets a waitable future back. - * - * @param request Request - * @return Response to the command - * @throws IOException - */ - @Override - public CompletableFuture - sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) - throws IOException, ExecutionException, InterruptedException { - if ((channel == null) || (!channel.isActive())) { - reconnect(); - } - XceiverClientHandler handler = - channel.pipeline().get(XceiverClientHandler.class); - return handler.sendCommandAsync(request); - } - - /** - * Create a pipeline. - */ - @Override - public void createPipeline() - throws IOException { - // For stand alone pipeline, there is no notion called setup pipeline. - } - - public void destroyPipeline() { - // For stand alone pipeline, there is no notion called destroy pipeline. - } - - /** - * Returns pipeline Type. - * - * @return - Stand Alone as the type. - */ - @Override - public HddsProtos.ReplicationType getPipelineType() { - return HddsProtos.ReplicationType.STAND_ALONE; - } -} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java deleted file mode 100644 index 7c568f6b87..0000000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm; - -import com.google.common.base.Preconditions; -import org.apache.ratis.shaded.io.netty.channel.Channel; -import org.apache.ratis.shaded.io.netty.channel.ChannelHandlerContext; -import org.apache.ratis.shaded.io.netty.channel.SimpleChannelInboundHandler; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; - -/** - * Netty client handler. - */ -public class XceiverClientHandler extends - SimpleChannelInboundHandler { - - static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class); - private final ConcurrentMap responses = - new ConcurrentHashMap<>(); - - private final Pipeline pipeline; - private volatile Channel channel; - private XceiverClientMetrics metrics; - private final Semaphore semaphore; - - /** - * Constructs a client that can communicate to a container server. - */ - public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) { - super(false); - Preconditions.checkNotNull(pipeline); - this.pipeline = pipeline; - this.metrics = XceiverClientManager.getXceiverClientMetrics(); - this.semaphore = semaphore; - } - - /** - * Please keep in mind that this method will be renamed to {@code - * messageReceived(ChannelHandlerContext, I)} in 5.0. - *

- * Is called for each message of type {@link ContainerProtos - * .ContainerCommandResponseProto}. - * - * @param ctx the {@link ChannelHandlerContext} which this {@link - * SimpleChannelInboundHandler} belongs to - * @param msg the message to handle - * @throws Exception is thrown if an error occurred - */ - @Override - public void channelRead0(ChannelHandlerContext ctx, - ContainerProtos.ContainerCommandResponseProto msg) - throws Exception { - Preconditions.checkNotNull(msg); - metrics.decrPendingContainerOpsMetrics(msg.getCmdType()); - - String key = msg.getTraceID(); - ResponseFuture response = responses.remove(key); - semaphore.release(); - - if (response != null) { - response.getFuture().complete(msg); - - long requestTime = response.getRequestTime(); - metrics.addContainerOpsLatency(msg.getCmdType(), - Time.monotonicNowNanos() - requestTime); - } else { - LOG.error("A reply received for message that was not queued. trace " + - "ID: {}", msg.getTraceID()); - } - } - - @Override - public void channelRegistered(ChannelHandlerContext ctx) { - LOG.debug("channelRegistered: Connected to ctx"); - channel = ctx.channel(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - LOG.info("Exception in client " + cause.toString()); - Iterator keyIterator = responses.keySet().iterator(); - while (keyIterator.hasNext()) { - ResponseFuture response = responses.remove(keyIterator.next()); - response.getFuture().completeExceptionally(cause); - semaphore.release(); - } - ctx.close(); - } - - /** - * Since netty is async, we send a work request and then wait until a response - * appears in the reply queue. This is simple sync interface for clients. we - * should consider building async interfaces for client if this turns out to - * be a performance bottleneck. - * - * @param request - request. - * @return -- response - */ - - public ContainerCommandResponseProto sendCommand( - ContainerProtos.ContainerCommandRequestProto request) - throws ExecutionException, InterruptedException { - Future future = sendCommandAsync(request); - return future.get(); - } - - /** - * SendCommandAsyc queues a command to the Netty Subsystem and returns a - * CompletableFuture. This Future is marked compeleted in the channelRead0 - * when the call comes back. - * @param request - Request to execute - * @return CompletableFuture - */ - public CompletableFuture sendCommandAsync( - ContainerProtos.ContainerCommandRequestProto request) - throws InterruptedException { - - // Throw an exception of request doesn't have traceId - if (StringUtils.isEmpty(request.getTraceID())) { - throw new IllegalArgumentException("Invalid trace ID"); - } - - // Setting the datanode ID in the commands, so that we can distinguish - // commands when the cluster simulator is running. - if(!request.hasDatanodeUuid()) { - throw new IllegalArgumentException("Invalid Datanode ID"); - } - - metrics.incrPendingContainerOpsMetrics(request.getCmdType()); - - CompletableFuture future - = new CompletableFuture<>(); - ResponseFuture response = new ResponseFuture(future, - Time.monotonicNowNanos()); - semaphore.acquire(); - ResponseFuture previous = responses.putIfAbsent( - request.getTraceID(), response); - if (previous != null) { - LOG.error("Command with Trace already exists. Ignoring this command. " + - "{}. Previous Command: {}", request.getTraceID(), - previous.toString()); - throw new IllegalStateException("Duplicate trace ID. Command with this " + - "trace ID is already executing. Please ensure that " + - "trace IDs are not reused. ID: " + request.getTraceID()); - } - - channel.writeAndFlush(request); - return response.getFuture(); - } - - /** - * Class wraps response future info. - */ - static class ResponseFuture { - private final long requestTime; - private final CompletableFuture future; - - ResponseFuture(CompletableFuture future, - long requestTime) { - this.future = future; - this.requestTime = requestTime; - } - - public long getRequestTime() { - return requestTime; - } - - public CompletableFuture getFuture() { - return future; - } - } -} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java deleted file mode 100644 index 90e2f5ad91..0000000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm; - -import org.apache.ratis.shaded.io.netty.channel.ChannelInitializer; -import org.apache.ratis.shaded.io.netty.channel.ChannelPipeline; -import org.apache.ratis.shaded.io.netty.channel.socket.SocketChannel; -import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder; -import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; -import org.apache.ratis.shaded.io.netty.handler.codec.protobuf - .ProtobufVarint32FrameDecoder; -import org.apache.ratis.shaded.io.netty.handler.codec.protobuf - .ProtobufVarint32LengthFieldPrepender; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; - -import java.util.concurrent.Semaphore; - -/** - * Setup the netty pipeline. - */ -public class XceiverClientInitializer extends - ChannelInitializer { - private final Pipeline pipeline; - private final Semaphore semaphore; - - /** - * Constructs an Initializer for the client pipeline. - * @param pipeline - Pipeline. - */ - public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) { - this.pipeline = pipeline; - this.semaphore = semaphore; - } - - /** - * This method will be called once when the Channel is registered. After - * the method returns this instance will be removed from the - * ChannelPipeline of the Channel. - * - * @param ch Channel which was registered. - * @throws Exception is thrown if an error occurs. In that case the - * Channel will be closed. - */ - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline p = ch.pipeline(); - - p.addLast(new ProtobufVarint32FrameDecoder()); - p.addLast(new ProtobufDecoder(ContainerProtos - .ContainerCommandResponseProto.getDefaultInstance())); - - p.addLast(new ProtobufVarint32LengthFieldPrepender()); - p.addLast(new ProtobufEncoder()); - - p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore)); - - } -} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java deleted file mode 100644 index f866fcd3e5..0000000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.transport.server; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.ratis.shaded.io.netty.bootstrap.ServerBootstrap; -import org.apache.ratis.shaded.io.netty.channel.Channel; -import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup; -import org.apache.ratis.shaded.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.ratis.shaded.io.netty.channel.socket.nio - .NioServerSocketChannel; -import org.apache.ratis.shaded.io.netty.handler.logging.LogLevel; -import org.apache.ratis.shaded.io.netty.handler.logging.LoggingHandler; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.SocketAddress; - -/** - * Creates a netty server endpoint that acts as the communication layer for - * Ozone containers. - */ -public final class XceiverServer implements XceiverServerSpi { - private static final Logger - LOG = LoggerFactory.getLogger(XceiverServer.class); - private int port; - private final ContainerDispatcher storageContainer; - - private EventLoopGroup bossGroup; - private EventLoopGroup workerGroup; - private Channel channel; - - /** - * Constructs a netty server class. - * - * @param conf - Configuration - */ - public XceiverServer(DatanodeDetails datanodeDetails, Configuration conf, - ContainerDispatcher dispatcher) { - Preconditions.checkNotNull(conf); - - this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, - OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); - // Get an available port on current node and - // use that as the container port - if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, - OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) { - try (ServerSocket socket = new ServerSocket()) { - socket.setReuseAddress(true); - SocketAddress address = new InetSocketAddress(0); - socket.bind(address); - this.port = socket.getLocalPort(); - LOG.info("Found a free port for the server : {}", this.port); - } catch (IOException e) { - LOG.error("Unable find a random free port for the server, " - + "fallback to use default port {}", this.port, e); - } - } - datanodeDetails.setPort( - DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port)); - this.storageContainer = dispatcher; - } - - @Override - public int getIPCPort() { - return this.port; - } - - /** - * Returns the Replication type supported by this end-point. - * - * @return enum -- {Stand_Alone, Ratis, Chained} - */ - @Override - public HddsProtos.ReplicationType getServerType() { - return HddsProtos.ReplicationType.STAND_ALONE; - } - - @Override - public void start() throws IOException { - bossGroup = new NioEventLoopGroup(); - workerGroup = new NioEventLoopGroup(); - channel = new ServerBootstrap() - .group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(new XceiverServerInitializer(storageContainer)) - .bind(port) - .syncUninterruptibly() - .channel(); - } - - @Override - public void stop() { - if (storageContainer != null) { - storageContainer.shutdown(); - } - if (bossGroup != null) { - bossGroup.shutdownGracefully(); - } - if (workerGroup != null) { - workerGroup.shutdownGracefully(); - } - if (channel != null) { - channel.close().awaitUninterruptibly(); - } - } - - @Override - public void submitRequest(ContainerCommandRequestProto request, - HddsProtos.PipelineID pipelineID) { - storageContainer.dispatch(request); - } -} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java deleted file mode 100644 index 37652991c8..0000000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.transport.server; - -import org.apache.ratis.shaded.io.netty.channel.ChannelHandlerContext; -import org.apache.ratis.shaded.io.netty.channel.SimpleChannelInboundHandler; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Netty server handlers that respond to Network events. - */ -public class XceiverServerHandler extends - SimpleChannelInboundHandler { - - static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class); - private final ContainerDispatcher dispatcher; - - /** - * Constructor for server handler. - * @param dispatcher - Dispatcher interface - */ - public XceiverServerHandler(ContainerDispatcher dispatcher) { - this.dispatcher = dispatcher; - } - - /** - * Please keep in mind that this method will be renamed to {@code - * messageReceived(ChannelHandlerContext, I)} in 5.0. - *

- * Is called for each message of type {@link ContainerCommandRequestProto}. - * - * @param ctx the {@link ChannelHandlerContext} which this {@link - * SimpleChannelInboundHandler} belongs to - * @param msg the message to handle - * @throws Exception is thrown if an error occurred - */ - @Override - public void channelRead0(ChannelHandlerContext ctx, - ContainerCommandRequestProto msg) throws - Exception { - ContainerCommandResponseProto response = this.dispatcher.dispatch(msg); - LOG.debug("Writing the reponse back to client."); - ctx.writeAndFlush(response); - - } - - /** - * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} - * Sub-classes may override this method to change behavior. - * - * @param ctx - Channel Handler Context - * @param cause - Exception - */ - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - LOG.error("An exception caught in the pipeline : " + cause.toString()); - super.exceptionCaught(ctx, cause); - } -} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java deleted file mode 100644 index e405cf99cc..0000000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.transport.server; - -import com.google.common.base.Preconditions; -import org.apache.ratis.shaded.io.netty.channel.ChannelInitializer; -import org.apache.ratis.shaded.io.netty.channel.ChannelPipeline; -import org.apache.ratis.shaded.io.netty.channel.socket.SocketChannel; -import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder; -import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; -import org.apache.ratis.shaded.io.netty.handler.codec.protobuf - .ProtobufVarint32FrameDecoder; -import org.apache.ratis.shaded.io.netty.handler.codec.protobuf - .ProtobufVarint32LengthFieldPrepender; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; - -/** - * Creates a channel for the XceiverServer. - */ -public class XceiverServerInitializer extends ChannelInitializer{ - private final ContainerDispatcher dispatcher; - public XceiverServerInitializer(ContainerDispatcher dispatcher) { - Preconditions.checkNotNull(dispatcher); - this.dispatcher = dispatcher; - } - - /** - * This method will be called once the Channel is registered. After - * the method returns this instance will be removed from the {@link - * ChannelPipeline} - * - * @param ch the which was registered. - * @throws Exception is thrown if an error occurs. In that case the channel - * will be closed. - */ - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new ProtobufVarint32FrameDecoder()); - pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto - .getDefaultInstance())); - pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); - pipeline.addLast(new ProtobufEncoder()); - pipeline.addLast(new XceiverServerHandler(dispatcher)); - } -} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java index e5bb373d08..302ea465f0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -28,7 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.XceiverClient; +import org.apache.hadoop.hdds.scm.XceiverClientGrpc; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.TestGenericTestUtils; @@ -100,7 +100,7 @@ public class TestMiniOzoneCluster { pipeline.addMember(datanodeDetails); // Verify client is able to connect to the container - try (XceiverClient client = new XceiverClient(pipeline, conf)){ + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){ client.connect(); assertTrue(client.isConnected()); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java index 19b561a427..aac908de24 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.XceiverClientGrpc; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -37,11 +38,12 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.XceiverClient; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.replication.GrpcReplicationService; +import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Test; @@ -54,10 +56,16 @@ import java.util.UUID; */ public class TestContainerMetrics { + private GrpcReplicationService createReplicationService( + ContainerSet containerSet) { + return new GrpcReplicationService( + new OnDemandContainerReplicationSource(containerSet)); + } + @Test public void testContainerMetrics() throws Exception { - XceiverServer server = null; - XceiverClient client = null; + XceiverServerGrpc server = null; + XceiverClientGrpc client = null; long containerID = ContainerTestHelper.getTestContainerID(); String path = GenericTestUtils.getRandomizedTempPath(); @@ -81,8 +89,9 @@ public class TestContainerMetrics { volumeSet, null); dispatcher.setScmId(UUID.randomUUID().toString()); - server = new XceiverServer(datanodeDetails, conf, dispatcher); - client = new XceiverClient(pipeline, conf); + server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, + createReplicationService(containerSet)); + client = new XceiverClientGrpc(pipeline, conf); server.start(); client.connect(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java index b89814e4f3..de55d9eb27 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java @@ -22,6 +22,8 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.replication.GrpcReplicationService; +import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource; import org.apache.ratis.shaded.io.netty.channel.embedded.EmbeddedChannel; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -36,12 +38,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.RatisTestHelper; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; -import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerHandler; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.web.utils.OzoneUtils; -import org.apache.hadoop.hdds.scm.XceiverClient; +import org.apache.hadoop.hdds.scm.XceiverClientGrpc; import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; @@ -70,43 +71,24 @@ public class TestContainerServer { static final String TEST_DIR = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator; - @Test - public void testPipeline() throws IOException { - EmbeddedChannel channel = null; - String containerName = OzoneUtils.getRequestID(); - try { - channel = new EmbeddedChannel(new XceiverServerHandler( - new TestContainerDispatcher())); - ContainerCommandRequestProto request = - ContainerTestHelper.getCreateContainerRequest( - ContainerTestHelper.getTestContainerID(), - ContainerTestHelper.createSingleNodePipeline()); - channel.writeInbound(request); - Assert.assertTrue(channel.finish()); - - Object responseObject = channel.readOutbound(); - Assert.assertTrue(responseObject instanceof - ContainerCommandResponseProto); - ContainerCommandResponseProto response = - (ContainerCommandResponseProto) responseObject; - Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); - } finally { - if (channel != null) { - channel.close(); - } - } + private GrpcReplicationService createReplicationService( + ContainerSet containerSet) { + return new GrpcReplicationService( + new OnDemandContainerReplicationSource(containerSet)); } @Test public void testClientServer() throws Exception { DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); + ContainerSet containerSet = new ContainerSet(); runTestClientServer(1, (pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, pipeline.getLeader() .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()), - XceiverClient::new, - (dn, conf) -> new XceiverServer(datanodeDetails, conf, - new TestContainerDispatcher()), + XceiverClientGrpc::new, + (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf, + new TestContainerDispatcher(), + createReplicationService(containerSet)), (dn, p) -> {}); } @@ -193,8 +175,8 @@ public class TestContainerServer { @Test public void testClientServerWithContainerDispatcher() throws Exception { - XceiverServer server = null; - XceiverClient client = null; + XceiverServerGrpc server = null; + XceiverClientGrpc client = null; try { Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(); @@ -203,12 +185,14 @@ public class TestContainerServer { pipeline.getLeader() .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()); + ContainerSet containerSet = new ContainerSet(); HddsDispatcher dispatcher = new HddsDispatcher( conf, mock(ContainerSet.class), mock(VolumeSet.class), null); dispatcher.init(); DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); - server = new XceiverServer(datanodeDetails, conf, dispatcher); - client = new XceiverClient(pipeline, conf); + server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, + createReplicationService(containerSet)); + client = new XceiverClientGrpc(pipeline, conf); server.start(); client.connect();