diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml index 2615566654..855aac974c 100644 --- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml @@ -28,4 +28,40 @@ + + + + + + + + + + + + + + + + + + + + + + + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/FailureInjectionPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/FailureInjectionPolicy.java new file mode 100644 index 0000000000..8cd6036b0c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/FailureInjectionPolicy.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import static org.apache.hadoop.fs.s3a.Constants.*; + +/** + * Simple object which stores current failure injection settings. + * "Delaying a key" can mean: + * - Removing it from the S3 client's listings while delay is in effect. 
+ * - Causing input stream reads to fail. + * - Causing the S3 side of getFileStatus(), i.e. + * AmazonS3#getObjectMetadata(), to throw FileNotFound. + */ +public class FailureInjectionPolicy { + /** + * Keys containing this substring will be subject to delayed visibility. + */ + public static final String DEFAULT_DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME"; + + /** + * How many seconds affected keys will have delayed visibility. + * This should probably be a config value. + */ + public static final long DEFAULT_DELAY_KEY_MSEC = 5 * 1000; + + public static final float DEFAULT_DELAY_KEY_PROBABILITY = 1.0f; + + /** Special config value since we can't store empty strings in XML. */ + public static final String MATCH_ALL_KEYS = "*"; + + private static final Logger LOG = + LoggerFactory.getLogger(InconsistentAmazonS3Client.class); + + /** Empty string matches all keys. */ + private String delayKeySubstring; + + /** Probability to delay visibility of a matching key. */ + private float delayKeyProbability; + + /** Time in milliseconds to delay visibility of newly modified object. */ + private long delayKeyMsec; + + /** + * Probability of throttling a request. + */ + private float throttleProbability; + + /** + * limit for failures before operations succeed; if 0 then "no limit". + */ + private int failureLimit = 0; + + public FailureInjectionPolicy(Configuration conf) { + + this.delayKeySubstring = conf.get(FAIL_INJECT_INCONSISTENCY_KEY, + DEFAULT_DELAY_KEY_SUBSTRING); + // "" is a substring of all strings, use it to match all keys. 
+ if (this.delayKeySubstring.equals(MATCH_ALL_KEYS)) { + this.delayKeySubstring = ""; + } + this.delayKeyProbability = validProbability( + conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, + DEFAULT_DELAY_KEY_PROBABILITY)); + this.delayKeyMsec = conf.getLong(FAIL_INJECT_INCONSISTENCY_MSEC, + DEFAULT_DELAY_KEY_MSEC); + this.setThrottleProbability(conf.getFloat(FAIL_INJECT_THROTTLE_PROBABILITY, + 0.0f)); + } + + public String getDelayKeySubstring() { + return delayKeySubstring; + } + + public float getDelayKeyProbability() { + return delayKeyProbability; + } + + public long getDelayKeyMsec() { + return delayKeyMsec; + } + + public float getThrottleProbability() { + return throttleProbability; + } + + public int getFailureLimit() { + return failureLimit; + } + + public void setFailureLimit(int failureLimit) { + this.failureLimit = failureLimit; + } + + /** + * Set the probability of throttling a request. + * @param throttleProbability the probability of a request being throttled. + */ + public void setThrottleProbability(float throttleProbability) { + this.throttleProbability = validProbability(throttleProbability); + } + + public static boolean trueWithProbability(float p) { + return Math.random() < p; + } + + /** + * Should we delay listing visibility for this key? + * @param key key which is being put + * @return true if we should delay + */ + public boolean shouldDelay(String key) { + float p = getDelayKeyProbability(); + boolean delay = key.contains(getDelayKeySubstring()); + delay = delay && trueWithProbability(p); + LOG.debug("{}, p={} -> {}", key, p, delay); + return delay; + } + + @Override + public String toString() { + return String.format("FailureInjectionPolicy:" + + " %s msec delay, substring %s, delay probability %s;" + + " throttle probability %s" + "; failure limit %d", + delayKeyMsec, delayKeySubstring, delayKeyProbability, + throttleProbability, failureLimit); + } + + /** + * Validate a probability option. 
+ * @param p probability + * @return the probability, if valid + * @throws IllegalArgumentException if the probability is out of range. + */ + private static float validProbability(float p) { + Preconditions.checkArgument(p >= 0.0f && p <= 1.0f, + "Probability out of range 0 to 1 %s", p); + return p; + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java index d15806146c..99ed87da8c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java @@ -38,6 +38,7 @@ import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsResult; +import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; @@ -48,6 +49,7 @@ import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; @@ -60,8 +62,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import static org.apache.hadoop.fs.s3a.Constants.*; - /** * A wrapper around {@link com.amazonaws.services.s3.AmazonS3} that injects * inconsistency and/or errors. Used for testing S3Guard. 
@@ -71,49 +71,16 @@ @InterfaceStability.Unstable public class InconsistentAmazonS3Client extends AmazonS3Client { - /** - * Keys containing this substring will be subject to delayed visibility. - */ - public static final String DEFAULT_DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME"; - - /** - * How many seconds affected keys will be delayed from appearing in listing. - * This should probably be a config value. - */ - public static final long DEFAULT_DELAY_KEY_MSEC = 5 * 1000; - - public static final float DEFAULT_DELAY_KEY_PROBABILITY = 1.0f; - - /** Special config value since we can't store empty strings in XML. */ - public static final String MATCH_ALL_KEYS = "*"; - private static final Logger LOG = LoggerFactory.getLogger(InconsistentAmazonS3Client.class); - /** Empty string matches all keys. */ - private String delayKeySubstring; - - /** Probability to delay visibility of a matching key. */ - private float delayKeyProbability; - - /** Time in milliseconds to delay visibility of newly modified object. */ - private long delayKeyMsec; - - /** - * Probability of throttling a request. - */ - private float throttleProbability; + private FailureInjectionPolicy policy; /** * Counter of failures since last reset. */ private final AtomicLong failureCounter = new AtomicLong(0); - /** - * limit for failures before operations succeed; if 0 then "no limit". 
- */ - private int failureLimit = 0; - /** * Composite of data we need to track about recently deleted objects: * when it was deleted (same was with recently put objects) and the object @@ -150,36 +117,42 @@ public S3ObjectSummary summary() { public InconsistentAmazonS3Client(AWSCredentialsProvider credentials, ClientConfiguration clientConfiguration, Configuration conf) { super(credentials, clientConfiguration); - setupConfig(conf); + policy = new FailureInjectionPolicy(conf); } - protected void setupConfig(Configuration conf) { - delayKeySubstring = conf.get(FAIL_INJECT_INCONSISTENCY_KEY, - DEFAULT_DELAY_KEY_SUBSTRING); - // "" is a substring of all strings, use it to match all keys. - if (delayKeySubstring.equals(MATCH_ALL_KEYS)) { - delayKeySubstring = ""; - } - delayKeyProbability = validProbability( - conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, - DEFAULT_DELAY_KEY_PROBABILITY)); - delayKeyMsec = conf.getLong(FAIL_INJECT_INCONSISTENCY_MSEC, - DEFAULT_DELAY_KEY_MSEC); - setThrottleProbability(conf.getFloat(FAIL_INJECT_THROTTLE_PROBABILITY, - 0.0f)); - LOG.info("{}", this); + /** + * Clear any accumulated inconsistency state. Used by tests to make paths + * visible again. + * @param fs S3AFileSystem under test + * @throws Exception on failure + */ + public static void clearInconsistency(S3AFileSystem fs) throws Exception { + AmazonS3 s3 = fs.getAmazonS3ClientForTesting("s3guard"); + InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3); + ic.clearInconsistency(); + } + + /** + * A way for tests to patch in a different fault injection policy at runtime. 
+ * @param fs filesystem under test + * + */ + public static void setFailureInjectionPolicy(S3AFileSystem fs, + FailureInjectionPolicy policy) throws Exception { + AmazonS3 s3 = fs.getAmazonS3ClientForTesting("s3guard"); + InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3); + ic.replacePolicy(policy); + } + + private void replacePolicy(FailureInjectionPolicy pol) { + this.policy = pol; } @Override public String toString() { - return String.format( - "Inconsistent S3 Client with" - + " %s msec delay, substring %s, delay probability %s;" - + " throttle probability %s" - + "; failure limit %d, failure count %d", - delayKeyMsec, delayKeySubstring, delayKeyProbability, - throttleProbability, failureLimit, failureCounter.get()); + return String.format("Inconsistent S3 Client: %s; failure count %d", + policy, failureCounter.get()); } /** @@ -470,7 +443,7 @@ private boolean isKeyDelayed(Long enqueueTime, String key) { return false; } long currentTime = System.currentTimeMillis(); - long deadline = enqueueTime + delayKeyMsec; + long deadline = enqueueTime + policy.getDelayKeyMsec(); if (currentTime >= deadline) { delayedDeletes.remove(key); LOG.debug("no longer delaying {}", key); @@ -482,7 +455,7 @@ private boolean isKeyDelayed(Long enqueueTime, String key) { } private void registerDeleteObject(String key, String bucket) { - if (shouldDelay(key)) { + if (policy.shouldDelay(key)) { // Record summary so we can add it back for some time post-deletion ListObjectsRequest request = new ListObjectsRequest() .withBucketName(bucket) @@ -498,28 +471,11 @@ private void registerDeleteObject(String key, String bucket) { private void registerPutObject(PutObjectRequest req) { String key = req.getKey(); - if (shouldDelay(key)) { + if (policy.shouldDelay(key)) { enqueueDelayedPut(key); } } - /** - * Should we delay listing visibility for this key? 
- * @param key key which is being put - * @return true if we should delay - */ - private boolean shouldDelay(String key) { - boolean delay = key.contains(delayKeySubstring); - delay = delay && trueWithProbability(delayKeyProbability); - LOG.debug("{} -> {}", key, delay); - return delay; - } - - - private boolean trueWithProbability(float p) { - return Math.random() < p; - } - /** * Record this key as something that should not become visible in * listObject replies for a while, to simulate eventual list consistency. @@ -561,20 +517,8 @@ public MultipartUploadListing listMultipartUploads( return super.listMultipartUploads(listMultipartUploadsRequest); } - public float getDelayKeyProbability() { - return delayKeyProbability; - } - public long getDelayKeyMsec() { - return delayKeyMsec; - } - - /** - * Get the probability of the request being throttled. - * @return a value 0 - 1.0f. - */ - public float getThrottleProbability() { - return throttleProbability; + return policy.getDelayKeyMsec(); } /** @@ -582,36 +526,28 @@ public float getThrottleProbability() { * @param throttleProbability the probability of a request being throttled. */ public void setThrottleProbability(float throttleProbability) { - this.throttleProbability = validProbability(throttleProbability); - } - - /** - * Validate a probability option. - * @param p probability - * @return the probability, if valid - * @throws IllegalArgumentException if the probability is out of range. - */ - private float validProbability(float p) { - Preconditions.checkArgument(p >= 0.0f && p <= 1.0f, - "Probability out of range 0 to 1 %s", p); - return p; + policy.setThrottleProbability(throttleProbability); } /** * Conditionally fail the operation. + * @param errorMsg description of failure + * @param statusCode http status code for error * @throws AmazonClientException if the client chooses to fail * the request. 
*/ - private void maybeFail() throws AmazonClientException { + private void maybeFail(String errorMsg, int statusCode) + throws AmazonClientException { // code structure here is to line up for more failures later AmazonServiceException ex = null; - if (trueWithProbability(throttleProbability)) { + if (policy.trueWithProbability(policy.getThrottleProbability())) { // throttle the request - ex = new AmazonServiceException("throttled" + ex = new AmazonServiceException(errorMsg + " count = " + (failureCounter.get() + 1), null); - ex.setStatusCode(503); + ex.setStatusCode(statusCode); } + int failureLimit = policy.getFailureLimit(); if (ex != null) { long count = failureCounter.incrementAndGet(); if (failureLimit == 0 @@ -621,16 +557,37 @@ private void maybeFail() throws AmazonClientException { } } + private void maybeFail() { + maybeFail("throttled", 503); + } + /** * Set the limit on failures before all operations pass through. * This resets the failure count. * @param limit limit; "0" means "no limit" */ public void setFailureLimit(int limit) { - this.failureLimit = limit; + policy.setFailureLimit(limit); failureCounter.set(0); } + @Override + public S3Object getObject(GetObjectRequest var1) throws SdkClientException, + AmazonServiceException { + maybeFail("file not found", 404); + S3Object o = super.getObject(var1); + LOG.debug("Wrapping in InconsistentS3Object for key {}", var1.getKey()); + return new InconsistentS3Object(o, policy); + } + + @Override + public S3Object getObject(String bucketName, String key) + throws SdkClientException, AmazonServiceException { + S3Object o = super.getObject(bucketName, key); + LOG.debug("Wrapping in InconsistentS3Object for key {}", key); + return new InconsistentS3Object(o, policy); + } + /** Since ObjectListing is immutable, we just override it with wrapper. 
*/ @SuppressWarnings("serial") private static class CustomObjectListing extends ObjectListing { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3Object.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3Object.java new file mode 100644 index 0000000000..496ca1b908 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3Object.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; + +import com.amazonaws.services.s3.internal.AmazonS3ExceptionBuilder; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper around S3Object so we can do failure injection on + * getObjectContent() and S3ObjectInputStream. + * See also {@link InconsistentAmazonS3Client}. 
+ */ +@SuppressWarnings({"NonSerializableFieldInSerializableClass", "serial"}) +public class InconsistentS3Object extends S3Object { + + // This should be configurable, probably. + public static final int MAX_READ_FAILURES = 100; + + private static int readFailureCounter = 0; + private transient S3Object wrapped; + private transient FailureInjectionPolicy policy; + private final static transient Logger LOG = LoggerFactory.getLogger( + InconsistentS3Object.class); + + public InconsistentS3Object(S3Object wrapped, FailureInjectionPolicy policy) { + this.wrapped = wrapped; + this.policy = policy; + } + + @Override + public S3ObjectInputStream getObjectContent() { + return new InconsistentS3InputStream(wrapped.getObjectContent()); + } + + @Override + public String toString() { + return "InconsistentS3Object wrapping: " + wrapped.toString(); + } + + @Override + public ObjectMetadata getObjectMetadata() { + return wrapped.getObjectMetadata(); + } + + @Override + public void setObjectMetadata(ObjectMetadata metadata) { + wrapped.setObjectMetadata(metadata); + } + + @Override + public void setObjectContent(S3ObjectInputStream objectContent) { + wrapped.setObjectContent(objectContent); + } + + @Override + public void setObjectContent(InputStream objectContent) { + wrapped.setObjectContent(objectContent); + } + + @Override + public String getBucketName() { + return wrapped.getBucketName(); + } + + @Override + public void setBucketName(String bucketName) { + wrapped.setBucketName(bucketName); + } + + @Override + public String getKey() { + return wrapped.getKey(); + } + + @Override + public void setKey(String key) { + wrapped.setKey(key); + } + + @Override + public String getRedirectLocation() { + return wrapped.getRedirectLocation(); + } + + @Override + public void setRedirectLocation(String redirectLocation) { + wrapped.setRedirectLocation(redirectLocation); + } + + @Override + public Integer getTaggingCount() { + return wrapped.getTaggingCount(); + } + + @Override + public 
void setTaggingCount(Integer taggingCount) { + wrapped.setTaggingCount(taggingCount); + } + + @Override + public void close() throws IOException { + wrapped.close(); + } + + @Override + public boolean isRequesterCharged() { + return wrapped.isRequesterCharged(); + } + + @Override + public void setRequesterCharged(boolean isRequesterCharged) { + wrapped.setRequesterCharged(isRequesterCharged); + } + + private AmazonS3Exception mockException(String msg, int httpResponse) { + AmazonS3ExceptionBuilder builder = new AmazonS3ExceptionBuilder(); + builder.setErrorMessage(msg); + builder.setStatusCode(httpResponse); // this is the important part + builder.setErrorCode(String.valueOf(httpResponse)); + return builder.build(); + } + + /** + * Insert a failure injection point for a read call. + * @throws IOException, as codepath is on InputStream, not other SDK call. + */ + private void readFailpoint(int off, int len) throws IOException { + if (shouldInjectFailure(getKey())) { + String error = String.format( + "read(b, %d, %d) on key %s failed: injecting error %d/%d" + + " for test.", off, len, getKey(), readFailureCounter, + MAX_READ_FAILURES); + throw new FileNotFoundException(error); + } + } + + /** + * Insert a failure injection point for an InputStream skip() call. + * @throws IOException, as codepath is on InputStream, not other SDK call. + */ + private void skipFailpoint(long len) throws IOException { + if (shouldInjectFailure(getKey())) { + String error = String.format( + "skip(%d) on key %s failed: injecting error %d/%d for test.", + len, getKey(), readFailureCounter, MAX_READ_FAILURES); + throw new FileNotFoundException(error); + } + } + + private boolean shouldInjectFailure(String key) { + if (policy.shouldDelay(key) && + readFailureCounter < MAX_READ_FAILURES) { + readFailureCounter++; + return true; + } + return false; + } + + /** + * Wraps S3ObjectInputStream and implements failure injection. 
+ */ + protected class InconsistentS3InputStream extends S3ObjectInputStream { + private S3ObjectInputStream wrapped; + + public InconsistentS3InputStream(S3ObjectInputStream wrapped) { + // seems awkward to have the stream wrap itself. + super(wrapped, wrapped.getHttpRequest()); + this.wrapped = wrapped; + } + + @Override + public void abort() { + wrapped.abort(); + } + + @Override + public int available() throws IOException { + return wrapped.available(); + } + + @Override + public void close() throws IOException { + wrapped.close(); + } + + @Override + public long skip(long n) throws IOException { + skipFailpoint(n); + return wrapped.skip(n); + } + + @Override + public int read() throws IOException { + LOG.debug("read() for key {}", getKey()); + readFailpoint(0, 1); + return wrapped.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + LOG.debug("read(b, {}, {}) for key {}", off, len, getKey()); + readFailpoint(off, len); + return wrapped.read(b, off, len); + } + + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java index 875948eb85..a007ba156a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java @@ -310,6 +310,9 @@ public T retryUntranslated( boolean shouldRetry; do { try { + if (retryCount > 0) { + LOG.debug("retry #{}", retryCount); + } // execute the operation, returning if successful return operation.execute(); } catch (IOException | SdkBaseException e) { @@ -327,8 +330,6 @@ public T retryUntranslated( (SdkBaseException)caught); } - - int attempts = retryCount + 1; try { // decide action base on operation, invocation count, etc retryAction = retryPolicy.shouldRetry(translated, retryCount, 0, diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 99901ba3b0..4424ebaa5e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -166,6 +166,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { // APIs on an uninitialized filesystem. private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT); + // Only used for very specific code paths which behave differently for + // S3Guard. Retries FileNotFound, so be careful if you use this. + private Invoker s3guardInvoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, + Invoker.LOG_EVENT); private final Retried onRetry = this::operationRetried; private String bucket; private int maxKeys; @@ -251,6 +255,8 @@ public void initialize(URI name, Configuration originalConf) s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) .createS3Client(name); invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry); + s3guardInvoker = new Invoker(new S3GuardExistsRetryPolicy(getConf()), + onRetry); writeHelper = new WriteOperationHelper(this, getConf()); maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); @@ -697,18 +703,20 @@ public FSDataInputStream open(Path f, int bufferSize) } return new FSDataInputStream( - new S3AInputStream(new S3ObjectAttributes( - bucket, - pathToKey(f), - serverSideEncryptionAlgorithm, - getServerSideEncryptionKey(bucket, getConf())), - fileStatus.getLen(), - s3, + new S3AInputStream(new S3AReadOpContext(hasMetadataStore(), + invoker, + s3guardInvoker, statistics, instrumentation, + fileStatus), + new S3ObjectAttributes(bucket, + pathToKey(f), + serverSideEncryptionAlgorithm, + getServerSideEncryptionKey(bucket, getConf())), + fileStatus.getLen(), + s3, readAhead, - 
inputPolicy, - invoker)); + inputPolicy)); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 00741437a7..c54d3e2621 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,10 +71,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { */ private volatile boolean closed; private S3ObjectInputStream wrappedStream; - private final FileSystem.Statistics stats; + private final S3AReadOpContext context; private final AmazonS3 client; private final String bucket; private final String key; + private final String pathStr; private final long contentLength; private final String uri; private static final Logger LOG = @@ -85,7 +85,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { private String serverSideEncryptionKey; private S3AInputPolicy inputPolicy; private long readahead = Constants.DEFAULT_READAHEAD_RANGE; - private final Invoker invoker; /** * This is the actual position within the object, used by @@ -108,40 +107,33 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { * Create the stream. * This does not attempt to open it; that is only done on the first * actual read() operation. 
+ * @param ctx operation context * @param s3Attributes object attributes from a HEAD request * @param contentLength length of content * @param client S3 client to use - * @param stats statistics to update - * @param instrumentation instrumentation to update * @param readahead readahead bytes * @param inputPolicy IO policy - * @param invoker preconfigured invoker */ - public S3AInputStream(S3ObjectAttributes s3Attributes, - long contentLength, - AmazonS3 client, - FileSystem.Statistics stats, - S3AInstrumentation instrumentation, - long readahead, - S3AInputPolicy inputPolicy, - Invoker invoker) { + public S3AInputStream(S3AReadOpContext ctx, S3ObjectAttributes s3Attributes, + long contentLength, AmazonS3 client, long readahead, + S3AInputPolicy inputPolicy) { Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()), "No Bucket"); Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key"); Preconditions.checkArgument(contentLength >= 0, "Negative content length"); + this.context = ctx; this.bucket = s3Attributes.getBucket(); this.key = s3Attributes.getKey(); + this.pathStr = ctx.dstFileStatus.getPath().toString(); this.contentLength = contentLength; this.client = client; - this.stats = stats; this.uri = "s3a://" + this.bucket + "/" + this.key; - this.streamStatistics = instrumentation.newInputStreamStatistics(); + this.streamStatistics = ctx.instrumentation.newInputStreamStatistics(); this.serverSideEncryptionAlgorithm = s3Attributes.getServerSideEncryptionAlgorithm(); this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey(); setInputPolicy(inputPolicy); setReadahead(readahead); - this.invoker = invoker; } /** @@ -162,6 +154,7 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) { * @param length length requested * @throws IOException on any failure to open the object */ + @Retries.OnceTranslated private synchronized void reopen(String reason, long targetPos, long length) throws IOException { @@ -185,7 +178,7 @@ private 
synchronized void reopen(String reason, long targetPos, long length) } String text = String.format("Failed to %s %s at %d", (opencount == 0 ? "open" : "re-open"), uri, targetPos); - S3Object object = invoker.retry(text, uri, true, + S3Object object = context.getReadInvoker().once(text, uri, () -> client.getObject(request)); wrappedStream = object.getObjectContent(); contentRangeStart = targetPos; @@ -241,6 +234,7 @@ private void seekQuietly(long positiveTargetPos) { * @param length length of content that needs to be read from targetPos * @throws IOException */ + @Retries.OnceTranslated private void seekInStream(long targetPos, long length) throws IOException { checkNotClosed(); if (wrappedStream == null) { @@ -317,14 +311,22 @@ public boolean seekToNewSource(long targetPos) throws IOException { * @param targetPos position from where data should be read * @param len length of the content that needs to be read */ + @Retries.RetryTranslated private void lazySeek(long targetPos, long len) throws IOException { - //For lazy seek - seekInStream(targetPos, len); - //re-open at specific location if needed - if (wrappedStream == null) { - reopen("read from new offset", targetPos, len); - } + // With S3Guard, the metadatastore gave us metadata for the file in + // open(), so we use a slightly different retry policy. 
+ Invoker invoker = context.getReadInvoker(); + invoker.retry("lazySeek", pathStr, true, + () -> { + //For lazy seek + seekInStream(targetPos, len); + + //re-open at specific location if needed + if (wrappedStream == null) { + reopen("read from new offset", targetPos, len); + } + }); } /** @@ -334,29 +336,44 @@ private void lazySeek(long targetPos, long len) throws IOException { */ private void incrementBytesRead(long bytesRead) { streamStatistics.bytesRead(bytesRead); - if (stats != null && bytesRead > 0) { - stats.incrementBytesRead(bytesRead); + if (context.stats != null && bytesRead > 0) { + context.stats.incrementBytesRead(bytesRead); } } @Override + @Retries.RetryTranslated // Some retries only happen w/ S3Guard, as intended. public synchronized int read() throws IOException { checkNotClosed(); if (this.contentLength == 0 || (nextReadPos >= contentLength)) { return -1; } - int byteRead; try { lazySeek(nextReadPos, 1); - byteRead = wrappedStream.read(); } catch (EOFException e) { return -1; - } catch (IOException e) { - onReadFailure(e, 1); - byteRead = wrappedStream.read(); } + // With S3Guard, the metadatastore gave us metadata for the file in + // open(), so we use a slightly different retry policy. + // read() may not be likely to fail, but reopen() does a GET which + // certainly could. + Invoker invoker = context.getReadInvoker(); + int byteRead = invoker.retry("read", pathStr, true, + () -> { + int b; + try { + b = wrappedStream.read(); + } catch (EOFException e) { + return -1; + } catch (IOException e) { + onReadFailure(e, 1); + b = wrappedStream.read(); + } + return b; + }); + if (byteRead >= 0) { pos++; nextReadPos++; @@ -375,10 +392,11 @@ public synchronized int read() throws IOException { * @param length length of data being attempted to read * @throws IOException any exception thrown on the re-open attempt. 
*/ + @Retries.OnceTranslated private void onReadFailure(IOException ioe, int length) throws IOException { - LOG.info("Got exception while trying to read from stream {}" - + " trying to recover: "+ ioe, uri); - LOG.debug("While trying to read from stream {}", uri, ioe); + + LOG.info("Got exception while trying to read from stream {}" + + " trying to recover: " + ioe, uri); streamStatistics.readException(); reopen("failure recovery", pos, length); } @@ -392,6 +410,7 @@ private void onReadFailure(IOException ioe, int length) throws IOException { * @throws IOException if there are other problems */ @Override + @Retries.RetryTranslated // Some retries only happen w/ S3Guard, as intended. public synchronized int read(byte[] buf, int off, int len) throws IOException { checkNotClosed(); @@ -412,18 +431,27 @@ public synchronized int read(byte[] buf, int off, int len) return -1; } - int bytesRead; - try { - streamStatistics.readOperationStarted(nextReadPos, len); - bytesRead = wrappedStream.read(buf, off, len); - } catch (EOFException e) { - onReadFailure(e, len); - // the base implementation swallows EOFs. - return -1; - } catch (IOException e) { - onReadFailure(e, len); - bytesRead = wrappedStream.read(buf, off, len); - } + // With S3Guard, the metadatastore gave us metadata for the file in + // open(), so we use a slightly different retry policy. + // read() may not be likely to fail, but reopen() does a GET which + // certainly could. + Invoker invoker = context.getReadInvoker(); + + streamStatistics.readOperationStarted(nextReadPos, len); + int bytesRead = invoker.retry("read", pathStr, true, + () -> { + int bytes; + try { + bytes = wrappedStream.read(buf, off, len); + } catch (EOFException e) { + // the base implementation swallows EOFs. 
+ return -1; + } catch (IOException e) { + onReadFailure(e, len); + bytes= wrappedStream.read(buf, off, len); + } + return bytes; + }); if (bytesRead > 0) { pos += bytesRead; @@ -481,6 +509,7 @@ public synchronized void close() throws IOException { * @param length length of the stream. * @param forceAbort force an abort; used if explicitly requested. */ + @Retries.OnceRaw private void closeStream(String reason, long length, boolean forceAbort) { if (wrappedStream != null) { @@ -645,6 +674,7 @@ public String toString() { * */ @Override + @Retries.RetryTranslated // Some retries only happen w/ S3Guard, as intended. public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { checkNotClosed(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOpContext.java new file mode 100644 index 0000000000..fba39b9a5f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOpContext.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import javax.annotation.Nullable; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; + +/** + * Base class for operation context struct passed through codepaths for main + * S3AFileSystem operations. + * Anything op-specific should be moved to a subclass of this. + */ +@SuppressWarnings("visibilitymodifier") // I want a struct of finals, for real. +public class S3AOpContext { + + final boolean isS3GuardEnabled; + final Invoker invoker; + @Nullable final FileSystem.Statistics stats; + final S3AInstrumentation instrumentation; + @Nullable final Invoker s3guardInvoker; + + /** FileStatus for "destination" path being operated on. */ + protected final FileStatus dstFileStatus; + + /** + * Alternate constructor that allows passing in two invokers, the common + * one, and another with the S3Guard Retry Policy. + * @param isS3GuardEnabled true if s3Guard is active + * @param invoker invoker, which contains retry policy + * @param s3guardInvoker s3guard-specific retry policy invoker + * @param stats optional stats object + * @param instrumentation instrumentation to use + * @param dstFileStatus file status from existence check + */ + public S3AOpContext(boolean isS3GuardEnabled, Invoker invoker, + Invoker s3guardInvoker, @Nullable FileSystem.Statistics stats, + S3AInstrumentation instrumentation, FileStatus dstFileStatus) { + + Preconditions.checkNotNull(invoker, "Null invoker arg"); + Preconditions.checkNotNull(instrumentation, "Null instrumentation arg"); + Preconditions.checkNotNull(dstFileStatus, "Null dstFileStatus arg"); + this.isS3GuardEnabled = isS3GuardEnabled; + Preconditions.checkArgument(!isS3GuardEnabled || s3guardInvoker != null, + "S3Guard invoker required: S3Guard is enabled."); + this.invoker = invoker; + this.s3guardInvoker = s3guardInvoker; + this.stats = stats; + this.instrumentation = instrumentation; + this.dstFileStatus = 
dstFileStatus; + } + + /** + * Constructor using common invoker and retry policy. + * @param isS3GuardEnabled true if s3Guard is active + * @param invoker invoker, which contains retry policy + * @param stats optional stats object + * @param instrumentation instrumentation to use + * @param dstFileStatus file status from existence check + */ + public S3AOpContext(boolean isS3GuardEnabled, Invoker invoker, + @Nullable FileSystem.Statistics stats, S3AInstrumentation instrumentation, + FileStatus dstFileStatus) { + this(isS3GuardEnabled, invoker, null, stats, instrumentation, + dstFileStatus); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java new file mode 100644 index 0000000000..220cd0d8a0 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; + +import javax.annotation.Nullable; + +/** + * Read-specific operation context struct. 
+ */ +public class S3AReadOpContext extends S3AOpContext { + public S3AReadOpContext(boolean isS3GuardEnabled, Invoker invoker, + Invoker s3guardInvoker, @Nullable FileSystem.Statistics stats, + S3AInstrumentation instrumentation, FileStatus dstFileStatus) { + super(isS3GuardEnabled, invoker, s3guardInvoker, stats, instrumentation, + dstFileStatus); + } + + public S3AReadOpContext(boolean isS3GuardEnabled, Invoker invoker, + @Nullable FileSystem.Statistics stats, S3AInstrumentation instrumentation, + FileStatus dstFileStatus) { + super(isS3GuardEnabled, invoker, stats, instrumentation, dstFileStatus); + } + + /** + * Get invoker to use for read operations. When S3Guard is enabled we use + * the S3Guard invoker, which deals with things like FileNotFoundException + * differently. + * @return invoker to use for read codepaths + */ + public Invoker getReadInvoker() { + if (isS3GuardEnabled) { + return s3guardInvoker; + } else { + return invoker; + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java index e37a55498e..d857330395 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java @@ -76,10 +76,30 @@ * @see Amazon S3 Error Best Practices * @see Dynamo DB Commmon errors */ +@SuppressWarnings("visibilitymodifier") // I want a struct of finals, for real. public class S3ARetryPolicy implements RetryPolicy { + /** Final retry policy we end up with. */ private final RetryPolicy retryPolicy; + // Retry policies for mapping exceptions to + + /** Base policy from configuration. */ + protected final RetryPolicy fixedRetries; + + /** Rejection of all non-idempotent calls except specific failures. 
*/ + protected final RetryPolicy retryIdempotentCalls; + + /** Policy for throttle requests, which are considered repeatable, even for + * non-idempotent calls, as the service rejected the call entirely. */ + protected final RetryPolicy throttlePolicy; + + /** No retry on network and tangible API issues. */ + protected final RetryPolicy fail = RetryPolicies.TRY_ONCE_THEN_FAIL; + + /** Client connectivity: fixed retries without care for idempotency. */ + protected final RetryPolicy connectivityFailure; + /** * Instantiate. * @param conf configuration to read. @@ -88,7 +108,7 @@ public S3ARetryPolicy(Configuration conf) { Preconditions.checkArgument(conf != null, "Null configuration"); // base policy from configuration - RetryPolicy fixedRetries = retryUpToMaximumCountWithFixedSleep( + fixedRetries = retryUpToMaximumCountWithFixedSleep( conf.getInt(RETRY_LIMIT, RETRY_LIMIT_DEFAULT), conf.getTimeDuration(RETRY_INTERVAL, RETRY_INTERVAL_DEFAULT, @@ -97,25 +117,33 @@ public S3ARetryPolicy(Configuration conf) { // which is wrapped by a rejection of all non-idempotent calls except // for specific failures. 
- RetryPolicy retryIdempotentCalls = new FailNonIOEs( + retryIdempotentCalls = new FailNonIOEs( new IdempotencyRetryFilter(fixedRetries)); // and a separate policy for throttle requests, which are considered // repeatable, even for non-idempotent calls, as the service // rejected the call entirely - RetryPolicy throttlePolicy = exponentialBackoffRetry( + throttlePolicy = exponentialBackoffRetry( conf.getInt(RETRY_THROTTLE_LIMIT, RETRY_THROTTLE_LIMIT_DEFAULT), conf.getTimeDuration(RETRY_THROTTLE_INTERVAL, RETRY_THROTTLE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); - // no retry on network and tangible API issues - RetryPolicy fail = RetryPolicies.TRY_ONCE_THEN_FAIL; - // client connectivity: fixed retries without care for idempotency - RetryPolicy connectivityFailure = fixedRetries; + connectivityFailure = fixedRetries; + Map, RetryPolicy> policyMap = + createExceptionMap(); + retryPolicy = retryByException(retryIdempotentCalls, policyMap); + } + + /** + * Subclasses can override this like a constructor to change behavior: call + * superclass method, then modify it as needed, and return it. + * @return Map from exception type to RetryPolicy + */ + protected Map, RetryPolicy> createExceptionMap() { // the policy map maps the exact classname; subclasses do not // inherit policies. Map, RetryPolicy> policyMap = new HashMap<>(); @@ -126,7 +154,6 @@ public S3ARetryPolicy(Configuration conf) { policyMap.put(InterruptedException.class, fail); // note this does not pick up subclasses (like socket timeout) policyMap.put(InterruptedIOException.class, fail); - policyMap.put(AWSRedirectException.class, fail); // interesting question: should this be retried ever? 
policyMap.put(AccessDeniedException.class, fail); policyMap.put(FileNotFoundException.class, fail); @@ -169,7 +196,7 @@ public S3ARetryPolicy(Configuration conf) { // trigger sleep policyMap.put(ProvisionedThroughputExceededException.class, throttlePolicy); - retryPolicy = retryByException(retryIdempotentCalls, policyMap); + return policyMap; } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java new file mode 100644 index 0000000000..023d0c3cf2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.FileNotFoundException; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicy; + + +/** + * Slightly-modified retry policy for cases when the file is present in the + * MetadataStore, but may be still throwing FileNotFoundException from S3. 
+ */ +public class S3GuardExistsRetryPolicy extends S3ARetryPolicy { + /** + * Instantiate. + * @param conf configuration to read. + */ + public S3GuardExistsRetryPolicy(Configuration conf) { + super(conf); + } + + @Override + protected Map, RetryPolicy> createExceptionMap() { + Map, RetryPolicy> b = super.createExceptionMap(); + b.put(FileNotFoundException.class, retryIdempotentCalls); + return b; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index 4dfbdc835f..4c4043ebc4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -688,9 +688,11 @@ public void put(PathMetadata meta) throws IOException { @Override @Retries.OnceRaw public void put(Collection metas) throws IOException { - LOG.debug("Saving batch to table {} in region {}", tableName, region); - processBatchWriteRequest(null, pathMetadataToItem(completeAncestry(metas))); + Item[] items = pathMetadataToItem(completeAncestry(metas)); + LOG.debug("Saving batch of {} items to table {}, region {}", items.length, + tableName, region); + processBatchWriteRequest(null, items); } /** @@ -1076,6 +1078,15 @@ void provisionTable(Long readCapacity, Long writeCapacity) }); } + @Retries.RetryTranslated + @VisibleForTesting + void provisionTableBlocking(Long readCapacity, Long writeCapacity) + throws IOException { + provisionTable(readCapacity, writeCapacity); + waitForTableActive(table); + } + + @VisibleForTesting Table getTable() { return table; } @@ -1173,15 +1184,12 @@ public void updateParameters(Map parameters) S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY, currentWrite); - ProvisionedThroughput throughput = new ProvisionedThroughput() - .withReadCapacityUnits(newRead) - 
.withWriteCapacityUnits(newWrite); if (newRead != currentRead || newWrite != currentWrite) { LOG.info("Current table capacity is read: {}, write: {}", currentRead, currentWrite); LOG.info("Changing capacity of table to read: {}, write: {}", newRead, newWrite); - table.updateTable(throughput); + provisionTableBlocking(newRead, newWrite); } else { LOG.info("Table capacity unchanged at read: {}, write: {}", newRead, newWrite); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java index bdab7b7283..69d181ee30 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java @@ -34,6 +34,9 @@ * {@code MetadataStore} defines the set of operations that any metadata store * implementation must provide. Note that all {@link Path} objects provided * to methods must be absolute, not relative paths. + * Implementations must implement any retries needed internally, such that + * transient errors are generally recovered from without throwing exceptions + * from this API. */ @InterfaceAudience.Private @InterfaceStability.Evolving diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md index 4924b45ffb..4454d5ce2f 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md @@ -300,6 +300,11 @@ By their very nature they are slow. And, as their execution time is often limited by bandwidth between the computer running the tests and the S3 endpoint, parallel execution does not speed these tests up. 
+***Note: Running scale tests with -Ds3guard and -Ddynamo requires that +you use a private, testing-only DynamoDB table.*** The tests do disruptive +things such as deleting metadata and setting the provisioned throughput +to very low values. + ### Enabling the Scale Tests The tests are enabled if the `scale` property is set in the maven build diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java index eb4f70bf7c..6ac803e308 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java @@ -22,16 +22,22 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; +import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore; import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Test; import java.io.FileNotFoundException; -import java.util.concurrent.Callable; +import java.io.InputStream; import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; +import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; import static org.apache.hadoop.fs.s3a.Constants.*; -import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*; +import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*; +import static org.apache.hadoop.test.LambdaTestUtils.eventually; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** * Tests S3A behavior under forced inconsistency via {@link @@ -43,6 +49,8 @@ */ public class ITestS3AInconsistency extends AbstractS3ATestBase { + private static final int OPEN_READ_ITERATIONS = 20; + 
@Override protected AbstractFSContract createContract(Configuration conf) { conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class, @@ -86,15 +94,103 @@ public void testGetFileStatus() throws Exception { } } + + /** + * Ensure that deleting a file with an open read stream does eventually cause + * readers to get a FNFE, even with S3Guard and its retries enabled. + * In real usage, S3Guard should be enabled for all clients that modify the + * file, so the delete would be immediately recorded in the MetadataStore. + * Here, however, we test deletion from under S3Guard to make sure it still + * eventually propagates the FNFE after any retry policies are exhausted. + */ + @Test + public void testOpenDeleteRead() throws Exception { + S3AFileSystem fs = getFileSystem(); + Path p = path("testOpenDeleteRead.txt"); + writeTextFile(fs, p, "1337c0d3z", true); + try (InputStream s = fs.open(p)) { + // Disable s3guard, delete file underneath it, re-enable s3guard + MetadataStore metadataStore = fs.getMetadataStore(); + fs.setMetadataStore(new NullMetadataStore()); + fs.delete(p, false); + fs.setMetadataStore(metadataStore); + eventually(1000, 200, () -> { + intercept(FileNotFoundException.class, () -> s.read()); + }); + } + } + + /** + * Test read() path behavior when getFileStatus() succeeds but subsequent + * read() on the input stream fails due to eventual consistency. + * There are many points in the InputStream codepaths that can fail. We set + * a probability of failure and repeat the test multiple times to achieve + * decent coverage. + */ + @Test + public void testOpenFailOnRead() throws Exception { + + S3AFileSystem fs = getFileSystem(); + + // 1. Patch in a different failure injection policy with <1.0 probability + Configuration conf = fs.getConf(); + conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 0.5f); + InconsistentAmazonS3Client.setFailureInjectionPolicy(fs, + new FailureInjectionPolicy(conf)); + + // 2. 
Make sure no ancestor dirs exist + Path dir = path("ancestor"); + fs.delete(dir, true); + waitUntilDeleted(dir); + + // 3. Create a descendant file, which implicitly creates ancestors + // This file has delayed visibility. + describe("creating test file"); + Path path = path("ancestor/file-to-read-" + DEFAULT_DELAY_KEY_SUBSTRING); + writeTextFile(getFileSystem(), path, "Reading is fun", false); + + // 4. Clear inconsistency so the first getFileStatus() can succeed, if we + // are not using S3Guard. If we are using S3Guard, it should tolerate the + // delayed visibility. + if (!fs.hasMetadataStore()) { + InconsistentAmazonS3Client.clearInconsistency(fs); + } + + // ? Do we need multiple iterations when S3Guard is disabled? For now, + // leaving it in + for (int i = 0; i < OPEN_READ_ITERATIONS; i++) { + doOpenFailOnReadTest(fs, path, i); + } + } + + private void doOpenFailOnReadTest(S3AFileSystem fs, Path path, int iteration) + throws Exception { + + // 4. Open the file + describe(String.format("i=%d: opening test file", iteration)); + try(InputStream in = fs.open(path)) { + // 5. Assert expected behavior on read() failure. 
+ int l = 4; + byte[] buf = new byte[l]; + describe("reading test file"); + // Use both read() variants + if ((iteration % 2) == 0) { + assertEquals(l, in.read(buf, 0, l)); + } else { + in.read(); + } + } catch (FileNotFoundException e) { + if (fs.hasMetadataStore()) { + LOG.error("Error:", e); + ContractTestUtils.fail("S3Guard failed to handle fail-on-read", e); + } else { + LOG.info("File not found on read(), as expected."); + } + } + } + private void waitUntilDeleted(final Path p) throws Exception { LambdaTestUtils.eventually(30 * 1000, 1000, - new Callable() { - @Override - public Void call() throws Exception { - assertPathDoesNotExist("Dir should be deleted", p); - return null; - } - } - ); + () -> assertPathDoesNotExist("Dir should be deleted", p)); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java index 4a81374223..763819b2a4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java @@ -20,7 +20,6 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; -import com.amazonaws.services.s3.AmazonS3; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -33,6 +32,7 @@ import org.junit.Test; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -41,6 +41,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; import static org.apache.hadoop.fs.s3a.Constants.*; +import static 
org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*; import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*; /** @@ -552,11 +553,10 @@ public void testInconsistentS3ClientDeletes() throws Throwable { * @param key * @param delimiter * @return - * @throws IOException + * @throws IOException on error */ - private ListObjectsV2Result listObjectsV2(S3AFileSystem fs, - String key, String delimiter) throws java.io.IOException { + String key, String delimiter) throws IOException { ListObjectsV2Request k = fs.createListObjectsRequest(key, delimiter) .getV2(); return invoker.retryUntranslated("list", true, @@ -565,9 +565,4 @@ private ListObjectsV2Result listObjectsV2(S3AFileSystem fs, }); } - private static void clearInconsistency(S3AFileSystem fs) throws Exception { - AmazonS3 s3 = fs.getAmazonS3ClientForTesting("s3guard"); - InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3); - ic.clearInconsistency(); - } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index da0060ec01..4414746f96 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -51,7 +51,7 @@ import java.util.concurrent.Callable; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; -import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*; +import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*; import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions; @@ -819,7 +819,7 @@ public static long lsR(FileSystem fileSystem, Path path, boolean recursive) * Turn on the inconsistent S3A FS client in a configuration, * with 100% probability of inconsistency, default delays. 
* For this to go live, the paths must include the element - * {@link InconsistentAmazonS3Client#DEFAULT_DELAY_KEY_SUBSTRING}. + * {@link FailureInjectionPolicy#DEFAULT_DELAY_KEY_SUBSTRING}. * @param conf configuration to patch * @param delay delay in millis */ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java index 4730a902f7..b8610d64cd 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.FailureInjectionPolicy; import org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.WriteOperationHelper; @@ -90,7 +91,7 @@ public boolean useInconsistentClient() { @Override protected Path path(String filepath) throws IOException { return useInconsistentClient() ? - super.path(InconsistentAmazonS3Client.DEFAULT_DELAY_KEY_SUBSTRING + super.path(FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING + "/" + filepath) : super.path(filepath); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java new file mode 100644 index 0000000000..02a896653a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; + +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.scale.AbstractITestS3AMetadataStoreScale; + +import static org.apache.hadoop.fs.s3a.s3guard.MetadataStoreTestBase.basicFileStatus; +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.junit.Assume.*; + +/** + * Scale test for DynamoDBMetadataStore. 
+ */ +public class ITestDynamoDBMetadataStoreScale + extends AbstractITestS3AMetadataStoreScale { + + private static final long BATCH_SIZE = 25; + private static final long SMALL_IO_UNITS = BATCH_SIZE / 4; + + @Override + public MetadataStore createMetadataStore() throws IOException { + Configuration conf = getFileSystem().getConf(); + String ddbTable = conf.get(S3GUARD_DDB_TABLE_NAME_KEY); + assumeNotNull("DynamoDB table is configured", ddbTable); + String ddbEndpoint = conf.get(S3GUARD_DDB_REGION_KEY); + assumeNotNull("DynamoDB endpoint is configured", ddbEndpoint); + + DynamoDBMetadataStore ms = new DynamoDBMetadataStore(); + ms.initialize(getFileSystem().getConf()); + return ms; + } + + + /** + * Though the AWS SDK claims in documentation to handle retries and + * exponential backoff, we have witnessed + * com.amazonaws...dynamodbv2.model.ProvisionedThroughputExceededException + * (Status Code: 400; Error Code: ProvisionedThroughputExceededException) + * Hypothesis: + * Happens when the size of a batched write is bigger than the number of + * provisioned write units. This test ensures we handle the case + * correctly, retrying w/ smaller batch instead of surfacing exceptions. + */ + @Test + public void testBatchedWriteExceedsProvisioned() throws Exception { + + final long iterations = 5; + boolean isProvisionedChanged; + List toCleanup = new ArrayList<>(); + + // Fail if someone changes a constant we depend on + assertTrue("Maximum batch size must big enough to run this test", + S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT >= BATCH_SIZE); + + try (DynamoDBMetadataStore ddbms = + (DynamoDBMetadataStore)createMetadataStore()) { + + DynamoDB ddb = ddbms.getDynamoDB(); + String tableName = ddbms.getTable().getTableName(); + final ProvisionedThroughputDescription existing = + ddb.getTable(tableName).describe().getProvisionedThroughput(); + + // If you set the same provisioned I/O as already set it throws an + // exception, avoid that. 
+ isProvisionedChanged = (existing.getReadCapacityUnits() != SMALL_IO_UNITS + || existing.getWriteCapacityUnits() != SMALL_IO_UNITS); + + if (isProvisionedChanged) { + // Set low provisioned I/O for dynamodb + describe("Provisioning dynamo tbl %s read/write -> %d/%d", tableName, + SMALL_IO_UNITS, SMALL_IO_UNITS); + // Blocks to ensure table is back to ready state before we proceed + ddbms.provisionTableBlocking(SMALL_IO_UNITS, SMALL_IO_UNITS); + } else { + describe("Skipping provisioning table I/O, already %d/%d", + SMALL_IO_UNITS, SMALL_IO_UNITS); + } + + try { + // We know the dynamodb metadata store will expand a put of a path + // of depth N into a batch of N writes (all ancestors are written + // separately up to the root). (Ab)use this for an easy way to write + // a batch of stuff that is bigger than the provisioned write units + try { + describe("Running %d iterations of batched put, size %d", iterations, + BATCH_SIZE); + long pruneItems = 0; + for (long i = 0; i < iterations; i++) { + Path longPath = pathOfDepth(BATCH_SIZE, String.valueOf(i)); + FileStatus status = basicFileStatus(longPath, 0, false, 12345, + 12345); + PathMetadata pm = new PathMetadata(status); + + ddbms.put(pm); + toCleanup.add(pm); + pruneItems++; + // Having hard time reproducing Exceeded exception with put, also + // try occasional prune, which was the only stack trace I've seen + // (on JIRA) + if (pruneItems == BATCH_SIZE) { + describe("pruning files"); + ddbms.prune(Long.MAX_VALUE /* all files */); + pruneItems = 0; + } + } + } finally { + describe("Cleaning up table %s", tableName); + for (PathMetadata pm : toCleanup) { + cleanupMetadata(ddbms, pm); + } + } + } finally { + if (isProvisionedChanged) { + long write = existing.getWriteCapacityUnits(); + long read = existing.getReadCapacityUnits(); + describe("Restoring dynamo tbl %s read/write -> %d/%d", tableName, + read, write); + ddbms.provisionTableBlocking(existing.getReadCapacityUnits(), + existing.getWriteCapacityUnits()); + } 
+ } + } + } + + // Attempt do delete metadata, suppressing any errors + private void cleanupMetadata(MetadataStore ms, PathMetadata pm) { + try { + ms.forgetMetadata(pm.getFileStatus().getPath()); + } catch (IOException ioe) { + // Ignore. + } + } + + private Path pathOfDepth(long n, @Nullable String fileSuffix) { + StringBuilder sb = new StringBuilder(); + for (long i = 0; i < n; i++) { + sb.append(i == 0 ? "/" + this.getClass().getSimpleName() : "lvl"); + sb.append(i); + if (i == n-1 && fileSuffix != null) { + sb.append(fileSuffix); + } + sb.append("/"); + } + return new Path(getFileSystem().getUri().toString(), sb.toString()); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java index c19ae9184e..e463ce4706 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java @@ -839,7 +839,7 @@ FileStatus basicFileStatus(Path path, int size, boolean isDir) throws return basicFileStatus(path, size, isDir, modTime, accessTime); } - FileStatus basicFileStatus(Path path, int size, boolean isDir, + public static FileStatus basicFileStatus(Path path, int size, boolean isDir, long newModTime, long newAccessTime) throws IOException { return new FileStatus(size, isDir, REPLICATION, BLOCK_SIZE, newModTime, newAccessTime, PERMISSION, OWNER, GROUP, path); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestDynamoDBMetadataStoreScale.java deleted file mode 100644 index 3de19350fa..0000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestDynamoDBMetadataStoreScale.java +++ /dev/null @@ -1,48 +0,0 
@@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.scale; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore; -import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; - -import java.io.IOException; - -import static org.junit.Assume.*; -import static org.apache.hadoop.fs.s3a.Constants.*; - -/** - * Scale test for DynamoDBMetadataStore. - */ -public class ITestDynamoDBMetadataStoreScale - extends AbstractITestS3AMetadataStoreScale { - - @Override - public MetadataStore createMetadataStore() throws IOException { - Configuration conf = getFileSystem().getConf(); - String ddbTable = conf.get(S3GUARD_DDB_TABLE_NAME_KEY); - assumeNotNull("DynamoDB table is configured", ddbTable); - String ddbEndpoint = conf.get(S3GUARD_DDB_REGION_KEY); - assumeNotNull("DynamoDB endpoint is configured", ddbEndpoint); - - DynamoDBMetadataStore ms = new DynamoDBMetadataStore(); - ms.initialize(getFileSystem().getConf()); - return ms; - } -}