HADOOP-16423. S3Guard fsck: Check metadata consistency between S3 and metadatastore (log) (#1208). Contributed by Gabor Bota.
Change-Id: I6bbb331b6c0a41c61043e482b95504fda8a50596
parent 44850f6784
commit 4e273a31f6
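For orientation, here is a minimal sketch of how the classes added in this patch fit together. The class name FsckSketch and the pre-initialized rawFs/ms/root arguments are illustrative assumptions, not part of the patch; the calls mirror what the new CLI command (roughly `hadoop s3guard fsck -check s3a://BUCKET`) does internally.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.S3GuardFsck;

/** Illustrative sketch only; not part of this patch. */
final class FsckSketch {
  static int check(S3AFileSystem rawFs, DynamoDBMetadataStore ms, Path root)
      throws IOException {
    // rawFs must not have a metadata store attached; S3GuardFsck enforces this.
    S3GuardFsck fsck = new S3GuardFsck(rawFs, ms);
    // Breadth-first walk of the S3 tree from root, comparing every entry with
    // the DynamoDB table; violations are logged by S3GuardFsckViolationHandler.
    List<S3GuardFsck.ComparePair> pairs = fsck.compareS3ToMs(root);
    // A non-empty result means at least one consistency rule was violated
    // (the CLI maps this to EXIT_FAIL).
    return pairs.isEmpty() ? 0 : 1;
  }
}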
@@ -1501,7 +1501,7 @@ public boolean hasMetadataStore() {
   * is set for this filesystem.
   */
  @VisibleForTesting
  boolean hasAuthoritativeMetadataStore() {
  public boolean hasAuthoritativeMetadataStore() {
    return hasMetadataStore() && allowAuthoritativeMetadataStore;
  }
@@ -0,0 +1,483 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.s3guard;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Stopwatch;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

/**
 * Main class for the FSCK factored out from S3GuardTool.
 * The implementation uses a fixed DynamoDBMetadataStore as the backing store
 * for metadata.
 *
 * Functions:
 * <ul>
 *   <li>Checking metadata consistency between S3 and metadatastore</li>
 * </ul>
 */
public class S3GuardFsck {
  private static final Logger LOG = LoggerFactory.getLogger(S3GuardFsck.class);
  public static final String ROOT_PATH_STRING = "/";

  private final S3AFileSystem rawFS;
  private final DynamoDBMetadataStore metadataStore;

  private static final long MOD_TIME_RANGE = 2000L;

  /**
   * Creates an S3GuardFsck.
   * @param fs the filesystem to compare to
   * @param ms the metadatastore to compare with (dynamo)
   */
  public S3GuardFsck(S3AFileSystem fs, MetadataStore ms)
      throws InvalidParameterException {
    this.rawFS = fs;

    if (ms == null) {
      throw new InvalidParameterException("S3A Bucket " + fs.getBucket()
          + " should be guarded by a "
          + DynamoDBMetadataStore.class.getCanonicalName());
    }
    this.metadataStore = (DynamoDBMetadataStore) ms;

    Preconditions.checkArgument(!rawFS.hasMetadataStore(),
        "Raw fs should not have a metadatastore.");
  }

  /**
   * Compares S3 to MS.
   * Iterative breadth-first walk on the S3 structure from a given root.
   * Creates a list of pairs (metadata in S3 and in the MetadataStore) where
   * the consistency or any rule is violated.
   * Uses {@link S3GuardFsckViolationHandler} to handle violations.
   * The violations are listed in the {@link Violation} enum.
   *
   * @param p the root path to start the traversal
   * @return a list of {@link ComparePair}
   * @throws IOException
   */
  public List<ComparePair> compareS3ToMs(Path p) throws IOException {
    Stopwatch stopwatch = Stopwatch.createStarted();
    int scannedItems = 0;

    final Path rootPath = rawFS.qualify(p);
    S3AFileStatus root = (S3AFileStatus) rawFS.getFileStatus(rootPath);
    final List<ComparePair> comparePairs = new ArrayList<>();
    final Queue<S3AFileStatus> queue = new ArrayDeque<>();
    queue.add(root);

    while (!queue.isEmpty()) {
      final S3AFileStatus currentDir = queue.poll();

      final Path currentDirPath = currentDir.getPath();
      try {
        List<FileStatus> s3DirListing = Arrays.asList(
            rawFS.listStatus(currentDirPath));

        // Check authoritative directory flag.
        compareAuthoritativeDirectoryFlag(comparePairs, currentDirPath,
            s3DirListing);
        // Add all descendant directories to the queue.
        s3DirListing.stream().filter(pm -> pm.isDirectory())
            .map(S3AFileStatus.class::cast)
            .forEach(pm -> queue.add(pm));

        // Check file and directory metadata for consistency.
        final List<S3AFileStatus> children = s3DirListing.stream()
            .filter(status -> !status.isDirectory())
            .map(S3AFileStatus.class::cast).collect(toList());
        final List<ComparePair> compareResult =
            compareS3DirContentToMs(currentDir, children);
        comparePairs.addAll(compareResult);

        // Increase the scanned item count:
        // one for the directory, one for each of the children.
        scannedItems++;
        scannedItems += children.size();
      } catch (FileNotFoundException e) {
        LOG.error("The path has been deleted since it was queued: "
            + currentDirPath, e);
      }

    }
    stopwatch.stop();

    // Create a handler and handle each violated pair.
    S3GuardFsckViolationHandler handler =
        new S3GuardFsckViolationHandler(rawFS, metadataStore);
    comparePairs.forEach(handler::handle);

    LOG.info("Total scan time: {}s", stopwatch.elapsed(TimeUnit.SECONDS));
    LOG.info("Scanned entries: {}", scannedItems);

    return comparePairs;
  }

  /**
   * Compare the directory contents if the listing is authoritative.
   *
   * @param comparePairs the list of compare pairs to add to
   *                     if it contains a violation
   * @param currentDirPath the current directory path
   * @param s3DirListing the s3 directory listing to compare with
   * @throws IOException
   */
  private void compareAuthoritativeDirectoryFlag(List<ComparePair> comparePairs,
      Path currentDirPath, List<FileStatus> s3DirListing) throws IOException {
    final DirListingMetadata msDirListing =
        metadataStore.listChildren(currentDirPath);
    if (msDirListing != null && msDirListing.isAuthoritative()) {
      ComparePair cP = new ComparePair(s3DirListing, msDirListing);

      if (s3DirListing.size() != msDirListing.numEntries()) {
        cP.violations.add(Violation.AUTHORITATIVE_DIRECTORY_CONTENT_MISMATCH);
      } else {
        final Set<Path> msPaths = msDirListing.getListing().stream()
            .map(pm -> pm.getFileStatus().getPath()).collect(toSet());
        final Set<Path> s3Paths = s3DirListing.stream()
            .map(pm -> pm.getPath()).collect(toSet());
        if (!s3Paths.equals(msPaths)) {
          cP.violations.add(Violation.AUTHORITATIVE_DIRECTORY_CONTENT_MISMATCH);
        }
      }

      if (cP.containsViolation()) {
        comparePairs.add(cP);
      }
    }
  }

  /**
   * Compares S3 directory content to the metadata store.
   *
   * @param s3CurrentDir file status of the current directory
   * @param children the contents of the directory
   * @return the compare pairs with violations of consistency
   * @throws IOException
   */
  protected List<ComparePair> compareS3DirContentToMs(
      S3AFileStatus s3CurrentDir,
      List<S3AFileStatus> children) throws IOException {
    final Path path = s3CurrentDir.getPath();
    final PathMetadata pathMetadata = metadataStore.get(path);
    List<ComparePair> violationComparePairs = new ArrayList<>();

    final ComparePair rootComparePair =
        compareFileStatusToPathMetadata(s3CurrentDir, pathMetadata);
    if (rootComparePair.containsViolation()) {
      violationComparePairs.add(rootComparePair);
    }

    children.forEach(s3ChildMeta -> {
      try {
        final PathMetadata msChildMeta =
            metadataStore.get(s3ChildMeta.getPath());
        final ComparePair comparePair =
            compareFileStatusToPathMetadata(s3ChildMeta, msChildMeta);
        if (comparePair.containsViolation()) {
          violationComparePairs.add(comparePair);
        }
      } catch (Exception e) {
        LOG.error(e.getMessage(), e);
      }
    });

    return violationComparePairs;
  }

  /**
   * Compares a {@link S3AFileStatus} from S3 to a {@link PathMetadata}
   * from the metadata store. Finds violated invariants and consistency
   * issues.
   *
   * @param s3FileStatus the file status from S3
   * @param msPathMetadata the path metadata from metadatastore
   * @return {@link ComparePair} with the found issues
   * @throws IOException
   */
  protected ComparePair compareFileStatusToPathMetadata(
      S3AFileStatus s3FileStatus,
      PathMetadata msPathMetadata) throws IOException {
    final Path path = s3FileStatus.getPath();

    if (msPathMetadata != null) {
      LOG.info("Path: {} - Length S3: {}, MS: {} " +
              "- Etag S3: {}, MS: {} ",
          path,
          s3FileStatus.getLen(), msPathMetadata.getFileStatus().getLen(),
          s3FileStatus.getETag(), msPathMetadata.getFileStatus().getETag());
    } else {
      LOG.info("Path: {} - Length S3: {} - Etag S3: {}, no record in MS.",
          path, s3FileStatus.getLen(), s3FileStatus.getETag());
    }

    ComparePair comparePair = new ComparePair(s3FileStatus, msPathMetadata);

    if (!path.equals(path(ROOT_PATH_STRING))) {
      final Path parentPath = path.getParent();
      final PathMetadata parentPm = metadataStore.get(parentPath);

      if (parentPm == null) {
        comparePair.violations.add(Violation.NO_PARENT_ENTRY);
      } else {
        if (!parentPm.getFileStatus().isDirectory()) {
          comparePair.violations.add(Violation.PARENT_IS_A_FILE);
        }
        if (parentPm.isDeleted()) {
          comparePair.violations.add(Violation.PARENT_TOMBSTONED);
        }
      }
    } else {
      LOG.debug("Entry is in the root directory, so there's no parent");
    }

    // If the msPathMetadata is null, we RETURN because
    // there is no metadata to compare with.
    if (msPathMetadata == null) {
      comparePair.violations.add(Violation.NO_METADATA_ENTRY);
      return comparePair;
    }

    final S3AFileStatus msFileStatus = msPathMetadata.getFileStatus();
    if (s3FileStatus.isDirectory() && !msFileStatus.isDirectory()) {
      comparePair.violations.add(Violation.DIR_IN_S3_FILE_IN_MS);
    }
    if (!s3FileStatus.isDirectory() && msFileStatus.isDirectory()) {
      comparePair.violations.add(Violation.FILE_IN_S3_DIR_IN_MS);
    }

    if (msPathMetadata.isDeleted()) {
      comparePair.violations.add(Violation.TOMBSTONED_IN_MS_NOT_DELETED_IN_S3);
    }

    /**
     * Attribute check
     */
    if (s3FileStatus.getLen() != msFileStatus.getLen()) {
      comparePair.violations.add(Violation.LENGTH_MISMATCH);
    }

    // ModTime should be in the accuracy range defined.
    long modTimeDiff = Math.abs(
        s3FileStatus.getModificationTime() - msFileStatus.getModificationTime()
    );
    if (modTimeDiff > MOD_TIME_RANGE) {
      comparePair.violations.add(Violation.MOD_TIME_MISMATCH);
    }

    if (msPathMetadata.getFileStatus().getVersionId() == null
        || s3FileStatus.getVersionId() == null) {
      LOG.debug("Missing versionIDs skipped. A HEAD request is "
          + "required for each object to get the versionID.");
    } else if (!s3FileStatus.getVersionId().equals(msFileStatus.getVersionId())) {
      comparePair.violations.add(Violation.VERSIONID_MISMATCH);
    }

    // Check the etag only for files, and not directories.
    if (!s3FileStatus.isDirectory()) {
      if (msPathMetadata.getFileStatus().getETag() == null) {
        comparePair.violations.add(Violation.NO_ETAG);
      } else if (s3FileStatus.getETag() != null &&
          !s3FileStatus.getETag().equals(msFileStatus.getETag())) {
        comparePair.violations.add(Violation.ETAG_MISMATCH);
      }
    }

    return comparePair;
  }

  private Path path(String s) {
    return rawFS.makeQualified(new Path(s));
  }

  /**
   * A compare pair with the pair of metadata and the list of violations.
   */
  public static class ComparePair {
    private final S3AFileStatus s3FileStatus;
    private final PathMetadata msPathMetadata;

    private final List<FileStatus> s3DirListing;
    private final DirListingMetadata msDirListing;

    private final Path path;

    private final Set<Violation> violations = new HashSet<>();

    ComparePair(S3AFileStatus status, PathMetadata pm) {
      this.s3FileStatus = status;
      this.msPathMetadata = pm;
      this.s3DirListing = null;
      this.msDirListing = null;
      this.path = status.getPath();
    }

    ComparePair(List<FileStatus> s3DirListing, DirListingMetadata msDirListing) {
      this.s3DirListing = s3DirListing;
      this.msDirListing = msDirListing;
      this.s3FileStatus = null;
      this.msPathMetadata = null;
      this.path = msDirListing.getPath();
    }

    public S3AFileStatus getS3FileStatus() {
      return s3FileStatus;
    }

    public PathMetadata getMsPathMetadata() {
      return msPathMetadata;
    }

    public Set<Violation> getViolations() {
      return violations;
    }

    public boolean containsViolation() {
      return !violations.isEmpty();
    }

    public DirListingMetadata getMsDirListing() {
      return msDirListing;
    }

    public List<FileStatus> getS3DirListing() {
      return s3DirListing;
    }

    public Path getPath() {
      return path;
    }

    @Override public String toString() {
      return "ComparePair{" + "s3FileStatus=" + s3FileStatus
          + ", msPathMetadata=" + msPathMetadata + ", s3DirListing=" +
          s3DirListing + ", msDirListing=" + msDirListing + ", path="
          + path + ", violations=" + violations + '}';
    }
  }

  /**
   * Violation with severity and the handler.
   * Defines the severity of the violation between 0-2,
   * where 0 is the most severe and 2 is the least severe.
   */
  public enum Violation {
    /**
     * No entry in metadatastore.
     */
    NO_METADATA_ENTRY(1,
        S3GuardFsckViolationHandler.NoMetadataEntry.class),
    /**
     * A file or directory entry does not have a parent entry - excluding
     * files and directories in the root.
     */
    NO_PARENT_ENTRY(0,
        S3GuardFsckViolationHandler.NoParentEntry.class),
    /**
     * An entry's parent is a file.
     */
    PARENT_IS_A_FILE(0,
        S3GuardFsckViolationHandler.ParentIsAFile.class),
    /**
     * A file exists under a path for which there is a
     * tombstone entry in the MS.
     */
    PARENT_TOMBSTONED(0,
        S3GuardFsckViolationHandler.ParentTombstoned.class),
    /**
     * A directory in S3 is a file entry in the MS.
     */
    DIR_IN_S3_FILE_IN_MS(0,
        S3GuardFsckViolationHandler.DirInS3FileInMs.class),
    /**
     * A file in S3 is a directory in the MS.
     */
    FILE_IN_S3_DIR_IN_MS(0,
        S3GuardFsckViolationHandler.FileInS3DirInMs.class),
    AUTHORITATIVE_DIRECTORY_CONTENT_MISMATCH(1,
        S3GuardFsckViolationHandler.AuthDirContentMismatch.class),
    /**
     * An entry in the MS is tombstoned, but the object is not deleted on S3.
     */
    TOMBSTONED_IN_MS_NOT_DELETED_IN_S3(0,
        S3GuardFsckViolationHandler.TombstonedInMsNotDeletedInS3.class),
    /**
     * Attribute mismatch.
     */
    LENGTH_MISMATCH(0,
        S3GuardFsckViolationHandler.LengthMismatch.class),
    MOD_TIME_MISMATCH(2,
        S3GuardFsckViolationHandler.ModTimeMismatch.class),
    /**
     * If there's a versionID the mismatch is severe.
     */
    VERSIONID_MISMATCH(0,
        S3GuardFsckViolationHandler.VersionIdMismatch.class),
    /**
     * If there's an etag the mismatch is severe.
     */
    ETAG_MISMATCH(0,
        S3GuardFsckViolationHandler.EtagMismatch.class),
    /**
     * Don't worry too much if we don't have an etag.
     */
    NO_ETAG(2,
        S3GuardFsckViolationHandler.NoEtag.class);

    private final int severity;
    private final Class<? extends S3GuardFsckViolationHandler.ViolationHandler> handler;

    Violation(int s,
        Class<? extends S3GuardFsckViolationHandler.ViolationHandler> h) {
      this.severity = s;
      this.handler = h;
    }

    public int getSeverity() {
      return severity;
    }

    public Class<? extends S3GuardFsckViolationHandler.ViolationHandler> getHandler() {
      return handler;
    }
  }
}
@@ -0,0 +1,346 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.s3guard;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

/**
 * Violation handler for the S3Guard's fsck.
 */
public class S3GuardFsckViolationHandler {
  private static final Logger LOG = LoggerFactory.getLogger(
      S3GuardFsckViolationHandler.class);

  // The rawFs and metadataStore are kept here so that, once the
  // ViolationHandlers not only log but also fix violations, they will have
  // access to them.
  private final S3AFileSystem rawFs;
  private final DynamoDBMetadataStore metadataStore;

  private static String newLine = System.getProperty("line.separator");

  public S3GuardFsckViolationHandler(S3AFileSystem fs,
      DynamoDBMetadataStore ddbms) {

    this.metadataStore = ddbms;
    this.rawFs = fs;
  }

  public void handle(S3GuardFsck.ComparePair comparePair) {
    if (!comparePair.containsViolation()) {
      LOG.debug("There is no violation in the compare pair: {}", comparePair);
      return;
    }

    StringBuilder sB = new StringBuilder();
    sB.append(newLine)
        .append("On path: ").append(comparePair.getPath()).append(newLine);

    handleComparePair(comparePair, sB);

    LOG.error(sB.toString());
  }

  /**
   * Create a new instance of the violation handler for all the violations
   * found in the compare pair and use it.
   *
   * @param comparePair the compare pair with violations
   * @param sB StringBuilder to append error strings from violations.
   */
  protected static void handleComparePair(S3GuardFsck.ComparePair comparePair,
      StringBuilder sB) {

    for (S3GuardFsck.Violation violation : comparePair.getViolations()) {
      try {
        ViolationHandler handler = violation.getHandler()
            .getDeclaredConstructor(S3GuardFsck.ComparePair.class)
            .newInstance(comparePair);
        final String errorStr = handler.getError();
        sB.append(errorStr);
      } catch (NoSuchMethodException e) {
        LOG.error("Can not find declared constructor for handler: {}",
            violation.getHandler());
      } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
        LOG.error("Can not instantiate handler: {}",
            violation.getHandler());
      }
      sB.append(newLine);
    }
  }

  /**
   * Violation handler abstract class.
   * This class should be extended for violation handlers.
   */
  public static abstract class ViolationHandler {
    private final PathMetadata pathMetadata;
    private final S3AFileStatus s3FileStatus;
    private final S3AFileStatus msFileStatus;
    private final List<FileStatus> s3DirListing;
    private final DirListingMetadata msDirListing;

    public ViolationHandler(S3GuardFsck.ComparePair comparePair) {
      pathMetadata = comparePair.getMsPathMetadata();
      s3FileStatus = comparePair.getS3FileStatus();
      if (pathMetadata != null) {
        msFileStatus = pathMetadata.getFileStatus();
      } else {
        msFileStatus = null;
      }
      s3DirListing = comparePair.getS3DirListing();
      msDirListing = comparePair.getMsDirListing();
    }

    public abstract String getError();

    public PathMetadata getPathMetadata() {
      return pathMetadata;
    }

    public S3AFileStatus getS3FileStatus() {
      return s3FileStatus;
    }

    public S3AFileStatus getMsFileStatus() {
      return msFileStatus;
    }

    public List<FileStatus> getS3DirListing() {
      return s3DirListing;
    }

    public DirListingMetadata getMsDirListing() {
      return msDirListing;
    }
  }

  /**
   * The violation handler when there's no matching metadata entry in the MS.
   */
  public static class NoMetadataEntry extends ViolationHandler {

    public NoMetadataEntry(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      return "No PathMetadata for this path in the MS.";
    }
  }

  /**
   * The violation handler when there's no parent entry.
   */
  public static class NoParentEntry extends ViolationHandler {

    public NoParentEntry(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      return "Entry does not have a parent entry (not root)";
    }
  }

  /**
   * The violation handler when the parent of an entry is a file.
   */
  public static class ParentIsAFile extends ViolationHandler {

    public ParentIsAFile(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      return "The entry's parent in the metastore database is a file.";
    }
  }

  /**
   * The violation handler when the parent of an entry is tombstoned.
   */
  public static class ParentTombstoned extends ViolationHandler {

    public ParentTombstoned(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      return "The entry in the metastore database has a parent entry " +
          "which is a tombstone marker";
    }
  }

  /**
   * The violation handler when a directory in S3 is a file entry in the MS.
   */
  public static class DirInS3FileInMs extends ViolationHandler {

    public DirInS3FileInMs(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      return "A directory in S3 is a file entry in the MS";
    }
  }

  /**
   * The violation handler when a file in S3 is a directory entry in the MS.
   */
  public static class FileInS3DirInMs extends ViolationHandler {

    public FileInS3DirInMs(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      return "A file in S3 is a directory entry in the MS";
    }
  }

  /**
   * The violation handler when there's a directory listing content mismatch.
   */
  public static class AuthDirContentMismatch extends ViolationHandler {

    public AuthDirContentMismatch(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      final String str = String.format(
          "The content of an authoritative directory listing does "
              + "not match the content of the S3 listing. S3: %s, MS: %s",
          Arrays.asList(getS3DirListing()), getMsDirListing().getListing());
      return str;
    }
  }

  /**
   * The violation handler when there's a length mismatch.
   */
  public static class LengthMismatch extends ViolationHandler {

    public LengthMismatch(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override public String getError() {
      return String.format("File length mismatch - S3: %s, MS: %s",
          getS3FileStatus().getLen(), getMsFileStatus().getLen());
    }
  }

  /**
   * The violation handler when there's a modtime mismatch.
   */
  public static class ModTimeMismatch extends ViolationHandler {

    public ModTimeMismatch(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      return String.format("File timestamp mismatch - S3: %s, MS: %s",
          getS3FileStatus().getModificationTime(),
          getMsFileStatus().getModificationTime());
    }
  }

  /**
   * The violation handler when there's a version id mismatch.
   */
  public static class VersionIdMismatch extends ViolationHandler {

    public VersionIdMismatch(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      return String.format("getVersionId mismatch - S3: %s, MS: %s",
          getS3FileStatus().getVersionId(), getMsFileStatus().getVersionId());
    }
  }

  /**
   * The violation handler when there's an etag mismatch.
   */
  public static class EtagMismatch extends ViolationHandler {

    public EtagMismatch(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      return String.format("Etag mismatch - S3: %s, MS: %s",
          getS3FileStatus().getETag(), getMsFileStatus().getETag());
    }
  }

  /**
   * The violation handler when there's no etag.
   */
  public static class NoEtag extends ViolationHandler {

    public NoEtag(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      return "No etag.";
    }
  }

  /**
   * The violation handler when an entry is tombstoned in the MS,
   * but the object is not deleted in S3.
   */
  public static class TombstonedInMsNotDeletedInS3 extends ViolationHandler {

    public TombstonedInMsNotDeletedInS3(S3GuardFsck.ComparePair comparePair) {
      super(comparePair);
    }

    @Override
    public String getError() {
      return "The entry for the path is tombstoned in the MS.";
    }
  }
}
@@ -94,7 +94,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
      "\t" + Diff.NAME + " - " + Diff.PURPOSE + "\n" +
      "\t" + Prune.NAME + " - " + Prune.PURPOSE + "\n" +
      "\t" + SetCapacity.NAME + " - " + SetCapacity.PURPOSE + "\n" +
      "\t" + SelectTool.NAME + " - " + SelectTool.PURPOSE + "\n";
      "\t" + SelectTool.NAME + " - " + SelectTool.PURPOSE + "\n" +
      "\t" + Fsck.NAME + " - " + Fsck.PURPOSE + "\n";
  private static final String DATA_IN_S3_IS_PRESERVED
      = "(all data in S3 is preserved)";

@@ -1485,6 +1486,97 @@ private void vprintln(PrintStream out, String format, Object...
    }
  }

  /**
   * Fsck - check for consistency between S3 and the metadatastore.
   */
  static class Fsck extends S3GuardTool {
    public static final String CHECK_FLAG = "check";

    public static final String NAME = "fsck";
    public static final String PURPOSE = "Compares S3 with MetadataStore, and "
        + "returns a failure status if any rules or invariants are violated. "
        + "Only works with DynamoDB metadata stores.";
    private static final String USAGE = NAME + " [OPTIONS] [s3a://BUCKET]\n" +
        "\t" + PURPOSE + "\n\n" +
        "Common options:\n" +
        "  -" + CHECK_FLAG + " Check the metadata store for errors, but do "
            + "not fix any issues.\n";

    Fsck(Configuration conf) {
      super(conf, CHECK_FLAG);
    }

    @Override
    public String getName() {
      return NAME;
    }

    @Override
    public String getUsage() {
      return USAGE;
    }

    public int run(String[] args, PrintStream out) throws
        InterruptedException, IOException {
      List<String> paths = parseArgs(args);
      if (paths.isEmpty()) {
        out.println(USAGE);
        throw invalidArgs("no arguments");
      }
      int exitValue = EXIT_SUCCESS;

      String s3Path = paths.get(0);
      try {
        initS3AFileSystem(s3Path);
      } catch (Exception e) {
        errorln("Failed to initialize S3AFileSystem from path: " + s3Path);
        throw e;
      }

      URI uri = toUri(s3Path);
      Path root;
      if (uri.getPath().isEmpty()) {
        root = new Path("/");
      } else {
        root = new Path(uri.getPath());
      }

      final S3AFileSystem fs = getFilesystem();
      initMetadataStore(false);
      final MetadataStore ms = getStore();

      if (ms == null ||
          !(ms instanceof DynamoDBMetadataStore)) {
        errorln(s3Path + " path uses MS: " + ms);
        errorln(NAME + " can be only used with a DynamoDB backed s3a bucket.");
        errorln(USAGE);
        return ERROR;
      }

      final CommandFormat commandFormat = getCommandFormat();
      if (commandFormat.getOpt(CHECK_FLAG)) {
        // do the check
        S3GuardFsck s3GuardFsck = new S3GuardFsck(fs, ms);
        try {
          final List<S3GuardFsck.ComparePair> comparePairs
              = s3GuardFsck.compareS3ToMs(fs.qualify(root));
          if (comparePairs.size() > 0) {
            exitValue = EXIT_FAIL;
          }
        } catch (IOException e) {
          throw e;
        }
      } else {
        errorln("No supported operation is selected.");
        errorln(USAGE);
        return ERROR;
      }

      out.flush();
      return exitValue;
    }
  }

  private static S3GuardTool command;

  /**
@@ -1664,6 +1756,9 @@ public static int run(Configuration conf, String...args) throws
      // because this is the defacto S3 CLI.
      command = new SelectTool(conf);
      break;
    case Fsck.NAME:
      command = new Fsck(conf);
      break;
    default:
      printHelp();
      throw new ExitUtil.ExitException(E_USAGE,
@@ -61,6 +61,11 @@
import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.PROBE_INTERVAL_MILLIS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.STABILIZATION_TIME;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.TIMESTAMP_SLEEP;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.awaitDeletedFileDisappearance;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.awaitFileStatus;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingContainsPath;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingDoesNotContainPath;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;

@@ -115,12 +120,6 @@
@RunWith(Parameterized.class)
public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {

  public static final int TIMESTAMP_SLEEP = 2000;

  public static final int STABILIZATION_TIME = 20_000;

  public static final int PROBE_INTERVAL_MILLIS = 2500;

  private S3AFileSystem guardedFs;
  private S3AFileSystem rawFS;
@@ -55,6 +55,7 @@

import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

@@ -72,6 +73,7 @@
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
import static org.junit.Assert.*;

@@ -92,6 +94,10 @@ public final class S3ATestUtils {
  public static final String UNSET_PROPERTY = "unset";
  public static final int PURGE_DELAY_SECONDS = 60 * 60;

  public static final int TIMESTAMP_SLEEP = 2000;
  public static final int STABILIZATION_TIME = 20_000;
  public static final int PROBE_INTERVAL_MILLIS = 500;

  /** Add any deprecated keys. */
  @SuppressWarnings("deprecation")
  private static void addDeprecatedKeys() {

@@ -1309,4 +1315,32 @@ public static void checkListingContainsPath(S3AFileSystem fs, Path filePath)
        listStatusHasIt);
  }

  /**
   * Wait for a deleted file to no longer be visible.
   * @param fs filesystem
   * @param testFilePath path to query
   * @throws Exception failure
   */
  public static void awaitDeletedFileDisappearance(final S3AFileSystem fs,
      final Path testFilePath) throws Exception {
    eventually(
        STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
        () -> intercept(FileNotFoundException.class,
            () -> fs.getFileStatus(testFilePath)));
  }

  /**
   * Wait for a file to be visible.
   * @param fs filesystem
   * @param testFilePath path to query
   * @return the file status.
   * @throws Exception failure
   */
  public static S3AFileStatus awaitFileStatus(S3AFileSystem fs,
      final Path testFilePath)
      throws Exception {
    return (S3AFileStatus) eventually(
        STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
        () -> fs.getFileStatus(testFilePath));
  }
}
@@ -0,0 +1,504 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.s3guard;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.UUID;

import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

import static org.apache.hadoop.fs.s3a.Constants.AUTHORITATIVE_PATH;
import static org.junit.Assume.assumeTrue;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.awaitFileStatus;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;

/**
 * Integration tests for the S3Guard Fsck against a DynamoDB backed metadata
 * store.
 */
public class ITestS3GuardFsck extends AbstractS3ATestBase {

  private S3AFileSystem guardedFs;
  private S3AFileSystem rawFs;

  private MetadataStore metadataStore;

  @Before
  public void setup() throws Exception {
    super.setup();
    S3AFileSystem fs = getFileSystem();
    // These tests will fail if there is no MS.
    assumeTrue("FS needs to have a metadatastore.",
        fs.hasMetadataStore());
    assumeTrue("Metadatastore should persist authoritative bit",
        metadataStorePersistsAuthoritativeBit(fs.getMetadataStore()));

    guardedFs = fs;
    metadataStore = fs.getMetadataStore();

    // create raw fs without s3guard
    rawFs = createUnguardedFS();
    assertFalse("Raw FS still has S3Guard " + rawFs,
        rawFs.hasMetadataStore());
  }

  @Override
  public void teardown() throws Exception {
    if (guardedFs != null) {
      IOUtils.cleanupWithLogger(LOG, guardedFs);
    }
    IOUtils.cleanupWithLogger(LOG, rawFs);
    super.teardown();
  }

  /**
   * Create a test filesystem which is always unguarded.
   * This filesystem MUST be closed in test teardown.
   * @return the new FS
   */
  private S3AFileSystem createUnguardedFS() throws Exception {
    S3AFileSystem testFS = getFileSystem();
    Configuration config = new Configuration(testFS.getConf());
    URI uri = testFS.getUri();

    removeBaseAndBucketOverrides(uri.getHost(), config,
        S3_METADATA_STORE_IMPL, METADATASTORE_AUTHORITATIVE,
        AUTHORITATIVE_PATH);
    S3AFileSystem fs2 = new S3AFileSystem();
    fs2.initialize(uri, config);
    return fs2;
  }

  @Test
  public void testIDetectNoMetadataEntry() throws Exception {
    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path file = new Path(cwd, "file");
    try {
      touchRawAndWaitRaw(file);

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> comparePairs =
          s3GuardFsck.compareS3ToMs(cwd);

      assertComparePairsSize(comparePairs, 2);
      final S3GuardFsck.ComparePair pair = comparePairs.get(0);
      checkForViolationInPairs(file, comparePairs,
          S3GuardFsck.Violation.NO_METADATA_ENTRY);
    } finally {
      // delete the working directory with all of its contents
      cleanup(file, cwd);
    }
  }

  @Test
  public void testIDetectNoParentEntry() throws Exception {
    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path file = new Path(cwd, "file");
    try {
      touchGuardedAndWaitRaw(file);
      // delete the parent from the MS
      metadataStore.forgetMetadata(cwd);

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> comparePairs =
          s3GuardFsck.compareS3ToMs(cwd);

      assertComparePairsSize(comparePairs, 2);
      // check that the parent does not exist in the MS
      checkForViolationInPairs(cwd, comparePairs,
          S3GuardFsck.Violation.NO_METADATA_ENTRY);
      // check the child that there's no parent entry.
      checkForViolationInPairs(file, comparePairs,
          S3GuardFsck.Violation.NO_PARENT_ENTRY);
    } finally {
      cleanup(file, cwd);
    }
  }

  @Test
  public void testIDetectParentIsAFile() throws Exception {
    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path file = new Path(cwd, "file");
    try {
      touchGuardedAndWaitRaw(file);
      // modify the cwd metadata and set that it's not a directory
      final S3AFileStatus newParentFile = MetadataStoreTestBase
          .basicFileStatus(cwd, 1, false, 1);
      metadataStore.put(new PathMetadata(newParentFile));

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> comparePairs =
          s3GuardFsck.compareS3ToMs(cwd);

      assertComparePairsSize(comparePairs, 2);
      // check the parent that the directory in S3 is a file entry in the MS
      checkForViolationInPairs(cwd, comparePairs,
          S3GuardFsck.Violation.DIR_IN_S3_FILE_IN_MS);
      // check the child that the parent is a file.
      checkForViolationInPairs(file, comparePairs,
          S3GuardFsck.Violation.PARENT_IS_A_FILE);
    } finally {
      cleanup(file, cwd);
    }
  }

  @Test
  public void testIDetectParentTombstoned() throws Exception {
    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path file = new Path(cwd, "file");
    try {
      touchGuardedAndWaitRaw(file);
      // tombstone the parent entry in the MS
      final PathMetadata cwdPmd = metadataStore.get(cwd);
      cwdPmd.setIsDeleted(true);
      metadataStore.put(cwdPmd);

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> comparePairs =
          s3GuardFsck.compareS3ToMs(cwd);

      // check the child that the parent is tombstoned
      checkForViolationInPairs(file, comparePairs,
          S3GuardFsck.Violation.PARENT_TOMBSTONED);
    } finally {
      cleanup(file, cwd);
    }
  }

  @Test
  public void testIDetectDirInS3FileInMs() throws Exception {
    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
    try {
      // create a directory with the guarded fs
      mkdirs(cwd);
      awaitFileStatus(guardedFs, cwd);
      // modify the cwd metadata and set that it's not a directory
      final S3AFileStatus newParentFile = MetadataStoreTestBase
          .basicFileStatus(cwd, 1, false, 1);
      metadataStore.put(new PathMetadata(newParentFile));

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> comparePairs =
          s3GuardFsck.compareS3ToMs(cwd);
      assertComparePairsSize(comparePairs, 1);

      // check the child that the dir in s3 is a file in the ms
      checkForViolationInPairs(cwd, comparePairs,
          S3GuardFsck.Violation.DIR_IN_S3_FILE_IN_MS);
    } finally {
      cleanup(cwd);
    }
  }

  @Test
  public void testIDetectFileInS3DirInMs() throws Exception {
    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path file = new Path(cwd, "file");
    try {
      touchGuardedAndWaitRaw(file);
      // modify the file metadata and set that it is a directory in the MS
      final S3AFileStatus newFile = MetadataStoreTestBase
          .basicFileStatus(file, 1, true, 1);
      metadataStore.put(new PathMetadata(newFile));

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> comparePairs =
          s3GuardFsck.compareS3ToMs(cwd);

      assertComparePairsSize(comparePairs, 1);
      // check the child that the file in s3 is a directory in the ms
      checkForViolationInPairs(file, comparePairs,
          S3GuardFsck.Violation.FILE_IN_S3_DIR_IN_MS);
    } finally {
      cleanup(file, cwd);
    }
  }

  @Test
  public void testIAuthoritativeDirectoryContentMismatch() throws Exception {
    assumeTrue("Authoritative directory listings should be enabled for this "
        + "test", guardedFs.hasAuthoritativeMetadataStore());
    // first dir listing will be correct
    final Path cwdCorrect = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path fileC1 = new Path(cwdCorrect, "fileC1");
    final Path fileC2 = new Path(cwdCorrect, "fileC2");

    // second dir listing will be incorrect: missing entry from Dynamo
    final Path cwdIncorrect = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path fileIc1 = new Path(cwdIncorrect, "fileC1");
    final Path fileIc2 = new Path(cwdIncorrect, "fileC2");
    try {
      touchGuardedAndWaitRaw(fileC1);
      touchGuardedAndWaitRaw(fileC2);
      touchGuardedAndWaitRaw(fileIc1);

      // get listing from ms and set it authoritative
      final DirListingMetadata dlmC = metadataStore.listChildren(cwdCorrect);
      final DirListingMetadata dlmIc = metadataStore.listChildren(cwdIncorrect);
      dlmC.setAuthoritative(true);
      dlmIc.setAuthoritative(true);
      metadataStore.put(dlmC, null);
      metadataStore.put(dlmIc, null);

      // add a file raw so the listing will be different.
      touchRawAndWaitRaw(fileIc2);

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> pairsCorrect =
          s3GuardFsck.compareS3ToMs(cwdCorrect);
      final List<S3GuardFsck.ComparePair> pairsIncorrect =
          s3GuardFsck.compareS3ToMs(cwdIncorrect);

      // Assert that the correct dir does not contain the violation.
      assertTrue(pairsCorrect.stream()
          .noneMatch(p -> p.getPath().equals(cwdCorrect)));

      // Assert that the incorrect listing contains the violation.
      checkForViolationInPairs(cwdIncorrect, pairsIncorrect,
          S3GuardFsck.Violation.AUTHORITATIVE_DIRECTORY_CONTENT_MISMATCH);
    } finally {
      cleanup(fileC1, fileC2, fileIc1, fileIc2, cwdCorrect, cwdIncorrect);
    }
  }

  @Test
  public void testIDetectLengthMismatch() throws Exception {
    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path file = new Path(cwd, "file");
    try {
      // create a file with guarded fs
      touchGuardedAndWaitRaw(file);

      // modify the file metadata so the length will not match
      final S3AFileStatus newFile = MetadataStoreTestBase
          .basicFileStatus(file, 9999, false, 1);
      metadataStore.put(new PathMetadata(newFile));

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> comparePairs =
          s3GuardFsck.compareS3ToMs(cwd);

      assertComparePairsSize(comparePairs, 1);
      // Assert that the correct dir does not contain the violation.
      assertTrue(comparePairs.stream()
          .noneMatch(p -> p.getPath().equals(cwd)));
      // Assert that the incorrect file meta contains the violation.
      checkForViolationInPairs(file, comparePairs,
          S3GuardFsck.Violation.LENGTH_MISMATCH);
    } finally {
      cleanup(file, cwd);
    }
  }

  @Test
  public void testIDetectModTimeMismatch() throws Exception {
    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path file = new Path(cwd, "file");
    try {
      // create a file with guarded fs
      touchGuardedAndWaitRaw(file);
      // modify the parent meta entry so the MOD_TIME will surely be up to date
      final FileStatus oldCwdFileStatus = rawFs.getFileStatus(cwd);
      final S3AFileStatus newCwdFileStatus = MetadataStoreTestBase
          .basicFileStatus(cwd, 0, true,
              oldCwdFileStatus.getModificationTime());
      metadataStore.put(new PathMetadata(newCwdFileStatus));

      // modify the file metadata so the modification time will not match
      final S3AFileStatus newFileStatus = MetadataStoreTestBase
          .basicFileStatus(file, 0, false, 1);
      metadataStore.put(new PathMetadata(newFileStatus));

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> comparePairs =
          s3GuardFsck.compareS3ToMs(cwd);

      assertComparePairsSize(comparePairs, 1);
      // Assert that the correct dir does not contain the violation.
      assertTrue(comparePairs.stream()
          .noneMatch(p -> p.getPath().equals(cwd)));
      // check the file meta that there's a violation.
      checkForViolationInPairs(file, comparePairs,
          S3GuardFsck.Violation.MOD_TIME_MISMATCH);
    } finally {
      cleanup(file, cwd);
    }
  }

  @Test
  public void testIEtagMismatch() throws Exception {
    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path file = new Path(cwd, "file");
    try {
      touchGuardedAndWaitRaw(file);
      // modify the file metadata so the etag will not match
      final S3AFileStatus newFileStatus = new S3AFileStatus(1, 1, file, 1, "",
          "etag", "versionId");
      metadataStore.put(new PathMetadata(newFileStatus));

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> comparePairs =
          s3GuardFsck.compareS3ToMs(cwd);

      assertComparePairsSize(comparePairs, 1);
      // check the child that there's an ETAG_MISMATCH
      checkForViolationInPairs(file, comparePairs,
          S3GuardFsck.Violation.ETAG_MISMATCH);
    } finally {
      cleanup(file, cwd);
    }
  }

  @Test
  public void testINoEtag() throws Exception {
    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path file1 = new Path(cwd, "file1");
    final Path file2 = new Path(cwd, "file2");
    try {
      // create file1 and file2 with the guarded fs
      touchGuardedAndWaitRaw(file1);
      touchGuardedAndWaitRaw(file2);
      // modify the file1 metadata so there's no etag
      final S3AFileStatus newFile1Status =
          new S3AFileStatus(1, 1, file1, 1, "", null, "versionId");
      final S3AFileStatus newFile2Status =
          new S3AFileStatus(1, 1, file2, 1, "", "etag", "versionId");
      metadataStore.put(new PathMetadata(newFile1Status));
      metadataStore.put(new PathMetadata(newFile2Status));

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> comparePairs =
          s3GuardFsck.compareS3ToMs(cwd);

      assertComparePairsSize(comparePairs, 2);

      // check file 1 that there's NO_ETAG
      checkForViolationInPairs(file1, comparePairs,
          S3GuardFsck.Violation.NO_ETAG);
      // check file 2 that there's no NO_ETAG violation
      checkNoViolationInPairs(file2, comparePairs,
          S3GuardFsck.Violation.NO_ETAG);
    } finally {
      cleanup(file1, file2, cwd);
    }
  }

  @Test
  public void testTombstonedInMsNotDeletedInS3() throws Exception {
    final Path cwd = path("/" + getMethodName() + "-" + UUID.randomUUID());
    final Path file = new Path(cwd, "file");
    try {
      // create a file with guarded fs
      touchGuardedAndWaitRaw(file);
      // set isDeleted flag in ms to true (tombstone item)
      final PathMetadata fileMeta = metadataStore.get(file);
      fileMeta.setIsDeleted(true);
      metadataStore.put(fileMeta);

      final S3GuardFsck s3GuardFsck = new S3GuardFsck(rawFs, metadataStore);
      final List<S3GuardFsck.ComparePair> comparePairs =
          s3GuardFsck.compareS3ToMs(cwd);

      assertComparePairsSize(comparePairs, 1);

      // check the file that there's the violation
      checkForViolationInPairs(file, comparePairs,
          S3GuardFsck.Violation.TOMBSTONED_IN_MS_NOT_DELETED_IN_S3);
    } finally {
      cleanup(file, cwd);
    }
  }

  protected void assertComparePairsSize(
      List<S3GuardFsck.ComparePair> comparePairs, int num) {
    Assertions.assertThat(comparePairs)
        .describedAs("Number of compare pairs")
        .hasSize(num);
  }

  private void touchGuardedAndWaitRaw(Path file) throws Exception {
    touchAndWait(guardedFs, rawFs, file);
  }

  private void touchRawAndWaitRaw(Path file) throws Exception {
    touchAndWait(rawFs, rawFs, file);
  }

  private void touchAndWait(FileSystem forTouch, FileSystem forWait, Path file)
      throws IOException {
    touch(forTouch, file);
    touch(forWait, file);
  }

  private void checkForViolationInPairs(Path file,
      List<S3GuardFsck.ComparePair> comparePairs,
      S3GuardFsck.Violation violation) {
    final S3GuardFsck.ComparePair childPair = comparePairs.stream()
        .filter(p -> p.getPath().equals(file))
        .findFirst().get();
    assertNotNull("The pair should not be null.", childPair);
    assertTrue("The pair must contain a violation.",
        childPair.containsViolation());
    Assertions.assertThat(childPair.getViolations())
        .describedAs("Violations in the pair")
        .contains(violation);
  }

  private void checkNoViolationInPairs(Path file2,
      List<S3GuardFsck.ComparePair> comparePairs,
      S3GuardFsck.Violation violation) {
    final S3GuardFsck.ComparePair file2Pair = comparePairs.stream()
        .filter(p -> p.getPath().equals(file2))
        .findFirst().get();
    assertNotNull("The pair should not be null.", file2Pair);
    Assertions.assertThat(file2Pair.getViolations())
        .describedAs("Violations in the pair")
        .doesNotContain(violation);
  }

  private void cleanup(Path... paths) {
    for (Path path : paths) {
      try {
        metadataStore.forgetMetadata(path);
        rawFs.delete(path, true);
      } catch (IOException e) {
        LOG.error("Error during cleanup.", e);
      }
    }
  }
}
@@ -18,6 +18,7 @@

package org.apache.hadoop.fs.s3a.s3guard;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;

@@ -32,6 +33,7 @@
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.Tag;

import org.junit.Assert;
import org.junit.Assume;
import org.junit.AssumptionViolatedException;

@@ -42,6 +44,7 @@
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Destroy;
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Init;
import org.apache.hadoop.util.ExitUtil;

import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_NAME_KEY;

@@ -289,4 +292,31 @@ public void testDestroyUnknownTable() throws Throwable {
        "-meta", "dynamodb://" + getTestTableName(DYNAMODB_TABLE));
  }

  @Test
  public void testCLIFsckWithoutParam() throws Exception {
    intercept(ExitUtil.ExitException.class, () -> run(Fsck.NAME));
  }

  @Test
  public void testCLIFsckWithParam() throws Exception {
    final int result = run(S3GuardTool.Fsck.NAME, "-check",
        "s3a://" + getFileSystem().getBucket());
    LOG.info("The purpose of this test is to run fsck with the correct " +
        "parameters, so no exception will be thrown. " +
        "The return value of the run: {}", result);
  }

  @Test
  public void testCLIFsckWithParamParentOfRoot() throws Exception {
    intercept(IOException.class, "Invalid URI",
        () -> run(S3GuardTool.Fsck.NAME, "-check",
            "s3a://" + getFileSystem().getBucket() + "/.."));
  }

  @Test
  public void testCLIFsckFailInitializeFs() throws Exception {
    intercept(FileNotFoundException.class, "does not exist",
        () -> run(S3GuardTool.Fsck.NAME, "-check",
            "s3a://this-bucket-does-not-exist-" + UUID.randomUUID()));
  }
}
@@ -1199,7 +1199,7 @@ protected S3AFileStatus basicFileStatus(Path path, int size, boolean isDir)
    return basicFileStatus(path, size, isDir, modTime);
  }

  protected S3AFileStatus basicFileStatus(int size, boolean isDir,
  public static S3AFileStatus basicFileStatus(int size, boolean isDir,
      long blockSize, long modificationTime, Path path) {
    if (isDir) {
      return new S3AFileStatus(Tristate.UNKNOWN, path, null);