From 42b4525f75b828bf58170187f030b08622e238ab Mon Sep 17 00:00:00 2001 From: Chunyi Yang <32279893+chunyiyang@users.noreply.github.com> Date: Fri, 18 Aug 2023 01:56:34 +0900 Subject: [PATCH] HDFS-17156. Client may receive old state ID which will lead to inconsistent reads. (#5951) Reviewed-by: Simbarashe Dzinamarira Signed-off-by: Takanobu Asanuma --- .../java/org/apache/hadoop/ipc/Client.java | 2 +- .../java/org/apache/hadoop/ipc/TestIPC.java | 40 ++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 0fb1fd7abf..4ccb4254c7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -1214,10 +1214,10 @@ private void receiveRpcResponse() { if (status == RpcStatusProto.SUCCESS) { Writable value = packet.newInstance(valueClass, conf); final Call call = calls.remove(callId); - call.setRpcResponse(value); if (call.alignmentContext != null) { call.alignmentContext.receiveResponseState(header); } + call.setRpcResponse(value); } // verify that packet length was correct if (packet.remaining() > 0) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 394bcfc896..7cfd65d482 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -82,6 +82,7 @@ import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.Server.Connection; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; @@ -162,9 +163,15 @@ static LongWritable call(Client client, long param, InetSocketAddress addr, static LongWritable call(Client client, LongWritable param, InetSocketAddress addr, int rpcTimeout, Configuration conf) throws IOException { + return call(client, param, addr, rpcTimeout, conf, null); + } + + static LongWritable call(Client client, LongWritable param, + InetSocketAddress addr, int rpcTimeout, Configuration conf, AlignmentContext alignmentContext) + throws IOException { final ConnectionId remoteId = getConnectionId(addr, rpcTimeout, conf); return (LongWritable)client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, - RPC.RPC_SERVICE_CLASS_DEFAULT, null); + RPC.RPC_SERVICE_CLASS_DEFAULT, null, alignmentContext); } static class TestServer extends Server { @@ -1330,6 +1337,37 @@ public void run() { server.stop(); } } + + /** + * Verify that stateID is received into call before + * caller is notified. + * @throws IOException + */ + @Test(timeout=60000) + public void testReceiveStateBeforeCallerNotification() throws IOException { + AtomicBoolean stateReceived = new AtomicBoolean(false); + AlignmentContext alignmentContext = Mockito.mock(AlignmentContext.class); + Mockito.doAnswer((Answer) invocation -> { + Thread.sleep(1000); + stateReceived.set(true); + return null; + }).when(alignmentContext) + .receiveResponseState(any(RpcHeaderProtos.RpcResponseHeaderProto.class)); + + final Client client = new Client(LongWritable.class, conf); + final TestServer server = new TestServer(1, false); + + try { + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + call(client, new LongWritable(RANDOM.nextLong()), addr, + 0, conf, alignmentContext); + Assert.assertTrue(stateReceived.get()); + } finally { + client.stop(); + server.stop(); + } + } /** A dummy protocol */ interface DummyProtocol {