From 10cf5773ba32566dd76730e32a3ccdf2b3bd4d09 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Mon, 19 Nov 2018 14:38:51 +0530 Subject: [PATCH] HDDS-845. Create a new raftClient instance for every watch request for Ratis. Contributed by Shashikant Banerjee. --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 3 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 38 ++++++++++++++----- .../hadoop/hdds/scm/XceiverClientSpi.java | 3 +- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index c6b19abebe..5592c1d0c1 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -290,7 +290,8 @@ private void reconnect(DatanodeDetails dn) @Override public void watchForCommit(long index, long timeout) - throws InterruptedException, ExecutionException, TimeoutException { + throws InterruptedException, ExecutionException, TimeoutException, + IOException { // there is no notion of watch for commit index in standalone pipeline }; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 6b3b001dbc..b238a0937e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -122,11 +122,15 @@ public void connect() throws Exception { public void close() { final RaftClient c = client.getAndSet(null); if (c != null) { - try { - c.close(); - } catch (IOException e) { - throw new IllegalStateException(e); - } + closeRaftClient(c); + } + } + + private void closeRaftClient(RaftClient raftClient) { + try { + raftClient.close(); + } catch (IOException e) { + throw new IllegalStateException(e); } } @@ -145,19 +149,35 @@ private CompletableFuture sendRequestAsync( @Override public void watchForCommit(long index, long timeout) - throws InterruptedException, ExecutionException, TimeoutException { - // TODO: Create a new Raft client instance to watch - CompletableFuture replyFuture = getClient() + throws InterruptedException, ExecutionException, TimeoutException, + IOException { + LOG.debug("commit index : {} watch timeout : {}", index, timeout); + // create a new RaftClient instance for watch request + RaftClient raftClient = + RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy); + CompletableFuture replyFuture = raftClient .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); try { replyFuture.get(timeout, TimeUnit.MILLISECONDS); } catch (TimeoutException toe) { LOG.warn("3 way commit failed ", toe); - getClient() + + closeRaftClient(raftClient); + // generate a new raft client instance again so that next watch request + // does not get blocked for the previous one + + // TODO : need to remove the code to create the new RaftClient instance + // here once the watch request bypassing sliding window in Raft Client + // gets fixed. + raftClient = + RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy); + raftClient .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) .get(timeout, TimeUnit.MILLISECONDS); LOG.info("Could not commit " + index + " to all the nodes." + "Committed by majority."); + } finally { + closeRaftClient(raftClient); } } /** diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 7000660162..e9896dc906 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -126,5 +126,6 @@ public ContainerCommandResponseProto sendCommand( public abstract HddsProtos.ReplicationType getPipelineType(); public abstract void watchForCommit(long index, long timeout) - throws InterruptedException, ExecutionException, TimeoutException; + throws InterruptedException, ExecutionException, TimeoutException, + IOException; }