HDFS-14660. [SBN Read] ObserverNameNode should throw StandbyException for requests not from ObserverProxyProvider. Contributed by Chao Sun.
This commit is contained in:
parent
2fe450cb5e
commit
02bd02b5af
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -26,9 +27,11 @@
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
|
||||
import org.apache.hadoop.ipc.AlignmentContext;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||
|
||||
@ -123,7 +126,18 @@ public void updateRequestState(RpcRequestHeaderProto.Builder header) {
|
||||
*/
|
||||
@Override
|
||||
public long receiveRequestState(RpcRequestHeaderProto header,
|
||||
long clientWaitTime) throws RetriableException {
|
||||
long clientWaitTime) throws IOException {
|
||||
if (!header.hasStateId() &&
|
||||
HAServiceState.OBSERVER.equals(namesystem.getState())) {
|
||||
// This could happen if client configured with non-observer proxy provider
|
||||
// (e.g., ConfiguredFailoverProxyProvider) is accessing a cluster with
|
||||
// observers. In this case, we should let the client failover to the
|
||||
// active node, rather than potentially serving stale result (client
|
||||
// stateId is 0 if not set).
|
||||
throw new StandbyException("Observer Node received request without "
|
||||
+ "stateId. This mostly likely is because client is not configured "
|
||||
+ "with " + ObserverReadProxyProvider.class.getSimpleName());
|
||||
}
|
||||
long serverStateId = getLastSeenStateId();
|
||||
long clientStateId = header.getStateId();
|
||||
FSNamesystem.LOG.trace("Client State ID= {} and Server State ID= {}",
|
||||
|
@ -22,6 +22,7 @@
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@ -33,12 +34,15 @@
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.RpcScheduler;
|
||||
import org.apache.hadoop.ipc.Schedulable;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.After;
|
||||
@ -344,6 +348,39 @@ public void testUncoordinatedCall() throws Exception {
|
||||
reader.interrupt();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestFromNonObserverProxyProvider() throws Exception {
|
||||
// Create another HDFS client using ConfiguredFailoverProvider
|
||||
Configuration conf2 = new Configuration(conf);
|
||||
|
||||
// Populate the above configuration with only a single observer in the
|
||||
// namenode list. Also reduce retries to make test finish faster.
|
||||
HATestUtil.setFailoverConfigurations(
|
||||
conf2,
|
||||
HATestUtil.getLogicalHostname(dfsCluster),
|
||||
Collections.singletonList(
|
||||
dfsCluster.getNameNode(2).getNameNodeAddress()),
|
||||
ConfiguredFailoverProxyProvider.class);
|
||||
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
|
||||
conf2.setInt(HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, 1);
|
||||
conf2.setInt(HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, 1);
|
||||
FileSystem dfs2 = FileSystem.get(conf2);
|
||||
|
||||
dfs.mkdir(testPath, FsPermission.getDefault());
|
||||
dfsCluster.rollEditLogAndTail(0);
|
||||
|
||||
try {
|
||||
// Request should be rejected by observer and throw StandbyException
|
||||
dfs2.listStatus(testPath);
|
||||
fail("listStatus should have thrown exception");
|
||||
} catch (RemoteException re) {
|
||||
IOException e = re.unwrapRemoteException();
|
||||
assertTrue("should have thrown StandbyException but got "
|
||||
+ e.getClass().getSimpleName(),
|
||||
e instanceof StandbyException);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertSentTo(int nnIdx) throws IOException {
|
||||
assertTrue("Request was not sent to the expected namenode " + nnIdx,
|
||||
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
|
||||
|
Loading…
Reference in New Issue
Block a user