diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsck.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsck.java
index 169f1912de..1926d9ebd6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsck.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsck.java
@@ -159,7 +159,9 @@ public class S3GuardFsck {
     // Create a handler and handle each violated pairs
     S3GuardFsckViolationHandler handler =
         new S3GuardFsckViolationHandler(rawFS, metadataStore);
-    comparePairs.forEach(handler::handle);
+    for (ComparePair comparePair : comparePairs) {
+      handler.logError(comparePair);
+    }

     LOG.info("Total scan time: {}s", stopwatch.now(TimeUnit.SECONDS));
     LOG.info("Scanned entries: {}", scannedItems);
@@ -344,6 +346,31 @@ public class S3GuardFsck {
     return rawFS.makeQualified(new Path(s));
   }

+  /**
+   * Fix violations found during check.
+   *
+   * Currently only supports handling the following violation:
+   * - Violation.ORPHAN_DDB_ENTRY
+   *
+   * @param violations the compare pairs containing the violations to handle
+   * @throws IOException if an error occurs while handling a violation
+   */
+  public void fixViolations(List<ComparePair> violations) throws IOException {
+    S3GuardFsckViolationHandler handler =
+        new S3GuardFsckViolationHandler(rawFS, metadataStore);
+
+    for (ComparePair v : violations) {
+      if (v.getViolations().contains(Violation.ORPHAN_DDB_ENTRY)) {
+        try {
+          handler.doFix(v);
+        } catch (IOException e) {
+          LOG.error("Error while handling the violation: ", e);
+          throw e;
+        }
+      }
+    }
+  }
+
   /**
    * A compare pair with the pair of metadata and the list of violations.
    */
@@ -542,7 +569,9 @@ public class S3GuardFsck {
     // Create a handler and handle each violated pairs
     S3GuardFsckViolationHandler handler =
         new S3GuardFsckViolationHandler(rawFS, metadataStore);
-    comparePairs.forEach(handler::handle);
+    for (ComparePair comparePair : comparePairs) {
+      handler.logError(comparePair);
+    }

     stopwatch.stop();
     LOG.info("Total scan time: {}s", stopwatch.now(TimeUnit.SECONDS));
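The new `fixViolations()` entry point only acts on pairs carrying `Violation.ORPHAN_DDB_ENTRY`, so callers are expected to pre-filter the scan results. Below is a minimal caller-side sketch, not part of the patch: it assumes it sits in the `org.apache.hadoop.fs.s3a.s3guard` package (so the nested `ComparePair` type is accessible) and that the pairs come from an earlier `checkDdbInternalConsistency()` run; the `OrphanEntryFixSketch` name is made up.

```java
package org.apache.hadoop.fs.s3a.s3guard;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative helper: feed only orphan-entry violations into fixViolations(). */
final class OrphanEntryFixSketch {

  private OrphanEntryFixSketch() {
  }

  static void fixOrphanEntries(S3GuardFsck s3GuardFsck,
      List<S3GuardFsck.ComparePair> comparePairs) throws IOException {
    // Keep only the pairs that actually carry the ORPHAN_DDB_ENTRY violation.
    List<S3GuardFsck.ComparePair> orphans = comparePairs.stream()
        .filter(p -> p.getViolations()
            .contains(S3GuardFsck.Violation.ORPHAN_DDB_ENTRY))
        .collect(Collectors.toList());
    // fixViolations() delegates each remaining pair to the handler's doFix().
    s3GuardFsck.fixViolations(orphans);
  }
}
```

This mirrors what the integration test further below does before asserting that the orphan entry is gone.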
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsckViolationHandler.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsckViolationHandler.java
index 25db966bd2..ec09b4af6b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsckViolationHandler.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsckViolationHandler.java
@@ -18,6 +18,7 @@

 package org.apache.hadoop.fs.s3a.s3guard;

+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
 import java.util.List;
@@ -26,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;

@@ -43,6 +45,10 @@ public class S3GuardFsckViolationHandler {

   private static String newLine = System.getProperty("line.separator");

+  public enum HandleMode {
+    FIX, LOG
+  }
+
   public S3GuardFsckViolationHandler(S3AFileSystem fs,
       DynamoDBMetadataStore ddbms) {

@@ -50,7 +56,7 @@ public class S3GuardFsckViolationHandler {
     this.rawFs = fs;
   }

-  public void handle(S3GuardFsck.ComparePair comparePair) {
+  public void logError(S3GuardFsck.ComparePair comparePair) throws IOException {
     if (!comparePair.containsViolation()) {
       LOG.debug("There is no violation in the compare pair: {}", comparePair);
       return;
@@ -60,11 +66,26 @@ public class S3GuardFsckViolationHandler {
     sB.append(newLine)
         .append("On path: ").append(comparePair.getPath()).append(newLine);

-    handleComparePair(comparePair, sB);
+    handleComparePair(comparePair, sB, HandleMode.LOG);

     LOG.error(sB.toString());
   }

+  public void doFix(S3GuardFsck.ComparePair comparePair) throws IOException {
+    if (!comparePair.containsViolation()) {
+      LOG.debug("There is no violation in the compare pair: {}", comparePair);
+      return;
+    }
+
+    StringBuilder sB = new StringBuilder();
+    sB.append(newLine)
+        .append("On path: ").append(comparePair.getPath()).append(newLine);
+
+    handleComparePair(comparePair, sB, HandleMode.FIX);
+
+    LOG.info(sB.toString());
+  }
+
   /**
    * Create a new instance of the violation handler for all the violations
    * found in the compare pair and use it.
@@ -72,16 +93,28 @@ public class S3GuardFsckViolationHandler {
    * @param comparePair the compare pair with violations
    * @param sB StringBuilder to append error strings from violations.
    */
-  protected static void handleComparePair(S3GuardFsck.ComparePair comparePair,
-      StringBuilder sB) {
+  protected void handleComparePair(S3GuardFsck.ComparePair comparePair,
+      StringBuilder sB, HandleMode handleMode) throws IOException {
     for (S3GuardFsck.Violation violation : comparePair.getViolations()) {
       try {
         ViolationHandler handler = violation.getHandler()
             .getDeclaredConstructor(S3GuardFsck.ComparePair.class)
             .newInstance(comparePair);
-        final String errorStr = handler.getError();
-        sB.append(errorStr);
+
+        switch (handleMode) {
+        case FIX:
+          final String fixStr = handler.fixViolation(rawFs, metadataStore);
+          sB.append(fixStr);
+          break;
+        case LOG:
+          final String errorStr = handler.getError();
+          sB.append(errorStr);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown handleMode: " + handleMode);
+        }
+
       } catch (NoSuchMethodException e) {
         LOG.error("Can not find declared constructor for handler: {}",
             violation.getHandler());
@@ -137,6 +170,12 @@ public class S3GuardFsckViolationHandler {
     public DirListingMetadata getMsDirListing() {
       return msDirListing;
     }
+
+    public String fixViolation(S3AFileSystem fs,
+        DynamoDBMetadataStore ddbms) throws IOException {
+      return String.format("Fixing of violation: %s is not supported yet.",
+          this.getClass().getSimpleName());
+    }
   }

   /**
@@ -357,6 +396,16 @@ public class S3GuardFsckViolationHandler {
     public String getError() {
       return "The DDB entry is orphan - there is no parent in the MS.";
     }
+
+    @Override
+    public String fixViolation(S3AFileSystem fs, DynamoDBMetadataStore ddbms)
+        throws IOException {
+      final Path path = getPathMetadata().getFileStatus().getPath();
+      ddbms.forgetMetadata(path);
+      return String.format(
+          "Fixing violation by removing metadata entry from the " +
+              "MS on path: %s", path);
+    }
   }

   /**
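The base `ViolationHandler.fixViolation()` above only reports that fixing is unsupported, so wiring up a repair for another violation means overriding it in that violation's handler, the same way the orphan-entry handler does. Below is a hypothetical fragment, not part of the patch, meant to sit among the existing nested handlers inside `S3GuardFsckViolationHandler`; the `ExampleFixableEntry` name is invented, and it assumes the abstract `ViolationHandler` base keeps the `ComparePair` constructor that the reflective dispatch in `handleComparePair()` relies on.

```java
  /**
   * Hypothetical nested handler sketch: same shape as the ORPHAN_DDB_ENTRY
   * handler above - describe the problem in getError(), repair it in
   * fixViolation().
   */
  public static class ExampleFixableEntry extends ViolationHandler {

    public ExampleFixableEntry(S3GuardFsck.ComparePair comparePair) {
      // Assumption: the abstract base exposes a ComparePair constructor.
      super(comparePair);
    }

    @Override
    public String getError() {
      return "Example: the MS entry for this path is inconsistent.";
    }

    @Override
    public String fixViolation(S3AFileSystem fs, DynamoDBMetadataStore ddbms)
        throws IOException {
      // Example repair: drop the inconsistent entry, as the orphan handler does.
      final Path path = getPathMetadata().getFileStatus().getPath();
      ddbms.forgetMetadata(path);
      return String.format("Removed the inconsistent entry on path: %s", path);
    }
  }
```

A real handler would additionally need a matching constant in `S3GuardFsck.Violation` whose `getHandler()` returns it, since that is how `handleComparePair()` picks the handler class.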
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index d761163d98..6e89d0cd2d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -25,6 +25,7 @@ import java.io.PrintStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.AccessDeniedException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
@@ -1609,6 +1610,7 @@ public abstract class S3GuardTool extends Configured implements Tool,
   static class Fsck extends S3GuardTool {
     public static final String CHECK_FLAG = "check";
     public static final String DDB_MS_CONSISTENCY_FLAG = "internal";
+    public static final String FIX_FLAG = "fix";

     public static final String NAME = "fsck";
     public static final String PURPOSE = "Compares S3 with MetadataStore, and "
@@ -1618,12 +1620,17 @@ public abstract class S3GuardTool extends Configured implements Tool,
         "\t" + PURPOSE + "\n\n" +
         "Common options:\n" +
         "  -" + CHECK_FLAG + " Check the metadata store for errors, but do "
-        + "not fix any issues.\n"+
+        + "not fix any issues.\n" +
         "  -" + DDB_MS_CONSISTENCY_FLAG + " Check the dynamodb metadata store "
-        + "for internal consistency.\n";
+        + "for internal consistency.\n" +
+        "  -" + FIX_FLAG + " Fix the errors found in the metadata store. Can "
+        + "be used with " + CHECK_FLAG + " or " + DDB_MS_CONSISTENCY_FLAG + " flags. "
+        + "\n\t\tFixes: \n"
+        + "\t\t\t- Remove orphan entries from DDB."
+        + "\n";

     Fsck(Configuration conf) {
-      super(conf, CHECK_FLAG, DDB_MS_CONSISTENCY_FLAG);
+      super(conf, CHECK_FLAG, DDB_MS_CONSISTENCY_FLAG, FIX_FLAG);
     }

     @Override
@@ -1648,17 +1655,19 @@ public abstract class S3GuardTool extends Configured implements Tool,
       final CommandFormat commandFormat = getCommandFormat();

       // check if there's more than one arguments
-      int flags = 0;
-      if (commandFormat.getOpt(CHECK_FLAG)) {
-        flags++;
-      }
-      if (commandFormat.getOpt(DDB_MS_CONSISTENCY_FLAG)) {
-        flags++;
-      }
+      // from CHECK and INTERNAL CONSISTENCY
+      int flags = countTrue(commandFormat.getOpt(CHECK_FLAG),
+          commandFormat.getOpt(DDB_MS_CONSISTENCY_FLAG));
       if (flags > 1) {
         out.println(USAGE);
         throw invalidArgs("There should be only one parameter used for checking.");
       }
+      if (flags == 0 && commandFormat.getOpt(FIX_FLAG)) {
+        errorln(FIX_FLAG + " flag can be used with either " + CHECK_FLAG + " or " +
+            DDB_MS_CONSISTENCY_FLAG + " flag, but not alone.");
+        errorln(USAGE);
+        return ERROR;
+      }

       String s3Path = paths.get(0);
       try {
@@ -1707,6 +1716,11 @@ public abstract class S3GuardTool extends Configured implements Tool,
         return ERROR;
       }

+      if (commandFormat.getOpt(FIX_FLAG)) {
+        S3GuardFsck s3GuardFsck = new S3GuardFsck(fs, ms);
+        s3GuardFsck.fixViolations(violations);
+      }
+
       out.flush();

       // We fail if there were compare pairs, as the returned compare pairs
@@ -1716,6 +1730,10 @@ public abstract class S3GuardTool extends Configured implements Tool,
       }
       return exitValue;
     }
+
+    int countTrue(Boolean... bools) {
+      return (int) Arrays.stream(bools).filter(p -> p).count();
+    }
   }
   /**
    * Audits a DynamoDB S3Guard repository for all the entries being
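The `-fix` validation above hinges on the small `countTrue()` helper: exactly one of `-check`/`-internal` must be present, and `-fix` on its own is rejected. Below is a hypothetical JUnit 4 sketch pinning down the helper's semantics; the test class name is made up, and it assumes it lives in the same package so the package-private `Fsck` class and `countTrue()` are visible.

```java
package org.apache.hadoop.fs.s3a.s3guard;

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.conf.Configuration;
import org.junit.Test;

public class TestFsckCountTrueSketch {

  @Test
  public void testCountTrueCountsOnlySetFlags() {
    S3GuardTool.Fsck fsck = new S3GuardTool.Fsck(new Configuration());

    assertEquals(0, fsck.countTrue(false, false)); // neither -check nor -internal
    assertEquals(1, fsck.countTrue(true, false));  // exactly one check mode: accepted
    assertEquals(2, fsck.countTrue(true, true));   // both given: rejected as > 1
  }
}
```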
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
index 2ccf4b8c3b..e6481f941b 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
@@ -1165,7 +1165,7 @@ Compares S3 with MetadataStore, and returns a failure status if any rules or
 invariants are violated. Only works with DynamoDB metadata stores.

 ```bash
-hadoop s3guard fsck [-check | -internal] (s3a://BUCKET | s3a://PATH_PREFIX)
+hadoop s3guard fsck [-check | -internal] [-fix] (s3a://BUCKET | s3a://PATH_PREFIX)
 ```

 `-check` operation checks the metadata store from the S3 perspective, but
 does not fix any issues.
@@ -1175,6 +1175,12 @@ The consistency issues will be logged in ERROR loglevel.
 `-internal` operation checks the internal consistency of the metadata store,
 but does not fix any issues.

+`-fix` operation fixes consistency issues between the metadata store and the
+S3 bucket. This parameter is optional, and can be used together with the
+`-check` or `-internal` flags, but not on its own.
+The following fix is implemented:
+- Remove orphan entries from DDB
+
 The errors found will be logged at the ERROR log level.

 *Note*: `-check` and `-internal` operations can be used only as separate
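For the documentation above, a concrete invocation may make the accepted flag combination clearer; this is illustrative only and the bucket/prefix is a placeholder:

```bash
# Check the DynamoDB metadata store for internal consistency and remove any
# orphan entries that the scan finds.
hadoop s3guard fsck -internal -fix s3a://example-bucket/path/prefix/
```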
(As expected.)"); + return; + } + + final S3GuardFsck.ComparePair comparePair = comparePairs.stream() + .filter(p -> p.getPath().equals(file)) .findFirst().get(); - assertNotNull("The pair should not be null.", file2Pair); - Assertions.assertThat(file2Pair.getViolations()) + assertNotNull("The pair should not be null.", comparePair); + Assertions.assertThat(comparePair.getViolations()) .describedAs("Violations in the pair") .doesNotContain(violation); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java index ba93927e8d..19784ede5e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java @@ -354,6 +354,40 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase { "s3a://" + getFileSystem().getBucket())); } + @Test + public void testCLIFsckDDbFixOnlyFails() throws Exception { + describe("This test serves the purpose to run fsck with the correct " + + "parameters, so there will be no exception thrown."); + final int result = run(S3GuardTool.Fsck.NAME, + "-" + Fsck.FIX_FLAG, + "s3a://" + getFileSystem().getBucket()); + LOG.info("The return value of the run: {}", result); + assertEquals(ERROR, result); + } + + /** + * Test that the fix flag is accepted by the fsck. + * + * Note that we don't have an assert at the end of this test because + * there maybe some errors found during the check and the returned value + * will be ERROR and not SUCCESS. So if we assert on SUCCESS, then the test + * could (likely) to be flaky. + * If the FIX_FLAG parameter is not accepted here an exception will be thrown + * so the test will break. + * + * @throws Exception + */ + @Test + public void testCLIFsckDDbFixAndInternalSucceed() throws Exception { + describe("This test serves the purpose to run fsck with the correct " + + "parameters, so there will be no exception thrown."); + final int result = run(S3GuardTool.Fsck.NAME, + "-" + Fsck.FIX_FLAG, + "-" + Fsck.DDB_MS_CONSISTENCY_FLAG, + "s3a://" + getFileSystem().getBucket()); + LOG.info("The return value of the run: {}", result); + } + /** * Test that when init, the CMK option can not live without SSE enabled. */