HDFS-17156. Client may receive old state ID which will lead to inconsistent reads. (#5951)

Reviewed-by: Simbarashe Dzinamarira <sdzinamarira@linkedin.com>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
Chunyi Yang 2023-08-18 01:56:34 +09:00 committed by GitHub
parent 65e4a66e25
commit 42b4525f75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 2 deletions

View File

@ -1214,10 +1214,10 @@ private void receiveRpcResponse() {
if (status == RpcStatusProto.SUCCESS) {
Writable value = packet.newInstance(valueClass, conf);
final Call call = calls.remove(callId);
call.setRpcResponse(value);
if (call.alignmentContext != null) {
call.alignmentContext.receiveResponseState(header);
}
call.setRpcResponse(value);
}
// verify that packet length was correct
if (packet.remaining() > 0) {

View File

@ -82,6 +82,7 @@
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
@ -162,9 +163,15 @@ static LongWritable call(Client client, long param, InetSocketAddress addr,
static LongWritable call(Client client, LongWritable param,
InetSocketAddress addr, int rpcTimeout, Configuration conf)
throws IOException {
return call(client, param, addr, rpcTimeout, conf, null);
}
static LongWritable call(Client client, LongWritable param,
InetSocketAddress addr, int rpcTimeout, Configuration conf, AlignmentContext alignmentContext)
throws IOException {
final ConnectionId remoteId = getConnectionId(addr, rpcTimeout, conf);
return (LongWritable)client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
RPC.RPC_SERVICE_CLASS_DEFAULT, null, alignmentContext);
}
static class TestServer extends Server {
@ -1330,6 +1337,37 @@ public void run() {
server.stop();
}
}
/**
* Verify that stateID is received into call before
* caller is notified.
* @throws IOException
*/
@Test(timeout=60000)
public void testReceiveStateBeforeCallerNotification() throws IOException {
AtomicBoolean stateReceived = new AtomicBoolean(false);
AlignmentContext alignmentContext = Mockito.mock(AlignmentContext.class);
Mockito.doAnswer((Answer<Void>) invocation -> {
Thread.sleep(1000);
stateReceived.set(true);
return null;
}).when(alignmentContext)
.receiveResponseState(any(RpcHeaderProtos.RpcResponseHeaderProto.class));
final Client client = new Client(LongWritable.class, conf);
final TestServer server = new TestServer(1, false);
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
call(client, new LongWritable(RANDOM.nextLong()), addr,
0, conf, alignmentContext);
Assert.assertTrue(stateReceived.get());
} finally {
client.stop();
server.stop();
}
}
/** A dummy protocol */
interface DummyProtocol {