From 3f6195045ee826815956e33122943280f60b38f5 Mon Sep 17 00:00:00 2001 From: Nanda kumar Date: Wed, 26 Sep 2018 09:15:25 +0530 Subject: [PATCH] HDDS-554. In XceiverClientSpi, implement sendCommand(..) using sendCommandAsync(..). Contributed by Tsz Wo Nicholas Sze. --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 23 ------------- .../hadoop/hdds/scm/XceiverClientRatis.java | 32 ++----------------- .../hadoop/hdds/scm/XceiverClientSpi.java | 10 ++++-- 3 files changed, 10 insertions(+), 55 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 3cdbc7cc99..d353e7af42 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -120,29 +120,6 @@ public Pipeline getPipeline() { return pipeline; } - @Override - public ContainerCommandResponseProto sendCommand( - ContainerCommandRequestProto request) throws IOException { - try { - return sendCommandAsync(request).get(); - } catch (ExecutionException | InterruptedException e) { - /** - * In case the grpc channel handler throws an exception, - * the exception thrown will be wrapped within {@link ExecutionException}. - * Unwarpping here so that original exception gets passed - * to to the client. - */ - if (e instanceof ExecutionException) { - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - throw (IOException) cause; - } - } - throw new IOException( - "Unexpected exception during execution:" + e.getMessage()); - } - } - /** * Sends a given command to server gets a waitable future back. * diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 4c4de7f35b..0d301d9851 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm; -import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.io.MultipleIOException; import org.apache.ratis.retry.RetryPolicy; @@ -52,7 +51,6 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; /** @@ -183,20 +181,8 @@ private RaftClient getClient() { return Objects.requireNonNull(client.get(), "client is null"); } - private RaftClientReply sendRequest(ContainerCommandRequestProto request) - throws IOException { - boolean isReadOnlyRequest = HddsUtils.isReadOnly(request); - ByteString byteString = request.toByteString(); - LOG.debug("sendCommand {} {}", isReadOnlyRequest, request); - final RaftClientReply reply = isReadOnlyRequest ? - getClient().sendReadOnly(() -> byteString) : - getClient().send(() -> byteString); - LOG.debug("reply {} {}", isReadOnlyRequest, reply); - return reply; - } - private CompletableFuture sendRequestAsync( - ContainerCommandRequestProto request) throws IOException { + ContainerCommandRequestProto request) { boolean isReadOnlyRequest = HddsUtils.isReadOnly(request); ByteString byteString = request.toByteString(); LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request); @@ -204,19 +190,6 @@ private CompletableFuture sendRequestAsync( getClient().sendAsync(() -> byteString); } - @Override - public ContainerCommandResponseProto sendCommand( - ContainerCommandRequestProto request) throws IOException { - final RaftClientReply reply = sendRequest(request); - if (reply == null) { - throw new IOException( - String.format("Could not execute the request %s", request)); - } - Preconditions.checkState(reply.isSuccess()); - return ContainerCommandResponseProto.parseFrom( - reply.getMessage().getContent()); - } - /** * Sends a given command to server gets a waitable future back. * @@ -226,8 +199,7 @@ public ContainerCommandResponseProto sendCommand( */ @Override public CompletableFuture sendCommandAsync( - ContainerCommandRequestProto request) - throws IOException, ExecutionException, InterruptedException { + ContainerCommandRequestProto request) { return sendRequestAsync(request).whenComplete((reply, e) -> LOG.debug("received reply {} for request: {} exception: {}", request, reply, e)) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index e8ef5c572c..571d148848 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -95,8 +95,14 @@ public int getRefcount() { * @return Response to the command * @throws IOException */ - public abstract ContainerCommandResponseProto sendCommand( - ContainerCommandRequestProto request) throws IOException; + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request) throws IOException { + try { + return sendCommandAsync(request).get(); + } catch (ExecutionException | InterruptedException e) { + throw new IOException("Failed to command " + request, e); + } + } /** * Sends a given command to server gets a waitable future back.