HDFS-15127. RBF: Do not allow writes when a subcluster is unavailable for HASH_ALL mount points. Contributed by Inigo Goiri

This commit is contained in:
Ayush Saxena 2020-02-12 19:41:04 +05:30
parent 749e45dfdb
commit 3df0adaaea
3 changed files with 32 additions and 21 deletions

View File

@@ -1156,22 +1156,34 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
final List<RemoteResult<T, R>> results = invokeConcurrent( final List<RemoteResult<T, R>> results = invokeConcurrent(
locations, method, standby, timeOutMs, clazz); locations, method, standby, timeOutMs, clazz);
// Go over the results and exceptions
final Map<T, R> ret = new TreeMap<>(); final Map<T, R> ret = new TreeMap<>();
final List<IOException> thrownExceptions = new ArrayList<>();
IOException firstUnavailableException = null;
for (final RemoteResult<T, R> result : results) { for (final RemoteResult<T, R> result : results) {
// Response from all servers required, use this error. if (result.hasException()) {
if (requireResponse && result.hasException()) { IOException ioe = result.getException();
throw result.getException(); thrownExceptions.add(ioe);
// Track unavailable exceptions to throw them first
if (isUnavailableException(ioe)) {
firstUnavailableException = ioe;
}
} }
if (result.hasResult()) { if (result.hasResult()) {
ret.put(result.getLocation(), result.getResult()); ret.put(result.getLocation(), result.getResult());
} }
} }
// Throw the exception for the first location if there are no results // Throw exceptions if needed
if (ret.isEmpty()) { if (!thrownExceptions.isEmpty()) {
final RemoteResult<T, R> result = results.get(0); // Throw if response from all servers required or no results
if (result.hasException()) { if (requireResponse || ret.isEmpty()) {
throw result.getException(); // Throw unavailable exceptions first
if (firstUnavailableException != null) {
throw firstUnavailableException;
} else {
throw thrownExceptions.get(0);
}
} }
} }

View File

@@ -654,12 +654,6 @@ RemoteLocation getCreateLocation(
} }
} catch (FileNotFoundException fne) { } catch (FileNotFoundException fne) {
// Ignore if the file is not found // Ignore if the file is not found
} catch (IOException ioe) {
if (RouterRpcClient.isUnavailableException(ioe)) {
LOG.debug("Ignore unavailable exception: {}", ioe);
} else {
throw ioe;
}
} }
} }
return createLocation; return createLocation;
@@ -677,7 +671,7 @@ private RemoteLocation getExistingLocation(String src,
RemoteMethod method = new RemoteMethod("getFileInfo", RemoteMethod method = new RemoteMethod("getFileInfo",
new Class<?>[] {String.class}, new RemoteParam()); new Class<?>[] {String.class}, new RemoteParam());
Map<RemoteLocation, HdfsFileStatus> results = rpcClient.invokeConcurrent( Map<RemoteLocation, HdfsFileStatus> results = rpcClient.invokeConcurrent(
locations, method, false, false, HdfsFileStatus.class); locations, method, true, false, HdfsFileStatus.class);
for (RemoteLocation loc : locations) { for (RemoteLocation loc : locations) {
if (results.get(loc) != null) { if (results.get(loc) != null) {
return loc; return loc;

View File

@@ -309,7 +309,7 @@ private void checkDirectoriesFaultTolerant(
tasks.add(getListFailTask(router0Fs, mountPoint)); tasks.add(getListFailTask(router0Fs, mountPoint));
int filesExpected = dirs0.length + results.getSuccess(); int filesExpected = dirs0.length + results.getSuccess();
tasks.add(getListSuccessTask(router1Fs, mountPoint, filesExpected)); tasks.add(getListSuccessTask(router1Fs, mountPoint, filesExpected));
results = collectResults("List " + mountPoint, tasks); results = collectResults("List " + mountPoint, tasks);
assertEquals("Failed listing", 2, results.getSuccess()); assertEquals("Failed listing", 2, results.getSuccess());
tasks.add(getContentSummaryFailTask(router0Fs, mountPoint)); tasks.add(getContentSummaryFailTask(router0Fs, mountPoint));
@@ -344,17 +344,22 @@ private void checkFilesFaultTolerant(
TaskResults results = collectResults("Create file " + dir0, tasks); TaskResults results = collectResults("Create file " + dir0, tasks);
LOG.info("Check files results for {}: {}", dir0, results); LOG.info("Check files results for {}: {}", dir0, results);
if (faultTolerant || !DestinationOrder.FOLDER_ALL.contains(order)) { if (faultTolerant) {
assertEquals(NUM_FILES, results.getSuccess()); assertEquals("Not enough success in " + mountPoint,
assertEquals(0, results.getFailure()); NUM_FILES, results.getSuccess());
assertEquals("Nothing should fail in " + mountPoint, 0,
results.getFailure());
} else { } else {
assertBothResults("check files " + dir0, NUM_FILES, results); assertEquals("Nothing should succeed in " + mountPoint,
0, results.getSuccess());
assertEquals("Everything should fail in " + mountPoint,
NUM_FILES, results.getFailure());
} }
LOG.info("Check files listing for {}", dir0); LOG.info("Check files listing for {}", dir0);
tasks.add(getListFailTask(router0Fs, dir0)); tasks.add(getListFailTask(router0Fs, dir0));
tasks.add(getListSuccessTask(router1Fs, dir0, results.getSuccess())); tasks.add(getListSuccessTask(router1Fs, dir0, results.getSuccess()));
assertEquals(2, collectResults("List " + dir0, tasks).getSuccess()); assertEquals(2, collectResults("List " + dir0, tasks).getSuccess());
tasks.add(getContentSummaryFailTask(router0Fs, dir0)); tasks.add(getContentSummaryFailTask(router0Fs, dir0));
tasks.add(getContentSummarySuccessTask( tasks.add(getContentSummarySuccessTask(