From 3df0adaaea485bcbd4ae1a04fe160f3148c14437 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 12 Feb 2020 19:41:04 +0530 Subject: [PATCH] HDFS-15127. RBF: Do not allow writes when a subcluster is unavailable for HASH_ALL mount points. Contributed by Inigo Goiri --- .../federation/router/RouterRpcClient.java | 28 +++++++++++++------ .../federation/router/RouterRpcServer.java | 8 +----- .../router/TestRouterFaultTolerant.java | 17 +++++++---- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 7003f96f04..dae4b93564 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1156,22 +1156,34 @@ public Map invokeConcurrent( final List> results = invokeConcurrent( locations, method, standby, timeOutMs, clazz); + // Go over the results and exceptions final Map ret = new TreeMap<>(); + final List thrownExceptions = new ArrayList<>(); + IOException firstUnavailableException = null; for (final RemoteResult result : results) { - // Response from all servers required, use this error. - if (requireResponse && result.hasException()) { - throw result.getException(); + if (result.hasException()) { + IOException ioe = result.getException(); + thrownExceptions.add(ioe); + // Track unavailable exceptions to throw them first + if (isUnavailableException(ioe)) { + firstUnavailableException = ioe; + } } if (result.hasResult()) { ret.put(result.getLocation(), result.getResult()); } } - // Throw the exception for the first location if there are no results - if (ret.isEmpty()) { - final RemoteResult result = results.get(0); - if (result.hasException()) { - throw result.getException(); + // Throw exceptions if needed + if (!thrownExceptions.isEmpty()) { + // Throw if response from all servers required or no results + if (requireResponse || ret.isEmpty()) { + // Throw unavailable exceptions first + if (firstUnavailableException != null) { + throw firstUnavailableException; + } else { + throw thrownExceptions.get(0); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 72e1ca6fe5..345ec705f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -654,12 +654,6 @@ RemoteLocation getCreateLocation( } } catch (FileNotFoundException fne) { // Ignore if the file is not found - } catch (IOException ioe) { - if (RouterRpcClient.isUnavailableException(ioe)) { - LOG.debug("Ignore unavailable exception: {}", ioe); - } else { - throw ioe; - } } } return createLocation; @@ -677,7 +671,7 @@ private RemoteLocation getExistingLocation(String src, RemoteMethod method = new RemoteMethod("getFileInfo", new Class[] {String.class}, new RemoteParam()); Map results = rpcClient.invokeConcurrent( - locations, method, false, false, HdfsFileStatus.class); + locations, method, true, false, HdfsFileStatus.class); for (RemoteLocation loc : locations) { if (results.get(loc) != null) { return loc; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java index 8907ce5b91..5e0e117523 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java @@ -309,7 +309,7 @@ private void checkDirectoriesFaultTolerant( tasks.add(getListFailTask(router0Fs, mountPoint)); int filesExpected = dirs0.length + results.getSuccess(); tasks.add(getListSuccessTask(router1Fs, mountPoint, filesExpected)); - results = collectResults("List " + mountPoint, tasks); + results = collectResults("List " + mountPoint, tasks); assertEquals("Failed listing", 2, results.getSuccess()); tasks.add(getContentSummaryFailTask(router0Fs, mountPoint)); @@ -344,17 +344,22 @@ private void checkFilesFaultTolerant( TaskResults results = collectResults("Create file " + dir0, tasks); LOG.info("Check files results for {}: {}", dir0, results); - if (faultTolerant || !DestinationOrder.FOLDER_ALL.contains(order)) { - assertEquals(NUM_FILES, results.getSuccess()); - assertEquals(0, results.getFailure()); + if (faultTolerant) { + assertEquals("Not enough success in " + mountPoint, + NUM_FILES, results.getSuccess()); + assertEquals("Nothing should fail in " + mountPoint, 0, + results.getFailure()); } else { - assertBothResults("check files " + dir0, NUM_FILES, results); + assertEquals("Nothing should succeed in " + mountPoint, + 0, results.getSuccess()); + assertEquals("Everything should fail in " + mountPoint, + NUM_FILES, results.getFailure()); } LOG.info("Check files listing for {}", dir0); tasks.add(getListFailTask(router0Fs, dir0)); tasks.add(getListSuccessTask(router1Fs, dir0, results.getSuccess())); - assertEquals(2, collectResults("List " + dir0, tasks).getSuccess()); + assertEquals(2, collectResults("List " + dir0, tasks).getSuccess()); tasks.add(getContentSummaryFailTask(router0Fs, dir0)); tasks.add(getContentSummarySuccessTask(