HDFS-13331. [SBN read] Add lastSeenStateId to RpcRequestHeader. Contributed by Plamen Jeliazkov.
This commit is contained in:
parent
c9d73437e8
commit
9bf0696c73
@ -20,6 +20,7 @@
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||
|
||||
/**
|
||||
@ -48,4 +49,17 @@ public interface AlignmentContext {
|
||||
*/
|
||||
void receiveResponseState(RpcResponseHeaderProto header);
|
||||
|
||||
/**
|
||||
* This is the intended client method call to pull last seen state info
|
||||
* into RPC request processing.
|
||||
* @param header The RPC request header builder.
|
||||
*/
|
||||
void updateRequestState(RpcRequestHeaderProto.Builder header);
|
||||
|
||||
/**
|
||||
* This is the intended server method call to implement to receive
|
||||
* client state info during RPC response header processing.
|
||||
* @param header The RPC request header.
|
||||
*/
|
||||
void receiveRequestState(RpcRequestHeaderProto header);
|
||||
}
|
||||
|
@ -1114,7 +1114,7 @@ public void sendRpcRequest(final Call call)
|
||||
// Items '1' and '2' are prepared here.
|
||||
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
|
||||
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
|
||||
clientId);
|
||||
clientId, alignmentContext);
|
||||
|
||||
final ResponseBuffer buf = new ResponseBuffer();
|
||||
header.writeDelimitedTo(buf);
|
||||
|
@ -2523,6 +2523,11 @@ private void processRpcRequest(RpcRequestHeaderProto header,
|
||||
}
|
||||
}
|
||||
|
||||
if (alignmentContext != null) {
|
||||
// Check incoming RPC request's state.
|
||||
alignmentContext.receiveRequestState(header);
|
||||
}
|
||||
|
||||
CallerContext callerContext = null;
|
||||
if (header.hasCallerContext()) {
|
||||
callerContext =
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.io.DataInput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.ipc.AlignmentContext;
|
||||
import org.apache.hadoop.ipc.CallerContext;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
||||
@ -165,6 +166,13 @@ public static RPC.RpcKind convert( RpcKindProto kind) {
|
||||
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
|
||||
RpcRequestHeaderProto.OperationProto operation, int callId,
|
||||
int retryCount, byte[] uuid) {
|
||||
return makeRpcRequestHeader(rpcKind, operation, callId, retryCount, uuid,
|
||||
null);
|
||||
}
|
||||
|
||||
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
|
||||
RpcRequestHeaderProto.OperationProto operation, int callId,
|
||||
int retryCount, byte[] uuid, AlignmentContext alignmentContext) {
|
||||
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
|
||||
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
|
||||
.setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid));
|
||||
@ -190,6 +198,11 @@ public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
|
||||
result.setCallerContext(contextBuilder);
|
||||
}
|
||||
|
||||
// Add alignment context if it is not null
|
||||
if (alignmentContext != null) {
|
||||
alignmentContext.updateRequestState(result);
|
||||
}
|
||||
|
||||
return result.build();
|
||||
}
|
||||
}
|
||||
|
@ -90,6 +90,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
|
||||
optional sint32 retryCount = 5 [default = -1];
|
||||
optional RPCTraceInfoProto traceInfo = 6; // tracing info
|
||||
optional RPCCallerContextProto callerContext = 7; // call context
|
||||
optional int64 stateId = 8; // The last seen Global State ID
|
||||
}
|
||||
|
||||
|
||||
|
@ -21,6 +21,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.ipc.AlignmentContext;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||
|
||||
import java.util.concurrent.atomic.LongAccumulator;
|
||||
@ -33,16 +34,11 @@
|
||||
@InterfaceStability.Stable
|
||||
class ClientGCIContext implements AlignmentContext {
|
||||
|
||||
private final DFSClient dfsClient;
|
||||
private final LongAccumulator lastSeenStateId =
|
||||
new LongAccumulator(Math::max, Long.MIN_VALUE);
|
||||
|
||||
/**
|
||||
* Client side constructor.
|
||||
* @param dfsClient client side state receiver
|
||||
*/
|
||||
ClientGCIContext(DFSClient dfsClient) {
|
||||
this.dfsClient = dfsClient;
|
||||
long getLastSeenStateId() {
|
||||
return lastSeenStateId.get();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -55,11 +51,27 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Client side implementation for receiving state alignment info.
|
||||
* Client side implementation for receiving state alignment info in responses.
|
||||
*/
|
||||
@Override
|
||||
public void receiveResponseState(RpcResponseHeaderProto header) {
|
||||
lastSeenStateId.accumulate(header.getStateId());
|
||||
dfsClient.lastSeenStateId = lastSeenStateId.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Client side implementation for providing state alignment info in requests.
|
||||
*/
|
||||
@Override
|
||||
public void updateRequestState(RpcRequestHeaderProto.Builder header) {
|
||||
header.setStateId(lastSeenStateId.longValue());
|
||||
}
|
||||
|
||||
/**
|
||||
* Client side implementation only provides state alignment info in requests.
|
||||
* Client does not receive RPC requests therefore this does nothing.
|
||||
*/
|
||||
@Override
|
||||
public void receiveRequestState(RpcRequestHeaderProto header) {
|
||||
// Do nothing.
|
||||
}
|
||||
}
|
||||
|
@ -220,7 +220,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||
final UserGroupInformation ugi;
|
||||
volatile boolean clientRunning = true;
|
||||
volatile long lastLeaseRenewal;
|
||||
volatile long lastSeenStateId;
|
||||
private volatile FsServerDefaults serverDefaults;
|
||||
private volatile long serverDefaultsLastUpdate;
|
||||
final String clientName;
|
||||
@ -243,6 +242,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||
private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
|
||||
private final int smallBufferSize;
|
||||
private final long serverDefaultsValidityPeriod;
|
||||
private final ClientGCIContext alignmentContext;
|
||||
|
||||
public DfsClientConf getConf() {
|
||||
return dfsClientConf;
|
||||
@ -398,7 +398,8 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
|
||||
this.saslClient = new SaslDataTransferClient(
|
||||
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
||||
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
|
||||
Client.setAlignmentContext(new ClientGCIContext(this));
|
||||
this.alignmentContext = new ClientGCIContext();
|
||||
Client.setAlignmentContext(alignmentContext);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -547,6 +548,11 @@ public boolean isClientRunning() {
|
||||
return clientRunning;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
ClientGCIContext getAlignmentContext() {
|
||||
return alignmentContext;
|
||||
}
|
||||
|
||||
long getLastLeaseRenewal() {
|
||||
return lastLeaseRenewal;
|
||||
}
|
||||
|
@ -21,6 +21,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.ipc.AlignmentContext;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||
|
||||
/**
|
||||
@ -41,7 +42,7 @@ class GlobalStateIdContext implements AlignmentContext {
|
||||
}
|
||||
|
||||
/**
|
||||
* Server side implementation for providing state alignment info.
|
||||
* Server side implementation for providing state alignment info in responses.
|
||||
*/
|
||||
@Override
|
||||
public void updateResponseState(RpcResponseHeaderProto.Builder header) {
|
||||
@ -56,4 +57,27 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
|
||||
public void receiveResponseState(RpcResponseHeaderProto header) {
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
/**
|
||||
* Server side implementation only receives state alignment info.
|
||||
* It does not build RPC requests therefore this does nothing.
|
||||
*/
|
||||
@Override
|
||||
public void updateRequestState(RpcRequestHeaderProto.Builder header) {
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
/**
|
||||
* Server side implementation for processing state alignment info in requests.
|
||||
*/
|
||||
@Override
|
||||
public void receiveRequestState(RpcRequestHeaderProto header) {
|
||||
long serverStateId = namesystem.getLastWrittenTransactionId();
|
||||
long clientStateId = header.getStateId();
|
||||
if (clientStateId > serverStateId) {
|
||||
FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
|
||||
", but server state is: " + serverStateId);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,20 +18,30 @@
|
||||
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.ipc.AlignmentContext;
|
||||
import org.apache.hadoop.ipc.Client;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Class is used to test server sending state alignment information to clients
|
||||
@ -91,7 +101,7 @@ public void after() throws IOException {
|
||||
public void testStateTransferOnWrite() throws Exception {
|
||||
long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
|
||||
DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
|
||||
long clientState = dfs.dfs.lastSeenStateId;
|
||||
long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
|
||||
long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
|
||||
// Write(s) should have increased state. Check for greater than.
|
||||
assertThat(clientState > preWriteState, is(true));
|
||||
@ -109,7 +119,8 @@ public void testStateTransferOnRead() throws Exception {
|
||||
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
|
||||
DFSTestUtil.readFile(dfs, new Path("/testFile2"));
|
||||
// Read should catch client up to last written state.
|
||||
assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId));
|
||||
long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
|
||||
assertThat(clientState, is(lastWrittenId));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -122,10 +133,80 @@ public void testStateTransferOnFreshClient() throws Exception {
|
||||
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
|
||||
try (DistributedFileSystem clearDfs =
|
||||
(DistributedFileSystem) FileSystem.get(CONF)) {
|
||||
assertThat(clearDfs.dfs.lastSeenStateId, is(0L));
|
||||
ClientGCIContext clientState = clearDfs.dfs.getAlignmentContext();
|
||||
assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
|
||||
DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
|
||||
assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId));
|
||||
assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test mocks an AlignmentContext and ensures that DFSClient
|
||||
* writes its lastSeenStateId into RPC requests.
|
||||
*/
|
||||
@Test
|
||||
public void testClientSendsState() throws Exception {
|
||||
AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
|
||||
AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
|
||||
Client.setAlignmentContext(spiedAlignContext);
|
||||
|
||||
// Collect RpcRequestHeaders for verification later.
|
||||
final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> collectedHeaders =
|
||||
new ArrayList<>();
|
||||
Mockito.doAnswer(a -> {
|
||||
Object[] arguments = a.getArguments();
|
||||
RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
|
||||
(RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
|
||||
collectedHeaders.add(header);
|
||||
return a.callRealMethod();
|
||||
}).when(spiedAlignContext).updateRequestState(Mockito.any());
|
||||
|
||||
DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
|
||||
|
||||
// Ensure first header and last header have different state.
|
||||
assertThat(collectedHeaders.size() > 1, is(true));
|
||||
assertThat(collectedHeaders.get(0).getStateId(),
|
||||
is(not(collectedHeaders.get(collectedHeaders.size() - 1))));
|
||||
|
||||
// Ensure collected RpcRequestHeaders are in increasing order.
|
||||
long lastHeader = collectedHeaders.get(0).getStateId();
|
||||
for(RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
|
||||
collectedHeaders.subList(1, collectedHeaders.size())) {
|
||||
long currentHeader = header.getStateId();
|
||||
assertThat(currentHeader >= lastHeader, is(true));
|
||||
lastHeader = header.getStateId();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test mocks an AlignmentContext to send stateIds greater than
|
||||
* server's stateId in RPC requests.
|
||||
*/
|
||||
@Test
|
||||
public void testClientSendsGreaterState() throws Exception {
|
||||
AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
|
||||
AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
|
||||
Client.setAlignmentContext(spiedAlignContext);
|
||||
|
||||
// Make every client call have a stateId > server's stateId.
|
||||
Mockito.doAnswer(a -> {
|
||||
Object[] arguments = a.getArguments();
|
||||
RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
|
||||
(RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
|
||||
try {
|
||||
return a.callRealMethod();
|
||||
} finally {
|
||||
header.setStateId(Long.MAX_VALUE);
|
||||
}
|
||||
}).when(spiedAlignContext).updateRequestState(Mockito.any());
|
||||
|
||||
GenericTestUtils.LogCapturer logCapturer =
|
||||
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
|
||||
DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
|
||||
logCapturer.stopCapturing();
|
||||
|
||||
String output = logCapturer.getOutput();
|
||||
assertThat(output, containsString("A client sent stateId: "));
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user