+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.LocatedFileStatus;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link LocatedFileStatus} extended to also carry ETag and object version ID.
+ */
+public class S3ALocatedFileStatus extends LocatedFileStatus {
+
+ private static final long serialVersionUID = 3597192103662929338L;
+
+ private final String eTag;
+ private final String versionId;
+
+ public S3ALocatedFileStatus(S3AFileStatus status, BlockLocation[] locations,
+ String eTag, String versionId) {
+ super(checkNotNull(status), locations);
+ this.eTag = eTag;
+ this.versionId = versionId;
+ }
+
+ public String getETag() {
+ return eTag;
+ }
+
+ public String getVersionId() {
+ return versionId;
+ }
+
+ // equals() and hashCode() overridden to avoid FindBugs warning.
+ // Base implementation is equality on Path only, which is still appropriate.
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+}
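
For illustration only (not part of the patch): a minimal sketch of building the new status type. The `S3AFileStatus` constructor used here is the file-entry constructor added elsewhere in this patch; all values are invented.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;

class LocatedStatusSketch {
  static S3ALocatedFileStatus sample() {
    // File status carrying the S3 eTag and versionId; values are made up.
    S3AFileStatus status = new S3AFileStatus(
        1024L,                               // length
        System.currentTimeMillis(),          // modification time
        new Path("s3a://bucket/data/file1"),
        4096L,                               // block size
        "hadoop",                            // owner
        "9bb58f26192e4ba00f01e2e7b136bbd8",  // eTag (illustrative)
        null);                               // versionId: bucket unversioned
    // Wrap as a located status; block locations may be null in a sketch.
    return new S3ALocatedFileStatus(
        status, null, status.getETag(), status.getVersionId());
  }
}
```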
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 85181c3af8..4d9fc3292f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -533,16 +533,20 @@ public static String stringify(AmazonS3Exception e) {
* @param summary summary from AWS
* @param blockSize block size to declare.
* @param owner owner of the file
+ * @param eTag S3 object eTag or null if unavailable
+ * @param versionId S3 object versionId or null if unavailable
* @return a status entry
*/
public static S3AFileStatus createFileStatus(Path keyPath,
S3ObjectSummary summary,
long blockSize,
- String owner) {
+ String owner,
+ String eTag,
+ String versionId) {
long size = summary.getSize();
return createFileStatus(keyPath,
objectRepresentsDirectory(summary.getKey(), size),
- size, summary.getLastModified(), blockSize, owner);
+ size, summary.getLastModified(), blockSize, owner, eTag, versionId);
}
/**
@@ -555,22 +559,27 @@ public static S3AFileStatus createFileStatus(Path keyPath,
* @param size file length
* @param blockSize block size for file status
* @param owner Hadoop username
+ * @param eTag S3 object eTag or null if unavailable
+ * @param versionId S3 object versionId or null if unavailable
* @return a status entry
*/
public static S3AFileStatus createUploadFileStatus(Path keyPath,
- boolean isDir, long size, long blockSize, String owner) {
+ boolean isDir, long size, long blockSize, String owner,
+ String eTag, String versionId) {
Date date = isDir ? null : new Date();
- return createFileStatus(keyPath, isDir, size, date, blockSize, owner);
+ return createFileStatus(keyPath, isDir, size, date, blockSize, owner,
+ eTag, versionId);
}
/* Date 'modified' is ignored when isDir is true. */
private static S3AFileStatus createFileStatus(Path keyPath, boolean isDir,
- long size, Date modified, long blockSize, String owner) {
+ long size, Date modified, long blockSize, String owner,
+ String eTag, String versionId) {
if (isDir) {
return new S3AFileStatus(Tristate.UNKNOWN, keyPath, owner);
} else {
return new S3AFileStatus(size, dateToLong(modified), keyPath, blockSize,
- owner);
+ owner, eTag, versionId);
}
}
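
A usage sketch for the widened helper (the caller shape is assumed, not from the patch): listing-based callers have an eTag on each `S3ObjectSummary`, but S3 LIST results carry no version IDs, so `null` is passed for `versionId`.

```java
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AUtils;

class CreateStatusSketch {
  static S3AFileStatus fromSummary(Path keyPath, S3ObjectSummary summary,
      long blockSize, String owner) {
    // The versionId can only be learned later, from a HEAD or GET.
    return S3AUtils.createFileStatus(keyPath, summary, blockSize, owner,
        summary.getETag(), null);
  }
}
```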
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java
index 023d0c3cf2..1a0135bb9b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java
@@ -42,6 +42,7 @@ public S3GuardExistsRetryPolicy(Configuration conf) {
  protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
    Map<Class<? extends Exception>, RetryPolicy> b = super.createExceptionMap();
b.put(FileNotFoundException.class, retryIdempotentCalls);
+ b.put(RemoteFileChangedException.class, retryIdempotentCalls);
return b;
}
}
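
An illustrative, test-style check of the new mapping (not in the patch): a `RemoteFileChangedException` raised by an idempotent call should now be retried rather than failed.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
import org.apache.hadoop.fs.s3a.S3GuardExistsRetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy;

class RetrySketch {
  static void demo() throws Exception {
    S3GuardExistsRetryPolicy policy =
        new S3GuardExistsRetryPolicy(new Configuration());
    RetryPolicy.RetryAction action = policy.shouldRetry(
        new RemoteFileChangedException("s3a://bucket/key", "read",
            "change detected"),
        0,     // retries so far
        0,     // failovers so far
        true); // the operation is idempotent
    // Expected: action.action == RetryPolicy.RetryAction.RetryDecision.RETRY
  }
}
```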
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
index d67e3e1e8c..2e62ff6728 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
@@ -34,16 +34,22 @@ public class S3ObjectAttributes {
private final String key;
private final S3AEncryptionMethods serverSideEncryptionAlgorithm;
private final String serverSideEncryptionKey;
+ private final String eTag;
+ private final String versionId;
public S3ObjectAttributes(
String bucket,
String key,
S3AEncryptionMethods serverSideEncryptionAlgorithm,
- String serverSideEncryptionKey) {
+ String serverSideEncryptionKey,
+ String eTag,
+ String versionId) {
this.bucket = bucket;
this.key = key;
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
this.serverSideEncryptionKey = serverSideEncryptionKey;
+ this.eTag = eTag;
+ this.versionId = versionId;
}
public String getBucket() {
@@ -61,4 +67,12 @@ public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
public String getServerSideEncryptionKey() {
return serverSideEncryptionKey;
}
+
+ public String getETag() {
+ return eTag;
+ }
+
+ public String getVersionId() {
+ return versionId;
+ }
}
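
A construction sketch for the widened constructor; values are invented, and either of the two new fields may be null when unknown (e.g. `versionId` on an unversioned bucket).

```java
import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;

class AttributesSketch {
  static S3ObjectAttributes sample() {
    return new S3ObjectAttributes(
        "my-bucket",
        "data/file1",
        S3AEncryptionMethods.NONE,
        null,                                // no SSE key
        "9bb58f26192e4ba00f01e2e7b136bbd8",  // eTag (illustrative)
        null);                               // versionId unknown
  }
}
```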
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index ea091720c2..54386addd7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -226,7 +226,8 @@ public String initiateMultiPartUpload(String destKey) throws IOException {
/**
* Finalize a multipart PUT operation.
* This completes the upload, and, if that works, calls
- * {@link S3AFileSystem#finishedWrite(String, long)} to update the filesystem.
+ * {@link S3AFileSystem#finishedWrite(String, long, String, String)}
+ * to update the filesystem.
* Retry policy: retrying, translated.
* @param destKey destination of the commit
* @param uploadId multipart operation Id
@@ -247,20 +248,22 @@ private CompleteMultipartUploadResult finalizeMultipartUpload(
throw new IOException(
"No upload parts in multipart upload to " + destKey);
}
- CompleteMultipartUploadResult uploadResult = invoker.retry("Completing multipart commit", destKey,
- true,
- retrying,
- () -> {
- // a copy of the list is required, so that the AWS SDK doesn't
- // attempt to sort an unmodifiable list.
- return owner.getAmazonS3Client().completeMultipartUpload(
- new CompleteMultipartUploadRequest(bucket,
- destKey,
- uploadId,
- new ArrayList<>(partETags)));
- }
+ CompleteMultipartUploadResult uploadResult =
+ invoker.retry("Completing multipart commit", destKey,
+ true,
+ retrying,
+ () -> {
+ // a copy of the list is required, so that the AWS SDK doesn't
+ // attempt to sort an unmodifiable list.
+ return owner.getAmazonS3Client().completeMultipartUpload(
+ new CompleteMultipartUploadRequest(bucket,
+ destKey,
+ uploadId,
+ new ArrayList<>(partETags)));
+ }
);
- owner.finishedWrite(destKey, length);
+ owner.finishedWrite(destKey, length, uploadResult.getETag(),
+ uploadResult.getVersionId());
return uploadResult;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
index f3d8bc20c8..b0e9d6f454 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
@@ -20,8 +20,11 @@
import java.util.Locale;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +33,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -47,7 +51,7 @@ public abstract class ChangeDetectionPolicy {
LoggerFactory.getLogger(ChangeDetectionPolicy.class);
@VisibleForTesting
- public static final String CHANGE_DETECTED = "change detected";
+ public static final String CHANGE_DETECTED = "change detected on client";
private final Mode mode;
private final boolean requireVersion;
@@ -200,6 +204,28 @@ public static ChangeDetectionPolicy createPolicy(final Mode mode,
public abstract String getRevisionId(ObjectMetadata objectMetadata,
String uri);
+ /**
+   * Like {@link #getRevisionId(ObjectMetadata, String)}, but retrieves the
+ * revision identifier from {@link S3ObjectAttributes}.
+ *
+ * @param s3Attributes the object attributes
+ * @return the revisionId string as interpreted by this policy, or potentially
+ * null if the attribute is unavailable (such as when the policy says to use
+ * versionId but object versioning is not enabled for the bucket).
+ */
+ public abstract String getRevisionId(S3ObjectAttributes s3Attributes);
+
+ /**
+   * Like {@link #getRevisionId(ObjectMetadata, String)}, but retrieves the
+ * revision identifier from {@link CopyResult}.
+ *
+ * @param copyResult the copy result
+ * @return the revisionId string as interpreted by this policy, or potentially
+ * null if the attribute is unavailable (such as when the policy says to use
+ * versionId but object versioning is not enabled for the bucket).
+ */
+ public abstract String getRevisionId(CopyResult copyResult);
+
/**
* Applies the given {@link #getRevisionId(ObjectMetadata, String) revisionId}
* as a server-side qualification on the {@code GetObjectRequest}.
@@ -210,6 +236,26 @@ public abstract String getRevisionId(ObjectMetadata objectMetadata,
public abstract void applyRevisionConstraint(GetObjectRequest request,
String revisionId);
+ /**
+ * Applies the given {@link #getRevisionId(ObjectMetadata, String) revisionId}
+ * as a server-side qualification on the {@code CopyObjectRequest}.
+ *
+ * @param request the request
+ * @param revisionId the revision id
+ */
+ public abstract void applyRevisionConstraint(CopyObjectRequest request,
+ String revisionId);
+
+ /**
+ * Applies the given {@link #getRevisionId(ObjectMetadata, String) revisionId}
+ * as a server-side qualification on the {@code GetObjectMetadataRequest}.
+ *
+ * @param request the request
+ * @param revisionId the revision id
+ */
+ public abstract void applyRevisionConstraint(GetObjectMetadataRequest request,
+ String revisionId);
+
/**
* Takes appropriate action based on {@link #getMode() mode} when a change has
* been detected.
@@ -234,6 +280,7 @@ public ImmutablePair<Boolean, RemoteFileChangedException> onChangeDetected(
long position,
String operation,
long timesAlreadyDetected) {
+ String positionText = position >= 0 ? (" at " + position) : "";
switch (mode) {
case None:
// something changed; we don't care.
@@ -242,8 +289,9 @@ public ImmutablePair<Boolean, RemoteFileChangedException> onChangeDetected(
if (timesAlreadyDetected == 0) {
// only warn on the first detection to avoid a noisy log
LOG.warn(
- String.format("%s change detected on %s %s at %d. Expected %s got %s",
- getSource(), operation, uri, position, revisionId,
+ String.format(
+ "%s change detected on %s %s%s. Expected %s got %s",
+ getSource(), operation, uri, positionText, revisionId,
newRevisionId));
return new ImmutablePair<>(true, null);
}
@@ -251,15 +299,16 @@ public ImmutablePair onChangeDetected(
case Client:
case Server:
default:
- // mode == Client (or Server, but really won't be called for Server)
+ // mode == Client or Server; will trigger on version failures
+ // of getObjectMetadata even on server.
return new ImmutablePair<>(true,
new RemoteFileChangedException(uri,
operation,
String.format("%s "
+ CHANGE_DETECTED
- + " while reading at position %s."
+ + " during %s%s."
+ " Expected %s got %s",
- getSource(), position, revisionId, newRevisionId)));
+ getSource(), operation, positionText, revisionId, newRevisionId)));
}
}
@@ -277,11 +326,38 @@ public String getRevisionId(ObjectMetadata objectMetadata, String uri) {
return objectMetadata.getETag();
}
+ @Override
+ public String getRevisionId(S3ObjectAttributes s3Attributes) {
+ return s3Attributes.getETag();
+ }
+
+ @Override
+ public String getRevisionId(CopyResult copyResult) {
+ return copyResult.getETag();
+ }
+
@Override
public void applyRevisionConstraint(GetObjectRequest request,
String revisionId) {
- LOG.debug("Restricting request to etag {}", revisionId);
- request.withMatchingETagConstraint(revisionId);
+ if (revisionId != null) {
+ LOG.debug("Restricting get request to etag {}", revisionId);
+ request.withMatchingETagConstraint(revisionId);
+ }
+ }
+
+ @Override
+ public void applyRevisionConstraint(CopyObjectRequest request,
+ String revisionId) {
+ if (revisionId != null) {
+ LOG.debug("Restricting copy request to etag {}", revisionId);
+ request.withMatchingETagConstraint(revisionId);
+ }
+ }
+
+ @Override
+ public void applyRevisionConstraint(GetObjectMetadataRequest request,
+ String revisionId) {
+ // GetObjectMetadataRequest doesn't support eTag qualification
}
@Override
@@ -323,11 +399,41 @@ public String getRevisionId(ObjectMetadata objectMetadata, String uri) {
return versionId;
}
+ @Override
+ public String getRevisionId(S3ObjectAttributes s3Attributes) {
+ return s3Attributes.getVersionId();
+ }
+
+ @Override
+ public String getRevisionId(CopyResult copyResult) {
+ return copyResult.getVersionId();
+ }
+
@Override
public void applyRevisionConstraint(GetObjectRequest request,
String revisionId) {
- LOG.debug("Restricting request to version {}", revisionId);
- request.withVersionId(revisionId);
+ if (revisionId != null) {
+ LOG.debug("Restricting get request to version {}", revisionId);
+ request.withVersionId(revisionId);
+ }
+ }
+
+ @Override
+ public void applyRevisionConstraint(CopyObjectRequest request,
+ String revisionId) {
+ if (revisionId != null) {
+ LOG.debug("Restricting copy request to version {}", revisionId);
+ request.withSourceVersionId(revisionId);
+ }
+ }
+
+ @Override
+ public void applyRevisionConstraint(GetObjectMetadataRequest request,
+ String revisionId) {
+ if (revisionId != null) {
+ LOG.debug("Restricting metadata request to version {}", revisionId);
+ request.withVersionId(revisionId);
+ }
}
@Override
@@ -361,12 +467,34 @@ public String getRevisionId(final ObjectMetadata objectMetadata,
return null;
}
+ @Override
+ public String getRevisionId(final S3ObjectAttributes s3ObjectAttributes) {
+ return null;
+ }
+
+ @Override
+ public String getRevisionId(CopyResult copyResult) {
+ return null;
+ }
+
@Override
public void applyRevisionConstraint(final GetObjectRequest request,
final String revisionId) {
}
+ @Override
+ public void applyRevisionConstraint(CopyObjectRequest request,
+ String revisionId) {
+
+ }
+
+ @Override
+ public void applyRevisionConstraint(GetObjectMetadataRequest request,
+ String revisionId) {
+
+ }
+
@Override
public String toString() {
return "NoChangeDetection";
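
To illustrate the server-side constraint path (a sketch: `ChangeDetectionPolicy.getPolicy(Configuration)` is assumed as the factory entry point, and the keys are the `fs.s3a.change.detection.*` options documented later in this patch):

```java
import com.amazonaws.services.s3.model.GetObjectRequest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;

class PolicySketch {
  static GetObjectRequest constrained(String revisionId) {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.change.detection.source", "etag");
    conf.set("fs.s3a.change.detection.mode", "server");
    ChangeDetectionPolicy policy = ChangeDetectionPolicy.getPolicy(conf);

    GetObjectRequest request = new GetObjectRequest("my-bucket", "data/file1");
    // With this patch a null revisionId is skipped rather than being sent
    // to S3 as an empty constraint.
    policy.applyRevisionConstraint(request, revisionId);
    return request;
  }
}
```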
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
index f76602b953..75fecd5f14 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
@@ -20,11 +20,15 @@
import java.util.concurrent.atomic.AtomicLong;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.fs.s3a.NoVersionAttributeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,14 +36,18 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.NoVersionAttributeException;
import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.http.HttpStatus.SC_PRECONDITION_FAILED;
/**
- * Change tracking for input streams: the revision ID/etag
- * the previous request is recorded and when the next request comes in,
- * it is compared.
+ * Change tracking for input streams: the version ID or etag of the object is
+ * tracked and compared on open/re-open. An initial version ID or etag may or
+ * may not be available, depending on usage (e.g. if S3Guard is utilized).
+ *
* Self-contained for testing and use in different streams.
*/
@InterfaceAudience.Private
@@ -49,7 +57,7 @@ public class ChangeTracker {
private static final Logger LOG =
LoggerFactory.getLogger(ChangeTracker.class);
- public static final String CHANGE_REPORTED_BY_S3 = "reported by S3";
+ public static final String CHANGE_REPORTED_BY_S3 = "Change reported by S3";
/** Policy to use. */
private final ChangeDetectionPolicy policy;
@@ -76,13 +84,20 @@ public class ChangeTracker {
* @param uri URI of object being tracked
* @param policy policy to track.
* @param versionMismatches reference to the version mismatch counter
+ * @param s3ObjectAttributes attributes of the object, potentially including
+ * an eTag or versionId to match depending on {@code policy}
*/
public ChangeTracker(final String uri,
final ChangeDetectionPolicy policy,
- final AtomicLong versionMismatches) {
+ final AtomicLong versionMismatches,
+ final S3ObjectAttributes s3ObjectAttributes) {
this.policy = checkNotNull(policy);
this.uri = uri;
this.versionMismatches = versionMismatches;
+ this.revisionId = policy.getRevisionId(s3ObjectAttributes);
+ if (revisionId != null) {
+ LOG.debug("Revision ID for object at {}: {}", uri, revisionId);
+ }
}
public String getRevisionId() {
@@ -115,6 +130,33 @@ public boolean maybeApplyConstraint(
return false;
}
+ /**
+ * Apply any revision control set by the policy if it is to be
+ * enforced on the server.
+ * @param request request to modify
+ * @return true iff a constraint was added.
+ */
+ public boolean maybeApplyConstraint(
+ final CopyObjectRequest request) {
+
+ if (policy.getMode() == ChangeDetectionPolicy.Mode.Server
+ && revisionId != null) {
+ policy.applyRevisionConstraint(request, revisionId);
+ return true;
+ }
+ return false;
+ }
+
+  /**
+   * Apply any revision control set by the policy if it is to be
+   * enforced on the server, for a getObjectMetadata request.
+   * @param request request to modify
+   * @return true iff a constraint was added.
+   */
+  public boolean maybeApplyConstraint(
+      final GetObjectMetadataRequest request) {
+
+ if (policy.getMode() == ChangeDetectionPolicy.Mode.Server
+ && revisionId != null) {
+ policy.applyRevisionConstraint(request, revisionId);
+ return true;
+ }
+ return false;
+ }
/**
* Process the response from the server for validation against the
@@ -135,29 +177,106 @@ public void processResponse(final S3Object object,
// object was not returned.
versionMismatches.incrementAndGet();
throw new RemoteFileChangedException(uri, operation,
- String.format("%s change "
- + CHANGE_REPORTED_BY_S3
- + " while reading"
+ String.format(CHANGE_REPORTED_BY_S3
+ + " during %s"
+ " at position %s."
- + " Version %s was unavailable",
- getSource(),
+ + " %s %s was unavailable",
+ operation,
pos,
+ getSource(),
getRevisionId()));
} else {
throw new PathIOException(uri, "No data returned from GET request");
}
}
- final ObjectMetadata metadata = object.getObjectMetadata();
+ processMetadata(object.getObjectMetadata(), operation);
+ }
+
+ /**
+ * Process the response from the server for validation against the
+ * change policy.
+ * @param copyResult result of a copy operation
+ * @throws PathIOException raised on failure
+ * @throws RemoteFileChangedException if the remote file has changed.
+ */
+ public void processResponse(final CopyResult copyResult)
+ throws PathIOException {
+ // ETag (sometimes, depending on encryption and/or multipart) is not the
+ // same on the copied object as the original. Version Id seems to never
+ // be the same on the copy. As such, there isn't really anything that
+ // can be verified on the response, except that a revision ID is present
+ // if required.
+ String newRevisionId = policy.getRevisionId(copyResult);
+ LOG.debug("Copy result {}: {}", policy.getSource(), newRevisionId);
+ if (newRevisionId == null && policy.isRequireVersion()) {
+ throw new NoVersionAttributeException(uri, String.format(
+ "Change detection policy requires %s",
+ policy.getSource()));
+ }
+ }
+
+ /**
+ * Process an exception generated against the change policy.
+ * If the exception indicates the file has changed, this method throws
+ * {@code RemoteFileChangedException} with the original exception as the
+ * cause.
+ * @param e the exception
+ * @param operation the operation performed when the exception was
+ * generated (e.g. "copy", "read", "select").
+ * @throws RemoteFileChangedException if the remote file has changed.
+ */
+ public void processException(SdkBaseException e, String operation) throws
+ RemoteFileChangedException {
+ if (e instanceof AmazonServiceException) {
+ AmazonServiceException serviceException = (AmazonServiceException) e;
+ // This isn't really going to be hit due to
+ // https://github.com/aws/aws-sdk-java/issues/1644
+ if (serviceException.getStatusCode() == SC_PRECONDITION_FAILED) {
+ versionMismatches.incrementAndGet();
+ throw new RemoteFileChangedException(uri, operation, String.format(
+ RemoteFileChangedException.PRECONDITIONS_FAILED
+ + " on %s."
+ + " Version %s was unavailable",
+ getSource(),
+ getRevisionId()),
+ serviceException);
+ }
+ }
+ }
+
+ /**
+ * Process metadata response from server for validation against the change
+ * policy.
+ * @param metadata metadata returned from server
+ * @param operation operation in progress
+ * @throws PathIOException raised on failure
+ * @throws RemoteFileChangedException if the remote file has changed.
+ */
+ public void processMetadata(final ObjectMetadata metadata,
+ final String operation) throws PathIOException {
final String newRevisionId = policy.getRevisionId(metadata, uri);
+ processNewRevision(newRevisionId, operation, -1);
+ }
+
+ /**
+ * Validate a revision from the server against our expectations.
+ * @param newRevisionId new revision.
+ * @param operation operation in progress
+ * @param pos offset in the file; -1 for "none"
+ * @throws PathIOException raised on failure
+ * @throws RemoteFileChangedException if the remote file has changed.
+ */
+ private void processNewRevision(final String newRevisionId,
+ final String operation, final long pos) throws PathIOException {
if (newRevisionId == null && policy.isRequireVersion()) {
throw new NoVersionAttributeException(uri, String.format(
"Change detection policy requires %s",
policy.getSource()));
}
if (revisionId == null) {
- // revisionId is null on first (re)open. Pin it so change can be detected
- // if object has been updated
+ // revisionId may be null on first (re)open. Pin it so change can be
+ // detected if object has been updated
LOG.debug("Setting revision ID for object at {}: {}",
uri, newRevisionId);
revisionId = newRevisionId;
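
A lifecycle sketch for the tracker with its new constructor argument (the `policy` and `attrs` parameters are assumed inputs): seeding the revision ID from S3Guard metadata means a change can be detected on the very first GET.

```java
import java.util.concurrent.atomic.AtomicLong;

import com.amazonaws.services.s3.model.GetObjectRequest;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;

class TrackerSketch {
  static GetObjectRequest prepare(ChangeDetectionPolicy policy,
      S3ObjectAttributes attrs) {
    // Seed the tracker with the eTag/versionId recorded in S3Guard, if any.
    ChangeTracker tracker = new ChangeTracker(
        "s3a://my-bucket/data/file1", policy, new AtomicLong(0), attrs);
    GetObjectRequest request = new GetObjectRequest("my-bucket", "data/file1");
    tracker.maybeApplyConstraint(request); // server-side check, if configured
    // After the GET executes, tracker.processResponse(object, "read", pos)
    // runs the client-side comparison and may raise
    // RemoteFileChangedException.
    return request;
  }
}
```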
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyOutcome.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyOutcome.java
new file mode 100644
index 0000000000..16459ac45b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyOutcome.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.services.s3.transfer.Copy;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
+
+/**
+ * Extracts the outcome of a TransferManager-executed copy operation.
+ */
+public final class CopyOutcome {
+
+ /**
+ * Result of a successful copy.
+ */
+ private final CopyResult copyResult;
+
+ /** the copy was interrupted. */
+ private final InterruptedException interruptedException;
+
+ /**
+ * The copy raised an AWS Exception of some form.
+ */
+ private final SdkBaseException awsException;
+
+ public CopyOutcome(CopyResult copyResult,
+ InterruptedException interruptedException,
+ SdkBaseException awsException) {
+ this.copyResult = copyResult;
+ this.interruptedException = interruptedException;
+ this.awsException = awsException;
+ }
+
+ public CopyResult getCopyResult() {
+ return copyResult;
+ }
+
+ public InterruptedException getInterruptedException() {
+ return interruptedException;
+ }
+
+ public SdkBaseException getAwsException() {
+ return awsException;
+ }
+
+ /**
+ * Calls {@code Copy.waitForCopyResult()} to await the result, converts
+ * it to a copy outcome.
+   * Exceptions are caught and stored in the relevant outcome field.
+ * @param copy the copy operation.
+ * @return the outcome.
+ */
+ public static CopyOutcome waitForCopy(Copy copy) {
+ try {
+ CopyResult result = copy.waitForCopyResult();
+ return new CopyOutcome(result, null, null);
+ } catch (SdkBaseException e) {
+ return new CopyOutcome(null, null, e);
+ } catch (InterruptedException e) {
+ return new CopyOutcome(null, e, null);
+ }
+ }
+}
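
A sketch of a consumer (the `tracker` parameter and the exception handling shown are assumptions about the call site, which is not part of this file):

```java
import java.io.IOException;
import java.io.InterruptedIOException;

import com.amazonaws.services.s3.transfer.Copy;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;

class CopySketch {
  static void finishCopy(Copy copy, ChangeTracker tracker) throws IOException {
    CopyOutcome outcome = CopyOutcome.waitForCopy(copy);
    if (outcome.getInterruptedException() != null) {
      // Restore the interrupt flag and convert to an IOException subclass.
      Thread.currentThread().interrupt();
      throw new InterruptedIOException("copy interrupted");
    }
    if (outcome.getAwsException() != null) {
      // Give the tracker a chance to convert a precondition failure into
      // RemoteFileChangedException before generic wrapping.
      tracker.processException(outcome.getAwsException(), "copy");
      throw new IOException(outcome.getAwsException());
    }
    tracker.processResponse(outcome.getCopyResult());
  }
}
```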
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DDBPathMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DDBPathMetadata.java
index 78568dc4bb..e3a529ac14 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DDBPathMetadata.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DDBPathMetadata.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.fs.s3a.s3guard;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
/**
@@ -36,18 +36,18 @@ public DDBPathMetadata(PathMetadata pmd) {
this.setLastUpdated(pmd.getLastUpdated());
}
- public DDBPathMetadata(FileStatus fileStatus) {
+ public DDBPathMetadata(S3AFileStatus fileStatus) {
super(fileStatus);
this.isAuthoritativeDir = false;
}
- public DDBPathMetadata(FileStatus fileStatus, Tristate isEmptyDir,
+ public DDBPathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir,
boolean isDeleted) {
super(fileStatus, isEmptyDir, isDeleted);
this.isAuthoritativeDir = false;
}
- public DDBPathMetadata(FileStatus fileStatus, Tristate isEmptyDir,
+ public DDBPathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir,
boolean isDeleted, boolean isAuthoritativeDir, long lastUpdated) {
super(fileStatus, isEmptyDir, isDeleted);
this.isAuthoritativeDir = isAuthoritativeDir;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
index dcee35824e..88a46745b1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
@@ -28,9 +28,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
/**
* {@code DescendantsIterator} is a {@link RemoteIterator} that implements
@@ -83,7 +83,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class DescendantsIterator implements RemoteIterator<FileStatus> {
+public class DescendantsIterator implements RemoteIterator<S3AFileStatus> {
private final MetadataStore metadataStore;
  private final Queue<PathMetadata> queue = new LinkedList<>();
@@ -121,7 +121,7 @@ public boolean hasNext() throws IOException {
}
@Override
- public FileStatus next() throws IOException {
+ public S3AFileStatus next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException("No more descendants.");
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
index 88f24aa984..1059dd1486 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
/**
@@ -61,7 +62,7 @@ public class DirListingMetadata extends ExpirableMetadata {
* Create a directory listing metadata container.
*
* @param path Path of the directory. If this path has a host component, then
- * all paths added later via {@link #put(FileStatus)} must also have
+ * all paths added later via {@link #put(S3AFileStatus)} must also have
* the same host.
* @param listing Entries in the directory.
* @param isAuthoritative true iff listing is the full contents of the
@@ -225,7 +226,7 @@ public void remove(Path childPath) {
* @return true if the status was added or replaced with a new value. False
* if the same FileStatus value was already present.
*/
- public boolean put(FileStatus childFileStatus) {
+ public boolean put(S3AFileStatus childFileStatus) {
Preconditions.checkNotNull(childFileStatus,
"childFileStatus must be non-null");
Path childPath = childStatusToPathKey(childFileStatus);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 769d3d4c4c..a9e1f33689 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -88,6 +88,7 @@
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AUtils;
@@ -129,6 +130,14 @@
 * This attribute is meaningful only to file items.</li>
 * <li> optional long attribute revealing block size of the file.
 * This attribute is meaningful only to file items.</li>
+ * <li> optional string attribute tracking the s3 eTag of the file.
+ * May be absent if the metadata was entered with a version of S3Guard
+ * before this was tracked.
+ * This attribute is meaningful only to file items.</li>
+ * <li> optional string attribute tracking the s3 versionId of the file.
+ * May be absent if the metadata was entered with a version of S3Guard
+ * before this was tracked.
+ * This attribute is meaningful only to file items.</li>
 * </ul>
 *
 * The DynamoDB partition key is the parent, and the range key is the child.
@@ -155,20 +164,20 @@
 * This is persisted to a single DynamoDB table as:
 *
 * <pre>
- * =========================================================================
- * | parent                 | child | is_dir | mod_time | len |    ...    |
- * =========================================================================
- * | /bucket                | dir1  | true   |          |     |           |
- * | /bucket/dir1           | dir2  | true   |          |     |           |
- * | /bucket/dir1           | dir3  | true   |          |     |           |
- * | /bucket/dir1/dir2      | file1 |        |   100    | 111 |           |
- * | /bucket/dir1/dir2      | file2 |        |   200    | 222 |           |
- * | /bucket/dir1/dir3      | dir4  | true   |          |     |           |
- * | /bucket/dir1/dir3      | dir5  | true   |          |     |           |
- * | /bucket/dir1/dir3/dir4 | file3 |        |   300    | 333 |           |
- * | /bucket/dir1/dir3/dir5 | file4 |        |   400    | 444 |           |
- * | /bucket/dir1/dir3      | dir6  | true   |          |     |           |
- * =========================================================================
+ * =========================================================================================
+ * | parent                 | child | is_dir | mod_time | len | etag | ver_id |    ...     |
+ * =========================================================================================
+ * | /bucket                | dir1  | true   |          |     |      |        |            |
+ * | /bucket/dir1           | dir2  | true   |          |     |      |        |            |
+ * | /bucket/dir1           | dir3  | true   |          |     |      |        |            |
+ * | /bucket/dir1/dir2      | file1 |        |   100    | 111 | abc  |  mno   |            |
+ * | /bucket/dir1/dir2      | file2 |        |   200    | 222 | def  |  pqr   |            |
+ * | /bucket/dir1/dir3      | dir4  | true   |          |     |      |        |            |
+ * | /bucket/dir1/dir3      | dir5  | true   |          |     |      |        |            |
+ * | /bucket/dir1/dir3/dir4 | file3 |        |   300    | 333 | ghi  |  stu   |            |
+ * | /bucket/dir1/dir3/dir5 | file4 |        |   400    | 444 | jkl  |  vwx   |            |
+ * | /bucket/dir1/dir3      | dir6  | true   |          |     |      |        |            |
+ * =========================================================================================
 * </pre>
 *
 * This choice of schema is efficient for read access patterns.
@@ -618,16 +627,15 @@ private DDBPathMetadata innerGet(Path path, boolean wantEmptyDirectoryFlag)
}
/**
- * Make a FileStatus object for a directory at given path. The FileStatus
- * only contains what S3A needs, and omits mod time since S3A uses its own
- * implementation which returns current system time.
- * @param owner username of owner
+   * Make an S3AFileStatus object for a directory at the given path.
+ * The FileStatus only contains what S3A needs, and omits mod time
+ * since S3A uses its own implementation which returns current system time.
+ * @param dirOwner username of owner
* @param path path to dir
- * @return new FileStatus
+ * @return new S3AFileStatus
*/
- private FileStatus makeDirStatus(String owner, Path path) {
- return new FileStatus(0, true, 1, 0, 0, 0, null,
- owner, null, path);
+ private S3AFileStatus makeDirStatus(String dirOwner, Path path) {
+ return new S3AFileStatus(Tristate.UNKNOWN, path, dirOwner);
}
@Override
@@ -710,7 +718,7 @@ Collection<DDBPathMetadata> completeAncestry(
while (!parent.isRoot() && !ancestry.containsKey(parent)) {
LOG.debug("auto-create ancestor path {} for child path {}",
parent, path);
- final FileStatus status = makeDirStatus(parent, username);
+ final S3AFileStatus status = makeDirStatus(parent, username);
ancestry.put(parent, new DDBPathMetadata(status, Tristate.FALSE,
false));
parent = parent.getParent();
@@ -915,7 +923,7 @@ Collection<DDBPathMetadata> fullPathsToPut(DDBPathMetadata meta)
while (path != null && !path.isRoot()) {
final Item item = getConsistentItem(path);
if (!itemExists(item)) {
- final FileStatus status = makeDirStatus(path, username);
+ final S3AFileStatus status = makeDirStatus(path, username);
metasToPut.add(new DDBPathMetadata(status, Tristate.FALSE, false,
meta.isAuthoritativeDir(), meta.getLastUpdated()));
path = path.getParent();
@@ -938,9 +946,8 @@ private boolean itemExists(Item item) {
}
/** Create a directory FileStatus using current system time as mod time. */
- static FileStatus makeDirStatus(Path f, String owner) {
- return new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
- null, owner, owner, f);
+ static S3AFileStatus makeDirStatus(Path f, String owner) {
+ return new S3AFileStatus(Tristate.UNKNOWN, f, owner);
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
index b8f9635dcd..9276388679 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -231,7 +232,7 @@ public void move(Collection<Path> pathsToDelete,
public void put(PathMetadata meta) throws IOException {
Preconditions.checkNotNull(meta);
- FileStatus status = meta.getFileStatus();
+ S3AFileStatus status = meta.getFileStatus();
Path path = standardize(status.getPath());
synchronized (this) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
index 378d10980c..e4e76c50d6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
@@ -33,9 +33,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
/**
* {@code MetadataStoreListFilesIterator} is a {@link RemoteIterator} that
@@ -85,14 +85,14 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MetadataStoreListFilesIterator implements
-    RemoteIterator<FileStatus> {
+    RemoteIterator<S3AFileStatus> {
public static final Logger LOG = LoggerFactory.getLogger(
MetadataStoreListFilesIterator.class);
private final boolean allowAuthoritative;
private final MetadataStore metadataStore;
  private final Set<Path> tombstones = new HashSet<>();
-  private Iterator<FileStatus> leafNodesIterator = null;
+  private Iterator<S3AFileStatus> leafNodesIterator = null;
public MetadataStoreListFilesIterator(MetadataStore ms, PathMetadata meta,
boolean allowAuthoritative) throws IOException {
@@ -104,7 +104,7 @@ public MetadataStoreListFilesIterator(MetadataStore ms, PathMetadata meta,
private void prefetch(PathMetadata meta) throws IOException {
    final Queue<PathMetadata> queue = new LinkedList<>();
-    final Collection<FileStatus> leafNodes = new ArrayList<>();
+    final Collection<S3AFileStatus> leafNodes = new ArrayList<>();
if (meta != null) {
final Path path = meta.getFileStatus().getPath();
@@ -121,7 +121,7 @@ private void prefetch(PathMetadata meta) throws IOException {
while(!queue.isEmpty()) {
PathMetadata nextMetadata = queue.poll();
- FileStatus nextStatus = nextMetadata.getFileStatus();
+ S3AFileStatus nextStatus = nextMetadata.getFileStatus();
if (nextStatus.isFile()) {
// All files are leaf nodes by definition
leafNodes.add(nextStatus);
@@ -159,7 +159,7 @@ public boolean hasNext() {
}
@Override
- public FileStatus next() {
+ public S3AFileStatus next() {
return leafNodesIterator.next();
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
index 56645fead7..8824439db7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
@@ -21,8 +21,8 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
/**
@@ -33,7 +33,7 @@
@InterfaceStability.Evolving
public class PathMetadata extends ExpirableMetadata {
- private final FileStatus fileStatus;
+ private S3AFileStatus fileStatus;
private Tristate isEmptyDirectory;
private boolean isDeleted;
@@ -43,24 +43,25 @@ public class PathMetadata extends ExpirableMetadata {
* @return the entry.
*/
public static PathMetadata tombstone(Path path) {
- long now = System.currentTimeMillis();
- FileStatus status = new FileStatus(0, false, 0, 0, now, path);
- return new PathMetadata(status, Tristate.UNKNOWN, true);
+ S3AFileStatus s3aStatus = new S3AFileStatus(0,
+ System.currentTimeMillis(), path, 0, null,
+ null, null);
+ return new PathMetadata(s3aStatus, Tristate.UNKNOWN, true);
}
/**
* Creates a new {@code PathMetadata} containing given {@code FileStatus}.
* @param fileStatus file status containing an absolute path.
*/
- public PathMetadata(FileStatus fileStatus) {
- this(fileStatus, Tristate.UNKNOWN);
+ public PathMetadata(S3AFileStatus fileStatus) {
+ this(fileStatus, Tristate.UNKNOWN, false);
}
- public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir) {
+ public PathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir) {
this(fileStatus, isEmptyDir, false);
}
- public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir, boolean
+ public PathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir, boolean
isDeleted) {
Preconditions.checkNotNull(fileStatus, "fileStatus must be non-null");
Preconditions.checkNotNull(fileStatus.getPath(), "fileStatus path must be" +
@@ -75,7 +76,7 @@ public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir, boolean
/**
* @return {@code FileStatus} contained in this {@code PathMetadata}.
*/
- public final FileStatus getFileStatus() {
+ public final S3AFileStatus getFileStatus() {
return fileStatus;
}
@@ -91,6 +92,7 @@ public Tristate isEmptyDirectory() {
void setIsEmptyDirectory(Tristate isEmptyDirectory) {
this.isEmptyDirectory = isEmptyDirectory;
+ fileStatus.setIsEmptyDirectory(isEmptyDirectory);
}
public boolean isDeleted() {
@@ -128,10 +130,11 @@ public String toString() {
* @param sb target StringBuilder
*/
public void prettyPrint(StringBuilder sb) {
- sb.append(String.format("%-5s %-20s %-7d %-8s %-6s",
+ sb.append(String.format("%-5s %-20s %-7d %-8s %-6s %-20s %-20s",
fileStatus.isDirectory() ? "dir" : "file",
fileStatus.getPath().toString(), fileStatus.getLen(),
- isEmptyDirectory.name(), isDeleted));
+ isEmptyDirectory.name(), isDeleted,
+ fileStatus.getETag(), fileStatus.getVersionId()));
sb.append(fileStatus);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
index c6f70bf277..c9559ec151 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
@@ -40,9 +40,9 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
/**
@@ -70,6 +70,8 @@ final class PathMetadataDynamoDBTranslation {
static final String IS_DELETED = "is_deleted";
static final String IS_AUTHORITATIVE = "is_authoritative";
static final String LAST_UPDATED = "last_updated";
+ static final String ETAG = "etag";
+ static final String VERSION_ID = "version_id";
/** Used while testing backward compatibility. */
@VisibleForTesting
@@ -135,7 +137,7 @@ static DDBPathMetadata itemToPathMetadata(Item item, String username) {
boolean isDir = item.hasAttribute(IS_DIR) && item.getBoolean(IS_DIR);
boolean isAuthoritativeDir = false;
- final FileStatus fileStatus;
+ final S3AFileStatus fileStatus;
long lastUpdated = 0;
if (isDir) {
isAuthoritativeDir = !IGNORED_FIELDS.contains(IS_AUTHORITATIVE)
@@ -146,8 +148,10 @@ static DDBPathMetadata itemToPathMetadata(Item item, String username) {
long len = item.hasAttribute(FILE_LENGTH) ? item.getLong(FILE_LENGTH) : 0;
long modTime = item.hasAttribute(MOD_TIME) ? item.getLong(MOD_TIME) : 0;
long block = item.hasAttribute(BLOCK_SIZE) ? item.getLong(BLOCK_SIZE) : 0;
- fileStatus = new FileStatus(len, false, 1, block, modTime, 0, null,
- username, username, path);
+ String eTag = item.getString(ETAG);
+ String versionId = item.getString(VERSION_ID);
+ fileStatus = new S3AFileStatus(
+ len, modTime, path, block, username, eTag, versionId);
}
lastUpdated =
!IGNORED_FIELDS.contains(LAST_UPDATED)
@@ -172,7 +176,7 @@ static DDBPathMetadata itemToPathMetadata(Item item, String username) {
*/
static Item pathMetadataToItem(DDBPathMetadata meta) {
Preconditions.checkNotNull(meta);
- final FileStatus status = meta.getFileStatus();
+ final S3AFileStatus status = meta.getFileStatus();
final Item item = new Item().withPrimaryKey(pathToKey(status.getPath()));
if (status.isDirectory()) {
item.withBoolean(IS_DIR, true);
@@ -183,6 +187,12 @@ static Item pathMetadataToItem(DDBPathMetadata meta) {
item.withLong(FILE_LENGTH, status.getLen())
.withLong(MOD_TIME, status.getModificationTime())
.withLong(BLOCK_SIZE, status.getBlockSize());
+ if (status.getETag() != null) {
+ item.withString(ETAG, status.getETag());
+ }
+ if (status.getVersionId() != null) {
+ item.withString(VERSION_ID, status.getVersionId());
+ }
}
item.withBoolean(IS_DELETED, meta.isDeleted());
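
An illustrative round trip under the new attributes. The translation methods are package-private, so this sketch assumes code living in the same `s3guard` package; all values are invented.

```java
import com.amazonaws.services.dynamodbv2.document.Item;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;

class TranslationSketch {
  static DDBPathMetadata roundTrip() {
    // A file entry with an eTag but no versionId: only the "etag"
    // attribute is written; itemToPathMetadata restores it and leaves
    // versionId null.
    S3AFileStatus status = new S3AFileStatus(
        111L, 1234567890000L, new Path("s3a://bucket/dir1/file1"),
        4096L, "hadoop", "abc", null);
    Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
        new DDBPathMetadata(status));
    return PathMetadataDynamoDBTranslation.itemToPathMetadata(item, "hadoop");
  }
}
```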
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index 8234777c3b..26c75e8213 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -68,7 +68,7 @@ public final class S3Guard {
  static final Class<? extends DynamoDBClientFactory>
S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT =
DynamoDBClientFactory.DefaultDynamoDBClientFactory.class;
- private static final FileStatus[] EMPTY_LISTING = new FileStatus[0];
+ private static final S3AFileStatus[] EMPTY_LISTING = new S3AFileStatus[0];
// Utility class. All static functions.
private S3Guard() { }
@@ -164,7 +164,7 @@ public static S3AFileStatus putAndReturn(MetadataStore ms,
* @param dirMeta directory listing -may be null
* @return a possibly-empty array of file status entries
*/
- public static FileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
+ public static S3AFileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
if (dirMeta == null) {
return EMPTY_LISTING;
}
@@ -178,7 +178,7 @@ public static FileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
}
}
- return statuses.toArray(new FileStatus[0]);
+ return statuses.toArray(new S3AFileStatus[0]);
}
/**
@@ -201,7 +201,7 @@ public static FileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
* @throws IOException if metadata store update failed
*/
public static FileStatus[] dirListingUnion(MetadataStore ms, Path path,
-      List<FileStatus> backingStatuses, DirListingMetadata dirMeta,
+      List<S3AFileStatus> backingStatuses, DirListingMetadata dirMeta,
boolean isAuthoritative, ITtlTimeProvider timeProvider)
throws IOException {
@@ -232,7 +232,7 @@ public static FileStatus[] dirListingUnion(MetadataStore ms, Path path,
pm -> pm.getFileStatus().getPath(), PathMetadata::getFileStatus)
);
- for (FileStatus s : backingStatuses) {
+ for (S3AFileStatus s : backingStatuses) {
if (deleted.contains(s.getPath())) {
continue;
}
@@ -323,7 +323,7 @@ public static void makeDirsOrdered(MetadataStore ms, List<Path> dirs,
* [/a/b/file0, /a/b/file1, /a/b/file2, /a/b/file3], isAuthoritative =
* true
*/
- FileStatus prevStatus = null;
+ S3AFileStatus prevStatus = null;
// Use new batched put to reduce round trips.
    List<PathMetadata> pathMetas = new ArrayList<>(dirs.size());
@@ -334,8 +334,8 @@ public static void makeDirsOrdered(MetadataStore ms, List dirs,
boolean isLeaf = (prevStatus == null);
Path f = dirs.get(i);
assertQualified(f);
- FileStatus status =
- createUploadFileStatus(f, true, 0, 0, owner);
+ S3AFileStatus status =
+ createUploadFileStatus(f, true, 0, 0, owner, null, null);
// We only need to put a DirListingMetadata if we are setting
// authoritative bit
@@ -383,7 +383,8 @@ public static void addMoveDir(MetadataStore ms, Collection<Path> srcPaths,
}
assertQualified(srcPath, dstPath);
- FileStatus dstStatus = createUploadFileStatus(dstPath, true, 0, 0, owner);
+ S3AFileStatus dstStatus = createUploadFileStatus(dstPath, true, 0,
+ 0, owner, null, null);
addMoveStatus(srcPaths, dstMetas, srcPath, dstStatus);
}
@@ -399,16 +400,18 @@ public static void addMoveDir(MetadataStore ms, Collection<Path> srcPaths,
* @param size length of file moved
* @param blockSize blocksize to associate with destination file
* @param owner file owner to use in created records
+ * @param eTag the s3 object eTag of file moved
+ * @param versionId the s3 object versionId of file moved
*/
  public static void addMoveFile(MetadataStore ms, Collection<Path> srcPaths,
      Collection<PathMetadata> dstMetas, Path srcPath, Path dstPath,
- long size, long blockSize, String owner) {
+ long size, long blockSize, String owner, String eTag, String versionId) {
if (isNullMetadataStore(ms)) {
return;
}
assertQualified(srcPath, dstPath);
- FileStatus dstStatus = createUploadFileStatus(dstPath, false,
- size, blockSize, owner);
+ S3AFileStatus dstStatus = createUploadFileStatus(dstPath, false,
+ size, blockSize, owner, eTag, versionId);
addMoveStatus(srcPaths, dstMetas, srcPath, dstStatus);
}
@@ -465,9 +468,8 @@ public static void addAncestors(MetadataStore metadataStore,
while (!parent.isRoot()) {
PathMetadata directory = metadataStore.get(parent);
if (directory == null || directory.isDeleted()) {
- FileStatus status = new FileStatus(0, true, 1, 0, 0, 0, null, username,
- null, parent);
- PathMetadata meta = new PathMetadata(status, Tristate.FALSE, false);
+ S3AFileStatus s3aStatus = new S3AFileStatus(Tristate.FALSE, parent, username);
+ PathMetadata meta = new PathMetadata(s3aStatus, Tristate.FALSE, false);
newDirs.add(meta);
} else {
break;
@@ -480,7 +482,7 @@ public static void addAncestors(MetadataStore metadataStore,
  private static void addMoveStatus(Collection<Path> srcPaths,
      Collection<PathMetadata> dstMetas,
Path srcPath,
- FileStatus dstStatus) {
+ S3AFileStatus dstStatus) {
srcPaths.add(srcPath);
dstMetas.add(new PathMetadata(dstStatus));
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 1ac167f5a6..448ea9213f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -44,12 +44,12 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.MultipartUtils;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
@@ -703,7 +703,7 @@ private void putParentsIfNotPresent(FileStatus f) throws IOException {
if (dirCache.contains(parent)) {
return;
}
- FileStatus dir = DynamoDBMetadataStore.makeDirStatus(parent,
+ S3AFileStatus dir = DynamoDBMetadataStore.makeDirStatus(parent,
f.getOwner());
getStore().put(new PathMetadata(dir));
dirCache.add(parent);
@@ -718,13 +718,13 @@ private void putParentsIfNotPresent(FileStatus f) throws IOException {
*/
private long importDir(FileStatus status) throws IOException {
Preconditions.checkArgument(status.isDirectory());
-    RemoteIterator<LocatedFileStatus> it = getFilesystem()
+    RemoteIterator<S3ALocatedFileStatus> it = getFilesystem()
.listFilesAndEmptyDirectories(status.getPath(), true);
long items = 0;
while (it.hasNext()) {
- LocatedFileStatus located = it.next();
- FileStatus child;
+ S3ALocatedFileStatus located = it.next();
+ S3AFileStatus child;
if (located.isDirectory()) {
child = DynamoDBMetadataStore.makeDirStatus(located.getPath(),
located.getOwner());
@@ -734,7 +734,9 @@ private long importDir(FileStatus status) throws IOException {
located.getModificationTime(),
located.getPath(),
located.getBlockSize(),
- located.getOwner());
+ located.getOwner(),
+ located.getETag(),
+ located.getVersionId());
}
putParentsIfNotPresent(child);
getStore().put(new PathMetadata(child));
@@ -761,7 +763,8 @@ public int run(String[] args, PrintStream out) throws Exception {
filePath = "/";
}
Path path = new Path(filePath);
- FileStatus status = getFilesystem().getFileStatus(path);
+ S3AFileStatus status = (S3AFileStatus) getFilesystem()
+ .getFileStatus(path);
try {
initMetadataStore(false);
@@ -1163,7 +1166,7 @@ public int run(String[] args, PrintStream out)
magic ? "is" : "is not");
println(out, "%nS3A Client");
-
+ printOption(out, "\tSigning Algorithm", SIGNING_ALGORITHM, "(unset)");
String endpoint = conf.getTrimmed(ENDPOINT, "");
println(out, "\tEndpoint: %s=%s",
ENDPOINT,
@@ -1172,6 +1175,10 @@ public int run(String[] args, PrintStream out)
printOption(out, "\tEncryption", SERVER_SIDE_ENCRYPTION_ALGORITHM,
"none");
printOption(out, "\tInput seek policy", INPUT_FADVISE, INPUT_FADV_NORMAL);
+ printOption(out, "\tChange Detection Source", CHANGE_DETECT_SOURCE,
+ CHANGE_DETECT_SOURCE_DEFAULT);
+ printOption(out, "\tChange Detection Mode", CHANGE_DETECT_MODE,
+ CHANGE_DETECT_MODE_DEFAULT);
// look at delegation token support
if (fs.getDelegationTokens().isPresent()) {
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
index 09e123d6ed..ef9c999359 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
@@ -690,10 +690,15 @@ Filesystem s3a://landsat-pds is not using S3Guard
The "magic" committer is not supported
S3A Client
- Endpoint: fs.s3a.endpoint=(unset)
+ Signing Algorithm: fs.s3a.signing-algorithm=(unset)
+ Endpoint: fs.s3a.endpoint=s3.amazonaws.com
Encryption: fs.s3a.server-side-encryption-algorithm=none
Input seek policy: fs.s3a.experimental.input.fadvise=normal
-2017-09-27 19:18:57,917 INFO util.ExitUtil: Exiting with status 46: 46: The magic committer is not enabled for s3a://landsat-pds
+ Change Detection Source: fs.s3a.change.detection.source=etag
+ Change Detection Mode: fs.s3a.change.detection.mode=server
+Delegation token support is disabled
+2019-05-17 13:53:38,245 [main] INFO util.ExitUtil (ExitUtil.java:terminate(210)) -
+ Exiting with status 46: 46: The magic committer is not enabled for s3a://landsat-pds
```
## Error message: "File being created has a magic path, but the filesystem has magic file support disabled:
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/delegation_tokens.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/delegation_tokens.md
index 30226f85eb..aad3f355b2 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/delegation_tokens.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/delegation_tokens.md
@@ -522,12 +522,15 @@ $ hadoop s3guard bucket-info s3a://landsat-pds/
Filesystem s3a://landsat-pds
Location: us-west-2
Filesystem s3a://landsat-pds is not using S3Guard
-The "magic" committer is supported
+The "magic" committer is not supported
S3A Client
+ Signing Algorithm: fs.s3a.signing-algorithm=(unset)
Endpoint: fs.s3a.endpoint=s3.amazonaws.com
Encryption: fs.s3a.server-side-encryption-algorithm=none
Input seek policy: fs.s3a.experimental.input.fadvise=normal
+ Change Detection Source: fs.s3a.change.detection.source=etag
+ Change Detection Mode: fs.s3a.change.detection.mode=server
Delegation Support enabled: token kind = S3ADelegationToken/Session
Hadoop security mode: SIMPLE
```
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
index bb09d576dc..a766abc616 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
@@ -644,9 +644,13 @@ Metadata Store Diagnostics:
The "magic" committer is supported
S3A Client
+ Signing Algorithm: fs.s3a.signing-algorithm=(unset)
Endpoint: fs.s3a.endpoint=s3-eu-west-1.amazonaws.com
Encryption: fs.s3a.server-side-encryption-algorithm=none
Input seek policy: fs.s3a.experimental.input.fadvise=normal
+ Change Detection Source: fs.s3a.change.detection.source=etag
+ Change Detection Mode: fs.s3a.change.detection.mode=server
+Delegation token support is disabled
```
This listing includes all the information about the table supplied from
@@ -1097,6 +1101,111 @@ based on usage information collected from previous days, and choosing a
combination of retry counts and an interval which allow for the clients to cope with
some throttling, but not to time-out other applications.
+## Read-After-Overwrite Consistency
+
+S3Guard provides read-after-overwrite consistency through ETags (the default)
+or object versioning, checked either on the server (the default) or the
+client. This means that a reader opening a file after an overwrite either sees
+the new version of the file or fails with an error. Without S3Guard, new
+readers may see the original version until S3's eventual consistency catches
+up, at which point they will see the new version.
+
+Readers using S3Guard will usually see the new file version, but may
+in rare cases see `RemoteFileChangedException` instead. This would occur if
+an S3 object read cannot provide the version tracked in S3Guard metadata.
+
+S3Guard achieves this behavior by storing ETags and object version IDs in the
+S3Guard metadata store (e.g. DynamoDB). On opening a file, S3AFileSystem
+will look in S3 for the version of the file indicated by the ETag or object
+version ID stored in the metadata store. If that version is unavailable,
+`RemoteFileChangedException` is thrown. Whether an ETag or a version ID is
+used, and whether the check happens on the server or the client, is determined
+by the
+[fs.s3a.change.detection configuration options](./index.html#Handling_Read-During-Overwrite).
+
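+For example, to track object version IDs rather than ETags while keeping the
+server-side check, a client could set the options programmatically before
+creating the filesystem. This is a minimal sketch; the bucket name is
+illustrative and the same values can equally be set in `core-site.xml`:
+
+```java
+Configuration conf = new Configuration();
+// compare object version IDs rather than ETags
+conf.set("fs.s3a.change.detection.source", "versionid");
+// have S3 enforce the version check server-side on each GET
+conf.set("fs.s3a.change.detection.mode", "server");
+FileSystem fs = FileSystem.newInstance(
+    new URI("s3a://example-bucket/"), conf);
+```
+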
+### No Versioning Metadata Available
+
+When clients are first upgraded to a version of `S3AFileSystem` that contains
+these change tracking features, any existing S3Guard metadata will not contain
+ETags or object version IDs. Reads of files tracked in such metadata will
+access whatever version of the file is available in S3 at the time of the
+read. Only once the file is subsequently updated will S3Guard start tracking
+its ETag and object version ID, and thereafter raise
+`RemoteFileChangedException` if an inconsistency is detected.
+
+Similarly, when S3Guard metadata is pruned, S3Guard will no longer be able to
+detect an inconsistent read. Metadata should therefore be retained for at
+least as long as the longest read-after-overwrite inconsistency window
+considered plausible. That window is expected to be short, but there are no
+guarantees, so it is at the administrator's discretion to weigh the risk.
+
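+For example, if any read-after-overwrite inconsistency is expected to resolve
+well within a week, metadata older than seven days could be pruned (the bucket
+name is illustrative):
+
+```bash
+hadoop s3guard prune -days 7 s3a://example-bucket
+```
+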
+### Known Limitations
+
+#### S3 Select
+
+S3 Select does not provide a capability for server-side ETag or object
+version ID qualification. Whether `fs.s3a.change.detection.mode` is "client" or
+"server", S3Guard will cause a client-side check of the file version before
+opening the file with S3 Select. If the current version does not match the
+version tracked in S3Guard, `RemoteFileChangedException` is thrown.
+
+It is still possible for the S3 Select read to access a different version of
+the file if the visible version changes between the version check and the
+opening of the file, whether through eventual consistency or a concurrent
+overwrite.
+
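+To illustrate where the check applies: a Select query goes through the
+`openFile()` builder, and it is this open which the client-side version check
+can fail. A sketch, with the SQL, `fs` and `path` variables illustrative:
+
+```java
+// sketch: if the object's current ETag or version ID no longer matches
+// the one recorded in S3Guard, the open fails with (a possibly wrapped)
+// RemoteFileChangedException.
+try (FSDataInputStream in = fs.openFile(path)
+    .must("fs.s3a.select.sql", "SELECT * FROM S3OBJECT s")
+    .build()
+    .get()) {
+  // process the selected records
+}
+```
+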
+#### Rename
+
+Rename is implemented via copy in S3. With `fs.s3a.change.detection.mode` set
+to "client", a fully reliable mechanism for ensuring the copied content is the
+expected content is not possible, since there is no way to know in advance the
+ETag or version ID that the object resulting from the copy will carry.
+
+Furthermore, if `fs.s3a.change.detection.mode` is "server" and a third-party S3
+implementation is used that doesn't honor the provided ETag or version ID,
+S3AFileSystem and S3Guard cannot detect it.
+
+When `fs.s3a.change.detection.mode` is "client", a client-side check
+will be performed before the copy to ensure the current version of the file
+matches the S3Guard metadata. If not, `RemoteFileChangedException` is thrown.
+As with S3 Select, this is not sufficient to guarantee that the checked
+version is the version actually copied.
+
+When `fs.s3a.change.detection.mode` is "server", the expected version is also
+specified in the underlying S3 `CopyObjectRequest`. As long as the server
+honors it, the copied object will be correct.
+
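+At the AWS SDK level, this server-side qualification roughly corresponds to
+attaching a copy-source precondition to the request, as in the following
+sketch (not S3A's internal code; the names are illustrative):
+
+```java
+CopyObjectRequest request = new CopyObjectRequest(
+    "example-bucket", "source/key", "example-bucket", "dest/key");
+// with source "etag": S3 fails the copy if the source object's ETag
+// no longer matches the expected one.
+request.withMatchingETagConstraint(expectedETag);
+// with source "versionid", the exact source version is requested instead:
+// request.withSourceVersionId(expectedVersionId);
+s3.copyObject(request);
+```
+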
+All this said, with the defaults of `fs.s3a.change.detection.mode` set to
+"server" and `fs.s3a.change.detection.source` set to "etag", when working with
+Amazon's S3, a copy should in fact either copy the expected file version or,
+in the case of an eventual consistency anomaly, generate
+`RemoteFileChangedException`. The same should be true when
+`fs.s3a.change.detection.source` is "versionid".
+
+#### Out of Sync Metadata
+
+The S3Guard version tracking metadata (ETag or object version ID) could become
+out of sync with the true current object metadata in S3. For example, S3Guard
+is still tracking v1 of some file after v2 has been written. This could occur
+when a writer bypasses S3Guard and/or S3AFileSystem entirely, or when a write
+through S3AFileSystem succeeded against S3 but failed to update S3Guard's
+metadata store (e.g. DynamoDB).
+
+If this happens, reads of the affected file(s) will result in
+`RemoteFileChangedException` until one of:
+
+* the S3Guard metadata is corrected out-of-band
+* the file is overwritten (causing an S3Guard metadata update)
+* the S3Guard metadata is pruned
+
+The S3Guard metadata for a file can be corrected with the `s3guard import`
+command as discussed above. The command can take a file URI instead of a
+bucket URI to correct the metadata for a single file. For example:
+
+```bash
+hadoop s3guard import [-meta URI] s3a://my-bucket/file-with-bad-metadata
+```
+
## Troubleshooting
### Error: `S3Guard table lacks version marker.`
@@ -1215,6 +1324,97 @@ sync.
See [Fail on Error](#fail-on-error) for more detail.
+### Error `RemoteFileChangedException`
+
+An exception like the following could occur for one of two reasons:
+
+* the S3Guard metadata is out of sync with the true S3 metadata. For
+example, the S3Guard DynamoDB table is tracking a different ETag than the
+ETag shown in the exception. This may suggest that the object was updated in
+S3 without S3Guard's involvement, or that there was a transient failure when
+S3Guard tried to write to DynamoDB.
+
+* S3 is exhibiting read-after-overwrite temporary inconsistency. The S3Guard
+metadata was updated with a new ETag during a recent write, but the current
+read is not yet seeing that ETag due to S3's eventual consistency. The
+exception prevents an inconsistent read in which the reader would see an
+older version of the file.
+
+```
+org.apache.hadoop.fs.s3a.RemoteFileChangedException: open 's3a://my-bucket/test/file.txt':
+ Change reported by S3 while reading at position 0.
+ ETag 4e886e26c072fef250cfaf8037675405 was unavailable
+ at org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:167)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:207)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:355)
+ at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:195)
+ at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
+ at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
+ at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:193)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:215)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:348)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:381)
+ at java.io.FilterInputStream.read(FilterInputStream.java:83)
+```
+
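+If the cause is transient read-after-overwrite inconsistency, the simplest
+recovery is to retry the read after a delay. A minimal sketch of such a retry
+loop, using commons-io `IOUtils`; the retry policy is illustrative and not
+part of the S3A API:
+
+```java
+String readWithRetry(FileSystem fs, Path path)
+    throws IOException, InterruptedException {
+  RemoteFileChangedException lastFailure = null;
+  for (int attempt = 0; attempt < 3; attempt++) {
+    try (FSDataInputStream in = fs.open(path)) {
+      return IOUtils.toString(in, StandardCharsets.UTF_8);
+    } catch (RemoteFileChangedException e) {
+      // S3 may still be serving the old version; wait and retry.
+      lastFailure = e;
+      Thread.sleep(1000);
+    }
+  }
+  throw lastFailure;
+}
+```
+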
+### Error `AWSClientIOException: copyFile` caused by `NullPointerException`
+
+The AWS SDK has an [issue](https://github.com/aws/aws-sdk-java/issues/1644)
+where it will throw a relatively generic `AmazonClientException` caused by
+`NullPointerException` when copying a file and specifying a precondition
+that cannot be met. This can bubble up from `S3AFileSystem.rename()`. It
+suggests that the file in S3 is inconsistent with the metadata in S3Guard.
+
+```
+org.apache.hadoop.fs.s3a.AWSClientIOException: copyFile(test/rename-eventually2.dat, test/dest2.dat) on test/rename-eventually2.dat: com.amazonaws.AmazonClientException: Unable to complete transfer: null: Unable to complete transfer: null
+ at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:201)
+ at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
+ at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:314)
+ at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:406)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:310)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:285)
+ at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:3034)
+ at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:1258)
+ at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:1119)
+ at org.apache.hadoop.fs.s3a.ITestS3ARemoteFileChanged.lambda$testRenameEventuallyConsistentFile2$6(ITestS3ARemoteFileChanged.java:556)
+ at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:498)
+ at org.apache.hadoop.fs.s3a.ITestS3ARemoteFileChanged.testRenameEventuallyConsistentFile2(ITestS3ARemoteFileChanged.java:554)
+ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
+ at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
+ at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
+ at java.lang.reflect.Method.invoke(Method.java:498)
+ at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
+ at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
+ at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
+ at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
+ at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
+ at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
+ at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
+ at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
+ at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
+ at java.util.concurrent.FutureTask.run(FutureTask.java:266)
+ at java.lang.Thread.run(Thread.java:748)
+Caused by: com.amazonaws.AmazonClientException: Unable to complete transfer: null
+ at com.amazonaws.services.s3.transfer.internal.AbstractTransfer.unwrapExecutionException(AbstractTransfer.java:286)
+ at com.amazonaws.services.s3.transfer.internal.AbstractTransfer.rethrowExecutionException(AbstractTransfer.java:265)
+ at com.amazonaws.services.s3.transfer.internal.CopyImpl.waitForCopyResult(CopyImpl.java:67)
+ at org.apache.hadoop.fs.s3a.impl.CopyOutcome.waitForCopy(CopyOutcome.java:72)
+ at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFile$14(S3AFileSystem.java:3047)
+ at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
+ ... 25 more
+Caused by: java.lang.NullPointerException
+ at com.amazonaws.services.s3.transfer.internal.CopyCallable.copyInOneChunk(CopyCallable.java:154)
+ at com.amazonaws.services.s3.transfer.internal.CopyCallable.call(CopyCallable.java:134)
+ at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:132)
+ at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:43)
+ at java.util.concurrent.FutureTask.run(FutureTask.java:266)
+ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
+ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
+ ... 1 more
+```
+
## Other Topics
For details on how to test S3Guard, see [Testing S3Guard](./testing.html#s3guard)
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
index 707694c8c6..f61b46a8da 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
@@ -298,6 +298,24 @@ plugin:
```bash
mvn surefire-report:failsafe-report-only
```
+## Testing Versioned Stores
+
+Some tests (specifically some in `ITestS3ARemoteFileChanged`) require
+S3Guard to be enabled and a versioned bucket for full test coverage.
+
+To enable versioning in a bucket:
+
+1. In the AWS S3 Management console, find and select the bucket.
+1. In the Properties tab, enable versioning (this can also be done
+programmatically; see the sketch below).
+1. Important: create a lifecycle rule to automatically clean up old versions
+after 24h. This avoids running up bills for objects which test runs create and
+then delete.
+1. Run the tests again.
+
+Once versioning has been enabled on a bucket, it cannot be fully disabled
+again: it can only be suspended.
+
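+Versioning (though not the lifecycle rule) can also be enabled
+programmatically with the AWS SDK, as in this sketch (the bucket name is
+illustrative):
+
+```java
+AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
+s3.setBucketVersioningConfiguration(
+    new SetBucketVersioningConfigurationRequest("example-test-bucket",
+        new BucketVersioningConfiguration(
+            BucketVersioningConfiguration.ENABLED)));
+```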
+
## Scale Tests
There are a set of tests designed to measure the scalability and performance
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 3123221bd8..8cdac9e352 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -970,8 +970,8 @@ and the like. The standard strategy here is to save to HDFS and then copy to S3.
```
org.apache.hadoop.fs.s3a.RemoteFileChangedException: re-open `s3a://my-bucket/test/file.txt':
- ETag change reported by S3 while reading at position 1949.
- Version f9c186d787d4de9657e99f280ba26555 was unavailable
+ Change reported by S3 while reading at position 1949.
+ ETag f9c186d787d4de9657e99f280ba26555 was unavailable
at org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:137)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:200)
at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:346)
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
index 03c91e62ce..886795a9d9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -56,19 +56,26 @@ public abstract class AbstractS3AMockTest {
@Before
public void setup() throws Exception {
+ Configuration conf = createConfiguration();
+ fs = new S3AFileSystem();
+ URI uri = URI.create(FS_S3A + "://" + BUCKET);
+ fs.initialize(uri, conf);
+ s3 = fs.getAmazonS3ClientForTesting("mocking");
+ }
+
+ public Configuration createConfiguration() {
Configuration conf = new Configuration();
conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
S3ClientFactory.class);
- // We explicitly disable MetadataStore even if it's configured. For unit
+ // We explicitly disable MetadataStore. For unit
// test we don't issue request to AWS DynamoDB service.
conf.setClass(S3_METADATA_STORE_IMPL, NullMetadataStore.class,
MetadataStore.class);
// FS is always magic
conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
- fs = new S3AFileSystem();
- URI uri = URI.create(FS_S3A + "://" + BUCKET);
- fs.initialize(uri, conf);
- s3 = fs.getAmazonS3ClientForTesting("mocking");
+ // use minimum multipart size for faster triggering
+ conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+ return conf;
}
@After
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
index 7abd474976..870172ec3e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
@@ -22,7 +22,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
import org.apache.hadoop.test.LambdaTestUtils;
+
+import org.junit.Assume;
import org.junit.Test;
import java.io.FileNotFoundException;
@@ -43,6 +47,12 @@ public class ITestS3ADelayedFNF extends AbstractS3ATestBase {
@Test
public void testNotFoundFirstRead() throws Exception {
FileSystem fs = getFileSystem();
+ ChangeDetectionPolicy changeDetectionPolicy =
+ ((S3AFileSystem) fs).getChangeDetectionPolicy();
+ Assume.assumeFalse("FNF not expected when using a bucket with"
+ + " object versioning",
+ changeDetectionPolicy.getSource() == Source.VersionId);
+
Path p = path("some-file");
ContractTestUtils.createFile(fs, p, false, new byte[] {20, 21, 22});
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
index 6ac803e308..da671030c2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
@@ -24,9 +24,13 @@
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
import org.apache.hadoop.test.LambdaTestUtils;
+
+import org.junit.Assume;
import org.junit.Test;
import java.io.FileNotFoundException;
@@ -106,6 +110,12 @@ public void testGetFileStatus() throws Exception {
@Test
public void testOpenDeleteRead() throws Exception {
S3AFileSystem fs = getFileSystem();
+ ChangeDetectionPolicy changeDetectionPolicy =
+        fs.getChangeDetectionPolicy();
+ Assume.assumeFalse("FNF not expected when using a bucket with"
+ + " object versioning",
+ changeDetectionPolicy.getSource() == Source.VersionId);
+
Path p = path("testOpenDeleteRead.txt");
writeTextFile(fs, p, "1337c0d3z", true);
try (InputStream s = fs.open(p)) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
index 98dd2026f5..c345a1f9da 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
@@ -19,77 +19,258 @@
package org.apache.hadoop.fs.s3a;
import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Optional;
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.CopyObjectResult;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.google.common.base.Charsets;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Mode;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
+import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readUTF8;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.CHANGE_DETECTED;
+import static org.apache.hadoop.fs.s3a.select.SelectConstants.S3_SELECT_CAPABILITY;
+import static org.apache.hadoop.fs.s3a.select.SelectConstants.SELECT_SQL;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
/**
* Test S3A remote file change detection.
+ * This is a heavily parameterized test; the first three parameters
+ * define configuration options for the tests, while the final one
+ * declares the expected outcomes given those options.
+ *
+ * This test uses mocking to insert transient failures into the S3 client,
+ * underneath the S3A Filesystem instance.
+ *
+ * This is used to simulate eventual consistency and so force the change
+ * policy failure modes to be encountered.
+ *
+ * If changes are made to the filesystem such that the number of calls to
+ * operations such as {@link S3AFileSystem#getObjectMetadata(Path)} is
+ * changed, the number of failures which the mock layer must generate may
+ * change.
+ *
+ * As the S3Guard auth mode flag controls whether or not a HEAD is issued
+ * in a call to {@code getFileStatus()}, the test parameter {@link #authMode}
+ * is used to help predict this count.
+ *
+ * Important: if you are seeing failures in this test after changing
+ * one of the rename/copy/open operations, it may be that an increase,
+ * decrease or change in the number of low-level S3 HEAD/GET operations is
+ * triggering the failures.
+ * Please review the changes to see that you haven't unintentionally done this.
+ * If it is intentional, please update the parameters here.
+ *
+ * If you are seeing failures without such a change, and nobody else is,
+ * it is likely that you have a different bucket configuration option which
+ * is somehow triggering a regression. If you can work out which option
+ * this is, then extend {@link #createConfiguration()} to reset that parameter
+ * too.
+ *
+ * Note: to help debug these issues, set the log for this to DEBUG:
+ *