HDFS-15112. RBF: Do not return FileNotFoundException when a subcluster is unavailable.

Author: Inigo Goiri
Date:   2020-01-16 10:39:14 -08:00
Parent: edbbc03ce7
Commit: 263413e838

3 changed files with 114 additions and 23 deletions
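At a glance: when a mount point spans several subclusters and the subcluster holding a file is down, a read through the Router could previously come back as FileNotFoundException, reported by the healthy subclusters that simply do not have the file. After this change the Router prefers to surface the unavailability, so clients can tell a missing file from an unreachable subcluster. Below is a minimal, hypothetical client-side sketch of what that distinction enables; openWithRetry and looksUnavailable are illustrative helpers, not part of this patch, and the classification only loosely mirrors the new RouterRpcClient#isUnavailableException shown further down.

    import java.io.EOFException;
    import java.io.IOException;
    import java.net.ConnectException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.ipc.RemoteException;
    import org.apache.hadoop.ipc.RetriableException;
    import org.apache.hadoop.ipc.StandbyException;
    import org.apache.hadoop.net.ConnectTimeoutException;

    /** Hypothetical client-side helper; not part of this patch. */
    public final class SubclusterAwareReader {

      private SubclusterAwareReader() {}

      /** Open a file, retrying only when the failure looks like an outage. */
      public static FSDataInputStream openWithRetry(FileSystem fs, Path path,
          int maxRetries, long sleepMs) throws IOException, InterruptedException {
        for (int attempt = 0; ; attempt++) {
          try {
            return fs.open(path);
          } catch (IOException e) {
            IOException ioe = (e instanceof RemoteException)
                ? ((RemoteException) e).unwrapRemoteException()
                : e;
            // After this change, FileNotFoundException really means the file
            // does not exist; a down subcluster shows up as an unavailable
            // error instead, which is worth retrying.
            if (!looksUnavailable(ioe) || attempt >= maxRetries) {
              throw ioe;
            }
            Thread.sleep(sleepMs); // back off before trying the Router again
          }
        }
      }

      /** Rough client-side approximation of the Router's classification. */
      private static boolean looksUnavailable(IOException ioe) {
        return ioe instanceof ConnectException
            || ioe instanceof ConnectTimeoutException
            || ioe instanceof EOFException
            || ioe instanceof StandbyException
            || ioe instanceof RetriableException;
      }
    }

A production client would more likely lean on the RPC layer's configured retry policies; the point of the sketch is only that, after this commit, a FileNotFoundException returned by the Router can be trusted.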

File: RouterRpcClient.java

@@ -21,6 +21,7 @@
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
+import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -436,8 +437,7 @@ private Object invokeMethod(
             this.rpcMonitor.proxyOpFailureStandby();
           }
           failover = true;
-        } else if (ioe instanceof ConnectException ||
-            ioe instanceof ConnectTimeoutException) {
+        } else if (isUnavailableException(ioe)) {
           if (this.rpcMonitor != null) {
             this.rpcMonitor.proxyOpFailureCommunicate();
           }
@@ -503,8 +503,7 @@ private Object invokeMethod(
       if (ioe instanceof StandbyException) {
         LOG.error("{} at {} is in Standby: {}",
             nnKey, addr, ioe.getMessage());
-      } else if (ioe instanceof ConnectException ||
-          ioe instanceof ConnectTimeoutException) {
+      } else if (isUnavailableException(ioe)) {
         exConnect++;
         LOG.error("{} at {} cannot be reached: {}",
             nnKey, addr, ioe.getMessage());
@@ -563,8 +562,7 @@ private Object invoke(String nsId, int retryCount, final Method method,
       // failover, invoker looks for standby exceptions for failover.
       if (ioe instanceof StandbyException) {
         throw ioe;
-      } else if (ioe instanceof ConnectException ||
-          ioe instanceof ConnectTimeoutException) {
+      } else if (isUnavailableException(ioe)) {
         throw ioe;
       } else {
         throw new StandbyException(ioe.getMessage());
@@ -578,6 +576,27 @@ private Object invoke(String nsId, int retryCount, final Method method,
     }
   }
 
+  /**
+   * Check if the exception comes from an unavailable subcluster.
+   * @param ioe IOException to check.
+   * @return If the exception comes from an unavailable subcluster.
+   */
+  public static boolean isUnavailableException(IOException ioe) {
+    if (ioe instanceof ConnectException ||
+        ioe instanceof ConnectTimeoutException ||
+        ioe instanceof EOFException ||
+        ioe instanceof StandbyException) {
+      return true;
+    }
+    if (ioe instanceof RetriableException) {
+      Throwable cause = ioe.getCause();
+      if (cause instanceof NoNamenodesAvailableException) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Check if the cluster of given nameservice id is available.
    * @param nsId nameservice ID.
@@ -833,8 +852,7 @@ public <T> T invokeSequential(
     final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     final Method m = remoteMethod.getMethod();
-    IOException firstThrownException = null;
-    IOException lastThrownException = null;
+    List<IOException> thrownExceptions = new ArrayList<>();
     Object firstResult = null;
     // Invoke in priority order
     for (final RemoteLocationContext loc : locations) {
@@ -862,29 +880,33 @@ public <T> T invokeSequential(
         ioe = processException(ioe, loc);
         // Record it and move on
-        lastThrownException = ioe;
-        if (firstThrownException == null) {
-          firstThrownException = lastThrownException;
-        }
+        thrownExceptions.add(ioe);
       } catch (Exception e) {
         // Unusual error, ClientProtocol calls always use IOException (or
         // RemoteException). Re-wrap in IOException for compatibility with
         // ClientProtcol.
         LOG.error("Unexpected exception {} proxying {} to {}",
             e.getClass(), m.getName(), ns, e);
-        lastThrownException = new IOException(
+        IOException ioe = new IOException(
             "Unexpected exception proxying API " + e.getMessage(), e);
-        if (firstThrownException == null) {
-          firstThrownException = lastThrownException;
-        }
+        thrownExceptions.add(ioe);
       }
     }
 
-    if (firstThrownException != null) {
-      // re-throw the last exception thrown for compatibility
-      throw firstThrownException;
+    if (!thrownExceptions.isEmpty()) {
+      // An unavailable subcluster may be the actual cause
+      // We cannot surface other exceptions (e.g., FileNotFoundException)
+      for (int i = 0; i < thrownExceptions.size(); i++) {
+        IOException ioe = thrownExceptions.get(i);
+        if (isUnavailableException(ioe)) {
+          throw ioe;
+        }
+      }
+      // re-throw the first exception thrown for compatibility
+      throw thrownExceptions.get(0);
     }
-    // Return the last result, whether it is the value we are looking for or a
+    // Return the first result, whether it is the value or not
     @SuppressWarnings("unchecked")
     T ret = (T)firstResult;
     return ret;
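For reference, a small hypothetical JUnit sketch of the contract the new helper establishes. It is not part of this commit; it assumes isUnavailableException stays public static as declared above, the exception messages are arbitrary, and the RetriableException branch (which only counts when wrapping NoNamenodesAvailableException) is shown by its negative case rather than constructed.

    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertTrue;

    import java.io.EOFException;
    import java.io.FileNotFoundException;
    import java.net.ConnectException;

    import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
    import org.apache.hadoop.ipc.RetriableException;
    import org.apache.hadoop.ipc.StandbyException;
    import org.apache.hadoop.net.ConnectTimeoutException;
    import org.junit.Test;

    /** Illustrative test class; not part of this commit. */
    public class TestUnavailableExceptionClassification {

      @Test
      public void testClassification() {
        // Connectivity and standby failures count as an unavailable subcluster
        assertTrue(RouterRpcClient.isUnavailableException(
            new ConnectException("connection refused")));
        assertTrue(RouterRpcClient.isUnavailableException(
            new ConnectTimeoutException("timed out")));
        assertTrue(RouterRpcClient.isUnavailableException(
            new EOFException("connection reset")));
        assertTrue(RouterRpcClient.isUnavailableException(
            new StandbyException("namenode is standby")));

        // A bare RetriableException does not count; only one wrapping
        // NoNamenodesAvailableException does (see the helper above)
        assertFalse(RouterRpcClient.isUnavailableException(
            new RetriableException("server busy")));

        // A missing file is not an availability problem
        assertFalse(RouterRpcClient.isUnavailableException(
            new FileNotFoundException("/mount/file")));
      }
    }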

File: RouterRpcServer.java

@@ -653,6 +653,12 @@ RemoteLocation getCreateLocation(
         }
       } catch (FileNotFoundException fne) {
         // Ignore if the file is not found
+      } catch (IOException ioe) {
+        if (RouterRpcClient.isUnavailableException(ioe)) {
+          LOG.debug("Ignore unavailable exception: {}", ioe);
+        } else {
+          throw ioe;
+        }
       }
     }
     return createLocation;
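The getCreateLocation change above follows a probe-and-skip pattern: while looking for a subcluster that already holds the file, an unreachable subcluster is logged and skipped, and only other errors abort the create. A standalone sketch of that pattern follows; the Probe interface and pickLocation are illustrative names, not the actual Router code.

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;

    /** Illustrative probe-and-skip loop; the names here are hypothetical. */
    final class CreateLocationSketch {

      /** Probe a candidate subcluster for an existing copy of the file. */
      interface Probe {
        boolean hasFile(String subcluster) throws IOException;
      }

      /** Pick the subcluster that already has the file, skipping dead ones. */
      static String pickLocation(List<String> candidates, Probe probe)
          throws IOException {
        String chosen = candidates.get(0);      // default: first candidate
        for (String subcluster : candidates) {
          try {
            if (probe.hasFile(subcluster)) {
              chosen = subcluster;              // reuse the existing location
              break;
            }
          } catch (FileNotFoundException fnfe) {
            // Not in this subcluster: keep looking
          } catch (IOException ioe) {
            if (RouterRpcClient.isUnavailableException(ioe)) {
              // Subcluster is down: skip it instead of failing the create
              continue;
            }
            throw ioe;                          // anything else is a real error
          }
        }
        return chosen;
      }
    }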

File: TestRouterFaultTolerant.java

@@ -25,6 +25,7 @@
 import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
 import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -40,6 +41,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -51,6 +53,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -72,6 +75,7 @@
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -153,9 +157,6 @@ public void setup() throws Exception {
     registerSubclusters(
         routers, namenodes.values(), Collections.singleton("ns1"));
 
-    LOG.info("Stop ns1 to simulate an unavailable subcluster");
-    namenodes.get("ns1").stop();
-
     service = Executors.newFixedThreadPool(10);
   }
@@ -209,6 +210,9 @@ private void updateMountPointFaultTolerant(final String mountPoint)
   @Test
   public void testWriteWithFailedSubcluster() throws Exception {
+    LOG.info("Stop ns1 to simulate an unavailable subcluster");
+    namenodes.get("ns1").stop();
+
     // Run the actual tests with each approach
     final List<Callable<Boolean>> tasks = new ArrayList<>();
     final List<DestinationOrder> orders = asList(
@@ -609,4 +613,63 @@ private FileSystem getRandomRouterFileSystem() throws Exception {
     return userUgi.doAs(
         (PrivilegedExceptionAction<FileSystem>) () -> getFileSystem(router));
   }
+
+  @Test
+  public void testReadWithFailedSubcluster() throws Exception {
+    DestinationOrder order = DestinationOrder.HASH_ALL;
+    final String mountPoint = "/" + order + "-testread";
+    final Path mountPath = new Path(mountPoint);
+    LOG.info("Setup {} with order {}", mountPoint, order);
+    createMountTableEntry(
+        routers, mountPoint, order, namenodes.keySet());
+
+    FileSystem fs = getRandomRouterFileSystem();
+
+    // Create a file (we don't write because we have no mock Datanodes)
+    final Path fileexisting = new Path(mountPath, "fileexisting");
+    final Path filenotexisting = new Path(mountPath, "filenotexisting");
+    FSDataOutputStream os = fs.create(fileexisting);
+    assertNotNull(os);
+    os.close();
+
+    // We should be able to read existing files
+    FSDataInputStream fsdis = fs.open(fileexisting);
+    assertNotNull("We should be able to read the file", fsdis);
+    // We shouldn't be able to read non-existing files
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        () -> fs.open(filenotexisting));
+
+    // Check the subcluster where the file got created
+    String nsIdWithFile = null;
+    for (Entry<String, MockNamenode> entry : namenodes.entrySet()) {
+      String nsId = entry.getKey();
+      MockNamenode nn = entry.getValue();
+      int rpc = nn.getRPCPort();
+      FileSystem nnfs = getFileSystem(rpc);
+      try {
+        FileStatus fileStatus = nnfs.getFileStatus(fileexisting);
+        assertNotNull(fileStatus);
+        assertNull("The file cannot be in two subclusters", nsIdWithFile);
+        nsIdWithFile = nsId;
+      } catch (FileNotFoundException fnfe) {
+        LOG.debug("File not found in {}", nsId);
+      }
+    }
+    assertNotNull("The file has to be in one subcluster", nsIdWithFile);
+
+    LOG.info("Stop {} to simulate an unavailable subcluster", nsIdWithFile);
+    namenodes.get(nsIdWithFile).stop();
+
+    // We should not get FileNotFoundException anymore
+    try {
+      fs.open(fileexisting);
+      fail("It should throw an unavailable cluster exception");
+    } catch (RemoteException re) {
+      IOException ioe = re.unwrapRemoteException();
+      assertTrue("Expected an unavailable exception for:" + ioe.getClass(),
+          RouterRpcClient.isUnavailableException(ioe));
+    }
+  }
 }