HDFS-15099. [SBN Read] checkOperation(WRITE) should throw ObserverRetryOnActiveException for ObserverNode. Contributed by Chen Liang.
This commit is contained in:
parent
2301b25899
commit
26a969ec73
@ -34,6 +34,7 @@
|
||||
|
||||
import javax.security.sasl.SaslException;
|
||||
|
||||
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
@ -678,7 +679,7 @@ public RetryAction shouldRetry(Exception e, int retries,
|
||||
e instanceof UnknownHostException ||
|
||||
e instanceof StandbyException ||
|
||||
e instanceof ConnectTimeoutException ||
|
||||
isWrappedStandbyException(e)) {
|
||||
shouldFailoverOnException(e)) {
|
||||
return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
|
||||
getFailoverOrRetrySleepTime(failovers));
|
||||
} else if (e instanceof RetriableException
|
||||
@ -730,12 +731,13 @@ private static long calculateExponentialTime(long time, int retries) {
|
||||
return calculateExponentialTime(time, retries, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
private static boolean isWrappedStandbyException(Exception e) {
|
||||
private static boolean shouldFailoverOnException(Exception e) {
|
||||
if (!(e instanceof RemoteException)) {
|
||||
return false;
|
||||
}
|
||||
Exception unwrapped = ((RemoteException)e).unwrapRemoteException(
|
||||
StandbyException.class);
|
||||
StandbyException.class,
|
||||
ObserverRetryOnActiveException.class);
|
||||
return unwrapped instanceof StandbyException;
|
||||
}
|
||||
|
||||
|
@ -20,8 +20,6 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Thrown by a remote ObserverNode indicating the operation has failed and the
|
||||
* client should retry active namenode directly (instead of retrying other
|
||||
@ -29,7 +27,7 @@
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class ObserverRetryOnActiveException extends IOException {
|
||||
public class ObserverRetryOnActiveException extends StandbyException {
|
||||
static final long serialVersionUID = 1L;
|
||||
public ObserverRetryOnActiveException(String msg) {
|
||||
super(msg);
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
|
||||
/**
|
||||
@ -95,8 +96,18 @@ public void checkOperation(HAContext context, OperationCategory op)
|
||||
String faq = ". Visit https://s.apache.org/sbnn-error";
|
||||
String msg = "Operation category " + op + " is not supported in state "
|
||||
+ context.getState() + faq;
|
||||
if (op == OperationCategory.WRITE && isObserver) {
|
||||
// If observer receives a write call, return active retry
|
||||
// exception to inform client to retry on active.
|
||||
// A write should never happen on Observer. Except that,
|
||||
// if access time is enabled. An open call can transition
|
||||
// to a write operation. In this case, Observer
|
||||
// should inform the client to retry this open on Active.
|
||||
throw new ObserverRetryOnActiveException(msg);
|
||||
} else {
|
||||
throw new StandbyException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldPopulateReplQueues() {
|
||||
|
@ -52,6 +52,7 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
|
||||
import org.apache.hadoop.hdfs.tools.GetGroups;
|
||||
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
@ -366,6 +367,52 @@ public void testFsckWithObserver() throws Exception {
|
||||
assertTrue(result.contains("Status: HEALTHY"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that, if a write happens to go to Observer,
|
||||
* Observer would throw {@link ObserverRetryOnActiveException},
|
||||
* to inform the client to retry on Active
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testObserverRetryActiveException() throws Exception {
|
||||
boolean thrownRetryException = false;
|
||||
try {
|
||||
// Force a write on Observer, which should throw
|
||||
// retry on active exception.
|
||||
dfsCluster.getNameNode(2)
|
||||
.getRpcServer()
|
||||
.mkdirs("/testActiveRetryException",
|
||||
FsPermission.createImmutable((short) 0755), true);
|
||||
} catch (ObserverRetryOnActiveException orae) {
|
||||
thrownRetryException = true;
|
||||
}
|
||||
assertTrue(thrownRetryException);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that for open call, if access time update is required,
|
||||
* the open call should go to active, instead of observer.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testAccessTimeUpdateRedirectToActive() throws Exception {
|
||||
// Create a new path to not mess up other tests
|
||||
Path tmpTestPath = new Path("/TestObserverNodeAccessTime");
|
||||
dfs.create(tmpTestPath, (short)1).close();
|
||||
assertSentTo(0);
|
||||
dfs.open(tmpTestPath).close();
|
||||
assertSentTo(2);
|
||||
// Set access time to a time in the far past.
|
||||
// So that next read call is guaranteed to
|
||||
// have passed access time period.
|
||||
dfs.setTimes(tmpTestPath, 0, 0);
|
||||
// Verify that aTime update redirects on Active
|
||||
dfs.open(tmpTestPath).close();
|
||||
assertSentTo(0);
|
||||
}
|
||||
|
||||
private void assertSentTo(int nnIdx) throws IOException {
|
||||
assertTrue("Request was not sent to the expected namenode " + nnIdx,
|
||||
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
|
||||
|
Loading…
Reference in New Issue
Block a user