HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.
This commit is contained in:
parent
b8ad6c85a5
commit
b73fb70f97
@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||
@ -64,9 +66,15 @@ public interface AlignmentContext {
|
||||
* client state info during RPC response header processing.
|
||||
*
|
||||
* @param header The RPC request header.
|
||||
* @return state id of in the request header.
|
||||
* @param threshold a parameter to verify a condition when server
|
||||
* should reject client request due to its state being too far
|
||||
* misaligned with the client state.
|
||||
* See implementation for more details.
|
||||
* @return state id required for the server to execute the call.
|
||||
* @throws IOException
|
||||
*/
|
||||
long receiveRequestState(RpcRequestHeaderProto header);
|
||||
long receiveRequestState(RpcRequestHeaderProto header, long threshold)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the last seen state id of the alignment context instance.
|
||||
|
@ -2573,6 +2573,7 @@ private void processRpcRequest(RpcRequestHeaderProto header,
|
||||
|
||||
// Save the priority level assignment by the scheduler
|
||||
call.setPriorityLevel(callQueue.getPriorityLevel(call));
|
||||
call.markCallCoordinated(false);
|
||||
if(alignmentContext != null && call.rpcRequest != null &&
|
||||
(call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
|
||||
// if call.rpcRequest is not RpcProtobufRequest, will skip the following
|
||||
@ -2581,23 +2582,21 @@ private void processRpcRequest(RpcRequestHeaderProto header,
|
||||
// coordinated.
|
||||
String methodName;
|
||||
String protoName;
|
||||
ProtobufRpcEngine.RpcProtobufRequest req =
|
||||
(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
|
||||
try {
|
||||
ProtobufRpcEngine.RpcProtobufRequest req =
|
||||
(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
|
||||
methodName = req.getRequestHeader().getMethodName();
|
||||
protoName = req.getRequestHeader().getDeclaringClassProtocolName();
|
||||
if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
|
||||
call.markCallCoordinated(true);
|
||||
long stateId;
|
||||
stateId = alignmentContext.receiveRequestState(
|
||||
header, getMaxIdleTime());
|
||||
call.setClientStateId(stateId);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
throw new RpcServerException("Rpc request header check fail", ioe);
|
||||
throw new RpcServerException("Processing RPC request caught ", ioe);
|
||||
}
|
||||
if (!alignmentContext.isCoordinatedCall(protoName, methodName)) {
|
||||
call.markCallCoordinated(false);
|
||||
} else {
|
||||
call.markCallCoordinated(true);
|
||||
long stateId = alignmentContext.receiveRequestState(header);
|
||||
call.setClientStateId(stateId);
|
||||
}
|
||||
} else {
|
||||
call.markCallCoordinated(false);
|
||||
}
|
||||
|
||||
try {
|
||||
@ -3698,6 +3697,10 @@ public void run() {
|
||||
}
|
||||
}
|
||||
|
||||
protected int getMaxIdleTime() {
|
||||
return connectionManager.maxIdleTime;
|
||||
}
|
||||
|
||||
public String getServerName() {
|
||||
return serverName;
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.LongAccumulator;
|
||||
|
||||
/**
|
||||
@ -60,7 +61,8 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Client side implementation for receiving state alignment info in responses.
|
||||
* Client side implementation for receiving state alignment info
|
||||
* in responses.
|
||||
*/
|
||||
@Override
|
||||
public void receiveResponseState(RpcResponseHeaderProto header) {
|
||||
@ -80,7 +82,8 @@ public void updateRequestState(RpcRequestHeaderProto.Builder header) {
|
||||
* Client does not receive RPC requests therefore this does nothing.
|
||||
*/
|
||||
@Override
|
||||
public long receiveRequestState(RpcRequestHeaderProto header) {
|
||||
public long receiveRequestState(RpcRequestHeaderProto header, long threshold)
|
||||
throws IOException {
|
||||
// Do nothing.
|
||||
return 0;
|
||||
}
|
||||
|
@ -20,12 +20,15 @@
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
|
||||
import org.apache.hadoop.ipc.AlignmentContext;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||
|
||||
@ -36,8 +39,23 @@
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Stable
|
||||
class GlobalStateIdContext implements AlignmentContext {
|
||||
private final FSNamesystem namesystem;
|
||||
/**
|
||||
* Estimated number of journal transactions a typical NameNode can execute
|
||||
* per second. The number is used to estimate how long a client's
|
||||
* RPC request will wait in the call queue before the Observer catches up
|
||||
* with its state id.
|
||||
*/
|
||||
private static final long ESTIMATED_TRANSACTIONS_PER_SECOND = 10000L;
|
||||
|
||||
/**
|
||||
* The client wait time on an RPC request is composed of
|
||||
* the server execution time plus the communication time.
|
||||
* This is an expected fraction of the total wait time spent on
|
||||
* server execution.
|
||||
*/
|
||||
private static final float ESTIMATED_SERVER_TIME_MULTIPLIER = 0.8f;
|
||||
|
||||
private final FSNamesystem namesystem;
|
||||
private final HashSet<String> coordinatedMethods;
|
||||
|
||||
/**
|
||||
@ -88,17 +106,41 @@ public void updateRequestState(RpcRequestHeaderProto.Builder header) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Server side implementation for processing state alignment info in requests.
|
||||
* Server-side implementation for processing state alignment info in
|
||||
* requests.
|
||||
* For Observer it compares the client and the server states and determines
|
||||
* if it makes sense to wait until the server catches up with the client
|
||||
* state. If not the server throws RetriableException so that the client
|
||||
* could retry the call according to the retry policy with another Observer
|
||||
* or the Active NameNode.
|
||||
*
|
||||
* @param header The RPC request header.
|
||||
* @param clientWaitTime time in milliseconds indicating how long client
|
||||
* waits for the server response. It is used to verify if the client's
|
||||
* state is too far ahead of the server's
|
||||
* @return the minimum of the state ids of the client or the server.
|
||||
* @throws RetriableException if Observer is too far behind.
|
||||
*/
|
||||
@Override
|
||||
public long receiveRequestState(RpcRequestHeaderProto header) {
|
||||
public long receiveRequestState(RpcRequestHeaderProto header,
|
||||
long clientWaitTime) throws RetriableException {
|
||||
long serverStateId =
|
||||
namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
|
||||
long clientStateId = header.getStateId();
|
||||
if (clientStateId > serverStateId &&
|
||||
HAServiceProtocol.HAServiceState.ACTIVE.equals(namesystem.getState())) {
|
||||
HAServiceState.ACTIVE.equals(namesystem.getState())) {
|
||||
FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
|
||||
", but server state is: " + serverStateId);
|
||||
return serverStateId;
|
||||
}
|
||||
if (HAServiceState.OBSERVER.equals(namesystem.getState()) &&
|
||||
clientStateId - serverStateId >
|
||||
ESTIMATED_TRANSACTIONS_PER_SECOND
|
||||
* TimeUnit.MILLISECONDS.toSeconds(clientWaitTime)
|
||||
* ESTIMATED_SERVER_TIME_MULTIPLIER) {
|
||||
throw new RetriableException(
|
||||
"Observer Node is too far behind: serverStateId = "
|
||||
+ serverStateId + " clientStateId = " + clientStateId);
|
||||
}
|
||||
return clientStateId;
|
||||
}
|
||||
|
@ -25,6 +25,7 @@
|
||||
import static org.apache.hadoop.hdfs.DFSUtil.createUri;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
@ -34,6 +35,7 @@
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.LongAccumulator;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Joiner;
|
||||
@ -43,6 +45,7 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.ClientGSIContext;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
@ -334,4 +337,21 @@ public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Customize stateId of the client AlignmentContext for testing.
|
||||
*/
|
||||
public static long setACStateId(DistributedFileSystem dfs,
|
||||
long stateId) throws Exception {
|
||||
ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
|
||||
((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
|
||||
dfs.getClient().getNamenode())).getProxyProvider();
|
||||
ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext());
|
||||
Field f = ac.getClass().getDeclaredField("lastSeenStateId");
|
||||
f.setAccessible(true);
|
||||
LongAccumulator lastSeenStateId = (LongAccumulator)f.get(ac);
|
||||
long currentStateId = lastSeenStateId.getThenReset();
|
||||
lastSeenStateId.accumulate(stateId);
|
||||
return currentStateId;
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
@ -138,6 +139,19 @@ public void testMultiObserver() throws Exception {
|
||||
dfsCluster.transitionToObserver(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testObserverFallBehind() throws Exception {
|
||||
dfs.mkdir(testPath, FsPermission.getDefault());
|
||||
assertSentTo(0);
|
||||
|
||||
// Set large state Id on the client
|
||||
long realStateId = HATestUtil.setACStateId(dfs, 500000);
|
||||
dfs.getFileStatus(testPath);
|
||||
// Should end up on ANN
|
||||
assertSentTo(0);
|
||||
HATestUtil.setACStateId(dfs, realStateId);
|
||||
}
|
||||
|
||||
private void assertSentTo(int... nnIndices) throws IOException {
|
||||
assertTrue("Request was not sent to any of the expected namenodes.",
|
||||
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIndices));
|
||||
|
Loading…
Reference in New Issue
Block a user