diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index df3c9315ba..43a2b7e0db 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -614,7 +614,10 @@ private long putObject() throws IOException {
try {
// the putObject call automatically closes the input
// stream afterwards.
- return writeOperationHelper.putObject(putObjectRequest, builder.putOptions);
+ return writeOperationHelper.putObject(
+ putObjectRequest,
+ builder.putOptions,
+ statistics);
} finally {
cleanupWithLogger(LOG, uploadData, block);
}
@@ -897,7 +900,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
try {
LOG.debug("Uploading part {} for id '{}'",
currentPartNumber, uploadId);
- PartETag partETag = writeOperationHelper.uploadPart(request)
+ PartETag partETag = writeOperationHelper.uploadPart(request, statistics)
.getPartETag();
LOG.debug("Completed upload of {} to part {}",
block, partETag.getETag());
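Passing the stream's `statistics` down as the `DurationTrackerFactory` means each part PUT is now timed and counted against the output stream's own IOStatistics rather than only the filesystem's. A minimal sketch of reading this back after a write — `out` is an assumed S3A output stream; the statistic symbol is the one used in this patch:

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;

import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT;

final class StreamStatsSketch {
  // Sketch: after close(), the stream's statistics include the per-part
  // PUT count, because uploadPart() was handed the stream's tracker factory.
  static void inspect(FSDataOutputStream out) {
    IOStatistics stats = IOStatisticsSupport.retrieveIOStatistics(out);
    Long parts = stats.counters().get(MULTIPART_UPLOAD_PART_PUT.getSymbol());
    System.out.println("part PUTs issued by this stream: " + parts);
  }
}
```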
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index cf82627af0..155a91a8af 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -2600,6 +2600,21 @@ protected DurationTrackerFactory getDurationTrackerFactory() {
: null;
}
+ /**
+ * Given a possibly null duration tracker factory, return a non-null
+ * one for use in tracking durations: either the factory supplied
+ * or the FS's own tracker factory.
+ *
+ * @param factory factory.
+ * @return a non-null factory.
+ */
+ protected DurationTrackerFactory nonNullDurationTrackerFactory(
+ DurationTrackerFactory factory) {
+ return factory != null
+ ? factory
+ : getDurationTrackerFactory();
+ }
+
/**
* Request object metadata; increments counters in the process.
* Retry policy: retry untranslated.
@@ -2958,20 +2973,22 @@ public UploadInfo putObject(PutObjectRequest putObjectRequest) {
* Important: this call will close any input stream in the request.
* @param putObjectRequest the request
* @param putOptions put object options
+ * @param durationTrackerFactory factory for duration tracking
* @return the upload initiated
* @throws AmazonClientException on problems
*/
@VisibleForTesting
@Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed")
PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest,
- PutObjectOptions putOptions)
+ PutObjectOptions putOptions,
+ DurationTrackerFactory durationTrackerFactory)
throws AmazonClientException {
long len = getPutRequestLength(putObjectRequest);
LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey());
incrementPutStartStatistics(len);
try {
PutObjectResult result = trackDurationOfSupplier(
- getDurationTrackerFactory(),
+ nonNullDurationTrackerFactory(durationTrackerFactory),
OBJECT_PUT_REQUESTS.getSymbol(), () ->
s3.putObject(putObjectRequest));
incrementPutCompletedStatistics(true, len);
@@ -3010,16 +3027,21 @@ private long getPutRequestLength(PutObjectRequest putObjectRequest) {
*
* Retry Policy: none.
* @param request request
+ * @param durationTrackerFactory duration tracker factory for operation
* @return the result of the operation.
* @throws AmazonClientException on problems
*/
@Retries.OnceRaw
- UploadPartResult uploadPart(UploadPartRequest request)
+ UploadPartResult uploadPart(UploadPartRequest request,
+ final DurationTrackerFactory durationTrackerFactory)
throws AmazonClientException {
long len = request.getPartSize();
incrementPutStartStatistics(len);
try {
- UploadPartResult uploadPartResult = s3.uploadPart(request);
+ UploadPartResult uploadPartResult = trackDurationOfSupplier(
+ nonNullDurationTrackerFactory(durationTrackerFactory),
+ MULTIPART_UPLOAD_PART_PUT.getSymbol(), () ->
+ s3.uploadPart(request));
incrementPutCompletedStatistics(true, len);
return uploadPartResult;
} catch (AmazonClientException e) {
@@ -4432,8 +4454,9 @@ private void createEmptyObject(final String objectName, PutObjectOptions putOpti
throws IOException {
invoker.retry("PUT 0-byte object ", objectName,
true, () ->
- putObjectDirect(getRequestFactory()
- .newDirectoryMarkerRequest(objectName), putOptions));
+ putObjectDirect(getRequestFactory().newDirectoryMarkerRequest(objectName),
+ putOptions,
+ getDurationTrackerFactory()));
incrementPutProgressStatistics(objectName, 0);
instrumentation.directoryCreated();
}
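The new `nonNullDurationTrackerFactory()` helper keeps the old behaviour for callers that pass `null`: the FS-level factory does the tracking, exactly as before. A hedged sketch of the pattern using the `trackDurationOfSupplier` helper this class already calls; the class and method names here are illustrative:

```java
import java.util.function.Supplier;

import org.apache.hadoop.fs.statistics.DurationTrackerFactory;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;

final class TrackerFallbackSketch {
  private final DurationTrackerFactory fsFactory;

  TrackerFallbackSketch(DurationTrackerFactory fsFactory) {
    this.fsFactory = fsFactory;
  }

  // Sketch: time an operation against the caller's factory when supplied,
  // otherwise fall back to the filesystem's own factory.
  <T> T timed(DurationTrackerFactory callerFactory,
      String statistic,
      Supplier<T> operation) {
    DurationTrackerFactory factory =
        callerFactory != null ? callerFactory : fsFactory;
    return trackDurationOfSupplier(factory, statistic, operation);
  }
}
```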
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 5f753757d8..e0770c6094 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -1441,9 +1441,7 @@ private void mergeOutputStreamStatistics(
final IOStatisticsStore sourceIOStatistics = source.getIOStatistics();
this.getIOStatistics().aggregate(sourceIOStatistics);
- // propagate any extra values into the FS-level stats.
- incrementMutableCounter(OBJECT_PUT_REQUESTS.getSymbol(),
- sourceIOStatistics.counters().get(OBJECT_PUT_REQUESTS.getSymbol()));
+ // propagate any extra values into the FS-level stats.
incrementMutableCounter(
COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
sourceIOStatistics.counters().get(COMMITTER_MAGIC_MARKER_PUT.getSymbol()));
@@ -1507,6 +1505,7 @@ private OutputStreamStatistics(
COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
INVOCATION_ABORT.getSymbol(),
MULTIPART_UPLOAD_COMPLETED.getSymbol(),
+ MULTIPART_UPLOAD_PART_PUT.getSymbol(),
OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(),
OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
OBJECT_PUT_REQUESTS.getSymbol())
@@ -1773,7 +1772,8 @@ private CommitterStatisticsImpl() {
COMMITTER_COMMIT_JOB.getSymbol(),
COMMITTER_LOAD_SINGLE_PENDING_FILE.getSymbol(),
COMMITTER_MATERIALIZE_FILE.getSymbol(),
- COMMITTER_STAGE_FILE_UPLOAD.getSymbol())
+ COMMITTER_STAGE_FILE_UPLOAD.getSymbol(),
+ OBJECT_PUT_REQUESTS.getSymbol())
.build();
setIOStatistics(st);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
index 30427f7672..528a99f5e0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -31,17 +31,18 @@
import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonClientException;
-import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.util.Preconditions;
import static org.apache.hadoop.io.retry.RetryPolicies.*;
@@ -228,6 +229,9 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
policyMap.put(AWSS3IOException.class, retryIdempotentCalls);
policyMap.put(SocketTimeoutException.class, retryIdempotentCalls);
+ // Unsupported requests do not work, however many times you try
+ policyMap.put(UnsupportedRequestException.class, fail);
+
return policyMap;
}
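Mapping `UnsupportedRequestException` to `fail` means the S3A invoker gives up immediately on a request that can never succeed, rather than burning through retries; the test added later in this patch exercises the same path. A sketch of the decision, assuming a default configuration:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
import org.apache.hadoop.io.retry.RetryPolicy;

final class RetryDecisionSketch {
  // Sketch: the retry decision for an UnsupportedRequestException is FAIL,
  // so no retry attempts (or retry sleeps) take place.
  static void demo() throws Exception {
    RetryPolicy policy = new S3ARetryPolicy(new Configuration(false));
    RetryPolicy.RetryAction action = policy.shouldRetry(
        new UnsupportedRequestException("/path", "multipart upload disabled"),
        0, 0, true);
    assert action.action == RetryPolicy.RetryAction.RetryDecision.FAIL;
  }
}
```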
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 8159244dc3..dc4ee8a949 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -44,6 +44,8 @@
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
+import org.apache.hadoop.fs.s3a.audit.AuditIntegration;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider;
import org.apache.hadoop.fs.s3a.impl.NetworkBinding;
@@ -201,10 +203,14 @@ public static IOException translateException(@Nullable String operation,
// call considered a sign of connectivity failure
return (EOFException)new EOFException(message).initCause(exception);
}
+ // if the exception came from the auditor, hand off translation
+ // to it.
+ if (exception instanceof AuditFailureException) {
+ return AuditIntegration.translateAuditException(path, (AuditFailureException) exception);
+ }
if (exception instanceof CredentialInitializationException) {
// the exception raised by AWSCredentialProvider list if the
// credentials were not accepted.
- // or auditing blocked the operation.
return (AccessDeniedException)new AccessDeniedException(path, null,
exception.toString()).initCause(exception);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 7e0875d8bb..0bbb8a35f5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.s3a.select.SelectBinding;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.util.DurationInfo;
@@ -564,36 +565,19 @@ public String toString() {
* file, from the content length of the header.
* @param putObjectRequest the request
* @param putOptions put object options
+ * @param durationTrackerFactory factory for duration tracking
* @return the upload initiated
* @throws IOException on problems
*/
@Retries.RetryTranslated
public PutObjectResult putObject(PutObjectRequest putObjectRequest,
- PutObjectOptions putOptions)
+ PutObjectOptions putOptions,
+ DurationTrackerFactory durationTrackerFactory)
throws IOException {
return retry("Writing Object",
putObjectRequest.getKey(), true,
withinAuditSpan(getAuditSpan(), () ->
- owner.putObjectDirect(putObjectRequest, putOptions)));
- }
-
- /**
- * PUT an object.
- *
- * @param putObjectRequest the request
- * @param putOptions put object options
- *
- * @throws IOException on problems
- */
- @Retries.RetryTranslated
- public void uploadObject(PutObjectRequest putObjectRequest,
- PutObjectOptions putOptions)
- throws IOException {
-
- retry("Writing Object",
- putObjectRequest.getKey(), true,
- withinAuditSpan(getAuditSpan(), () ->
- owner.putObjectDirect(putObjectRequest, putOptions)));
+ owner.putObjectDirect(putObjectRequest, putOptions, durationTrackerFactory)));
}
/**
@@ -650,18 +634,20 @@ public CompleteMultipartUploadResult commitUpload(
/**
* Upload part of a multi-partition file.
* @param request request
+ * @param durationTrackerFactory duration tracker factory for operation
* @return the result of the operation.
* @throws IOException on problems
*/
@Retries.RetryTranslated
- public UploadPartResult uploadPart(UploadPartRequest request)
+ public UploadPartResult uploadPart(UploadPartRequest request,
+ final DurationTrackerFactory durationTrackerFactory)
throws IOException {
return retry("upload part #" + request.getPartNumber()
+ " upload ID " + request.getUploadId(),
request.getKey(),
true,
withinAuditSpan(getAuditSpan(),
- () -> owner.uploadPart(request)));
+ () -> owner.uploadPart(request, durationTrackerFactory)));
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
index 9063423122..1c3d368857 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
@@ -244,25 +245,14 @@ UploadPartRequest newUploadPartRequest(
* file, from the content length of the header.
* @param putObjectRequest the request
* @param putOptions put object options
+ * @param durationTrackerFactory factory for duration tracking
* @return the upload initiated
* @throws IOException on problems
*/
@Retries.RetryTranslated
PutObjectResult putObject(PutObjectRequest putObjectRequest,
- PutObjectOptions putOptions)
- throws IOException;
-
- /**
- * PUT an object via the transfer manager.
- *
- * @param putObjectRequest the request
- * @param putOptions put object options
- *
- * @throws IOException on problems
- */
- @Retries.RetryTranslated
- void uploadObject(PutObjectRequest putObjectRequest,
- PutObjectOptions putOptions)
+ PutObjectOptions putOptions,
+ DurationTrackerFactory durationTrackerFactory)
throws IOException;
/**
@@ -299,11 +289,13 @@ CompleteMultipartUploadResult commitUpload(
/**
* Upload part of a multi-partition file.
* @param request request
+ * @param durationTrackerFactory factory for duration tracking
* @return the result of the operation.
* @throws IOException on problems
*/
@Retries.RetryTranslated
- UploadPartResult uploadPart(UploadPartRequest request)
+ UploadPartResult uploadPart(UploadPartRequest request,
+ DurationTrackerFactory durationTrackerFactory)
throws IOException;
/**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/UnsupportedRequestException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/UnsupportedRequestException.java
new file mode 100644
index 0000000000..36b7148f00
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/UnsupportedRequestException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.api;
+
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * An operation is unsupported.
+ */
+public class UnsupportedRequestException extends PathIOException {
+
+ public UnsupportedRequestException(final String path, final Throwable cause) {
+ super(path, cause);
+ }
+
+ public UnsupportedRequestException(final String path, final String error) {
+ super(path, error);
+ }
+
+ public UnsupportedRequestException(final String path,
+ final String error,
+ final Throwable cause) {
+ super(path, error, cause);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
index f2963d7319..b4be341c91 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
@@ -217,6 +217,20 @@ private RequestInfo writing(final String verb,
|| request instanceof GetBucketLocationRequest;
}
+ /**
+ * Predicate which returns true if the request is part of the
+ * multipart upload API, and therefore must be rejected
+ * if multipart upload is disabled.
+ * @param request request
+ * @return true if the request is part of the multipart upload API.
+ */
+ public static boolean isRequestMultipartIO(final Object request) {
+ return request instanceof CopyPartRequest
+ || request instanceof CompleteMultipartUploadRequest
+ || request instanceof InitiateMultipartUploadRequest
+ || request instanceof UploadPartRequest;
+ }
+
/**
* Info about a request.
*/
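A short sketch of the new predicate against SDK request types this module already uses; the request construction here is illustrative only:

```java
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;

import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO;

final class MultipartPredicateSketch {
  // Sketch: plain object requests pass; multipart API requests are flagged
  // so the auditor can reject them when multipart upload is disabled.
  static void demo() {
    boolean head = isRequestMultipartIO(
        new GetObjectMetadataRequest("bucket", "key"));           // false
    boolean part = isRequestMultipartIO(new UploadPartRequest()); // true
    System.out.println("head=" + head + ", part=" + part);
  }
}
```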
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditIntegration.java
index bc964b430e..c66f45eb30 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditIntegration.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditIntegration.java
@@ -21,12 +21,14 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.nio.file.AccessDeniedException;
import com.amazonaws.HandlerContextAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
import org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A;
import org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor;
import org.apache.hadoop.fs.s3a.audit.impl.NoopAuditManagerS3A;
@@ -142,4 +144,20 @@ public static void attachSpanToRequest(
request.addHandlerContext(AUDIT_SPAN_HANDLER_CONTEXT, span);
}
+ /**
+ * Translate an audit exception.
+ * @param path path of operation.
+ * @param exception exception
+ * @return the IOE to raise.
+ */
+ public static IOException translateAuditException(String path,
+ AuditFailureException exception) {
+ if (exception instanceof AuditOperationRejectedException) {
+ // special handling of this subclass
+ return new UnsupportedRequestException(path,
+ exception.getMessage(), exception);
+ }
+ return (AccessDeniedException)new AccessDeniedException(path, null,
+ exception.toString()).initCause(exception);
+ }
}
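The handoff gives the auditor's rejection subclass a distinct, fail-fast mapping while every other audit failure keeps the access-denied translation. A sketch of the two outcomes (the exception messages are illustrative):

```java
import java.io.IOException;

import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
import org.apache.hadoop.fs.s3a.audit.AuditIntegration;
import org.apache.hadoop.fs.s3a.audit.AuditOperationRejectedException;

final class AuditTranslationSketch {
  static void demo() {
    // -> UnsupportedRequestException: S3ARetryPolicy fails fast on this.
    IOException rejected = AuditIntegration.translateAuditException(
        "s3a://bucket/path",
        new AuditOperationRejectedException("multipart upload disabled"));

    // -> AccessDeniedException: the pre-existing behaviour for audit failures.
    IOException denied = AuditIntegration.translateAuditException(
        "s3a://bucket/path",
        new AuditFailureException("operation outside audit span"));
    System.out.println(rejected + "\n" + denied);
  }
}
```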
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditOperationRejectedException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditOperationRejectedException.java
new file mode 100644
index 0000000000..f93313b261
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditOperationRejectedException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+/**
+ * The auditor has rejected the operation as forbidden/unavailable.
+ */
+public class AuditOperationRejectedException extends AuditFailureException {
+
+ public AuditOperationRejectedException(final String message, final Throwable t) {
+ super(message, t);
+ }
+
+ public AuditOperationRejectedException(final String message) {
+ super(message);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java
index feb926a0bf..6d6bd93543 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.fs.audit.CommonAuditContext;
import org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer;
import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
+import org.apache.hadoop.fs.s3a.audit.AuditOperationRejectedException;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
@@ -46,6 +47,9 @@
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_TIMESTAMP;
import static org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext;
import static org.apache.hadoop.fs.audit.CommonAuditContext.currentThreadID;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MULTIPART_UPLOAD_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED;
@@ -112,6 +116,12 @@ public class LoggingAuditor
*/
private Collection<String> filters;
+ /**
+ * Does the S3A FS instance being audited have multipart upload enabled?
+ * If not: fail if a multipart upload is initiated.
+ */
+ private boolean isMultipartUploadEnabled;
+
/**
* Log for warning of problems getting the range of GetObjectRequest
* will only log of a problem once per process instance.
@@ -164,6 +174,8 @@ protected void serviceInit(final Configuration conf) throws Exception {
final CommonAuditContext currentContext = currentAuditContext();
warningSpan = new WarningSpan(OUTSIDE_SPAN,
currentContext, createSpanID(), null, null);
+ isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
+ DEFAULT_MULTIPART_UPLOAD_ENABLED);
}
@Override
@@ -173,6 +185,7 @@ public String toString() {
sb.append("ID='").append(getAuditorId()).append('\'');
sb.append(", headerEnabled=").append(headerEnabled);
sb.append(", rejectOutOfSpan=").append(rejectOutOfSpan);
+ sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled);
sb.append('}');
return sb.toString();
}
@@ -363,6 +376,12 @@ public T beforeExecution(
analyzer.analyze(request),
header);
}
+ // now see if the request is actually a blocked multipart request
+ if (!isMultipartUploadEnabled && isRequestMultipartIO(request)) {
+ throw new AuditOperationRejectedException("Multipart IO request "
+ + request + " rejected " + header);
+ }
+
return request;
}
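With this check in `beforeExecution()`, the rejection happens client-side in the request handler chain, before any bytes go to S3; after translation it surfaces as an `UnsupportedRequestException`. A hedged sketch of a configuration that triggers it, using the key defined by `MULTIPART_UPLOADS_ENABLED`:

```java
import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;

final class DisableMultipartSketch {
  // Sketch: an S3A configuration with multipart uploads disabled.
  // Any InitiateMultipartUpload/UploadPart/CopyPart/CompleteMultipartUpload
  // request issued under this FS will now be rejected by the auditor.
  static Configuration disabled() {
    Configuration conf = new Configuration();
    conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
    return conf;
  }
}
```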
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
index eb23f299a0..ef56d82978 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
@@ -583,7 +583,7 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
localFile,
offset);
part.setLastPart(partNumber == numParts);
- UploadPartResult partResult = writeOperations.uploadPart(part);
+ UploadPartResult partResult = writeOperations.uploadPart(part, statistics);
offset += uploadPartSize;
parts.add(partResult.getPartETag());
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
index c85571a194..1a5451df80 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
@@ -185,7 +185,7 @@ public boolean aboutToComplete(String uploadId,
private void upload(PutObjectRequest request) throws IOException {
trackDurationOfInvocation(trackerStatistics,
COMMITTER_MAGIC_MARKER_PUT.getSymbol(), () ->
- writer.uploadObject(request, PutObjectOptions.keepingDirs()));
+ writer.putObject(request, PutObjectOptions.keepingDirs(), null));
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java
index 3a6a04e5e7..4ab5bc6a99 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java
@@ -57,7 +57,10 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.util.Preconditions;
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_COMPLETED;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_INITIATED;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfCallable;
/**
* MultipartUploader for S3AFileSystem. This uses the S3 multipart
@@ -122,13 +125,13 @@ public CompletableFuture<UploadHandle> startUpload(
checkPath(dest);
String key = context.pathToKey(dest);
return context.submit(new CompletableFuture<>(),
- () -> {
+ trackDurationOfCallable(statistics, OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(), () -> {
String uploadId = writeOperations.initiateMultiPartUpload(key,
PutObjectOptions.keepingDirs());
statistics.uploadStarted();
return BBUploadHandle.from(ByteBuffer.wrap(
uploadId.getBytes(Charsets.UTF_8)));
- });
+ }));
}
@Override
@@ -152,7 +155,7 @@ public CompletableFuture<PartHandle> putPart(
UploadPartRequest request = writeOperations.newUploadPartRequest(key,
uploadIdString, partNumber, (int) lengthInBytes, inputStream,
null, 0L);
- UploadPartResult result = writeOperations.uploadPart(request);
+ UploadPartResult result = writeOperations.uploadPart(request, statistics);
statistics.partPut(lengthInBytes);
String eTag = result.getETag();
return BBPartHandle.from(
@@ -206,7 +209,7 @@ public CompletableFuture<PathHandle> complete(
// retrieve/create operation state for scalability of completion.
long finalLen = totalLength;
return context.submit(new CompletableFuture<>(),
- () -> {
+ trackDurationOfCallable(statistics, MULTIPART_UPLOAD_COMPLETED.getSymbol(), () -> {
CompleteMultipartUploadResult result =
writeOperations.commitUpload(
key,
@@ -218,7 +221,7 @@ public CompletableFuture complete(
byte[] eTag = result.getETag().getBytes(Charsets.UTF_8);
statistics.uploadCompleted();
return (PathHandle) () -> ByteBuffer.wrap(eTag);
- });
+ }));
}
@Override
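`trackDurationOfCallable` wraps rather than invokes: the statistic is updated when the executor eventually runs the callable, not when the future is submitted, so the measured duration covers the actual S3 call. A minimal sketch of that behaviour against a standalone statistics store; the statistic name `op` is made up:

```java
import java.util.concurrent.Callable;

import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfCallable;

final class DeferredTrackingSketch {
  static void demo() throws Exception {
    IOStatisticsStore store = iostatisticsStore()
        .withDurationTracking("op")   // hypothetical statistic name
        .build();
    Callable<String> wrapped = trackDurationOfCallable(store, "op",
        () -> "done");
    // nothing has been recorded yet; timing happens inside call()
    wrapped.call();
    // store now holds counter op == 1 plus op.min/.mean/.max durations
  }
}
```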
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/S3AMultipartUploaderStatisticsImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/S3AMultipartUploaderStatisticsImpl.java
index 7b6d559cf2..b35e4421d7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/S3AMultipartUploaderStatisticsImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/S3AMultipartUploaderStatisticsImpl.java
@@ -34,6 +34,7 @@
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT_BYTES;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_STARTED;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_INITIATED;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
/**
@@ -73,8 +74,11 @@ public S3AMultipartUploaderStatisticsImpl(
MULTIPART_UPLOAD_PART_PUT_BYTES.getSymbol(),
MULTIPART_UPLOAD_ABORTED.getSymbol(),
MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED.getSymbol(),
- MULTIPART_UPLOAD_COMPLETED.getSymbol(),
MULTIPART_UPLOAD_STARTED.getSymbol())
+ .withDurationTracking(
+ MULTIPART_UPLOAD_COMPLETED.getSymbol(),
+ OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
+ MULTIPART_UPLOAD_PART_PUT.getSymbol())
.build();
setIOStatistics(st);
}
@@ -96,13 +100,12 @@ public void uploadStarted() {
@Override
public void partPut(final long lengthInBytes) {
- inc(MULTIPART_UPLOAD_PART_PUT, 1);
inc(MULTIPART_UPLOAD_PART_PUT_BYTES, lengthInBytes);
}
@Override
public void uploadCompleted() {
- inc(MULTIPART_UPLOAD_COMPLETED, 1);
+ // duration tracking updates the statistics
}
@Override
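The deleted `inc()` calls follow from the store change just above: a statistic registered with `withDurationTracking` increments its own counter on every tracked invocation, so keeping the manual increments would double count. A sketch of verifying this with the assertion helpers used elsewhere in the patch; `stats` is an assumed `S3AMultipartUploaderStatisticsImpl` after one tracked completion:

```java
import org.apache.hadoop.fs.s3a.statistics.impl.S3AMultipartUploaderStatisticsImpl;

import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_COMPLETED;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;

final class NoDoubleCountSketch {
  // Sketch: one tracked completion leaves the counter at exactly 1;
  // an explicit inc(MULTIPART_UPLOAD_COMPLETED, 1) would have made it 2.
  static void check(S3AMultipartUploaderStatisticsImpl stats) {
    verifyStatisticCounterValue(stats.getIOStatistics(),
        MULTIPART_UPLOAD_COMPLETED.getSymbol(), 1);
  }
}
```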
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
index 2d29282ad0..1a944ec299 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
@@ -114,7 +114,7 @@ public void testPutObjectDirect() throws Throwable {
new ByteArrayInputStream("PUT".getBytes()),
metadata);
LambdaTestUtils.intercept(IllegalStateException.class,
- () -> fs.putObjectDirect(put, PutObjectOptions.keepingDirs()));
+ () -> fs.putObjectDirect(put, PutObjectOptions.keepingDirs(), null));
assertPathDoesNotExist("put object was created", path);
}
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java
index 9ea33cf69c..1ddff3c4cd 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java
@@ -82,7 +82,7 @@ public static IdKey createPartUpload(S3AFileSystem fs, String key, int len,
String uploadId = writeHelper.initiateMultiPartUpload(key, PutObjectOptions.keepingDirs());
UploadPartRequest req = writeHelper.newUploadPartRequest(key, uploadId,
partNo, len, in, null, 0L);
- PartETag partEtag = writeHelper.uploadPart(req).getPartETag();
+ PartETag partEtag = writeHelper.uploadPart(req, null).getPartETag();
LOG.debug("uploaded part etag {}, upid {}", partEtag.getETag(), uploadId);
return new IdKey(key, uploadId);
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditIntegration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditIntegration.java
index bd552b91aa..7cdab4c4b7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditIntegration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditIntegration.java
@@ -25,13 +25,17 @@
import com.amazonaws.DefaultRequest;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
import org.apache.hadoop.fs.s3a.audit.impl.NoopAuditor;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.AbstractHadoopTestBase;
@@ -67,6 +71,22 @@ public void testExceptionTranslation() throws Throwable {
});
}
+ /**
+ * Verify the UnsupportedRequestException mapping and the fail-fast retry outcome.
+ */
+ @Test
+ public void testUnsupportedExceptionTranslation() throws Throwable {
+ final UnsupportedRequestException ex = intercept(UnsupportedRequestException.class, () -> {
+ throw translateException("test", "/",
+ new AuditOperationRejectedException("not supported"));
+ });
+ final S3ARetryPolicy retryPolicy = new S3ARetryPolicy(new Configuration(false));
+ final RetryPolicy.RetryAction action = retryPolicy.shouldRetry(ex, 0, 0, true);
+ Assertions.assertThat(action.action)
+ .describedAs("retry policy %s for %s", action, ex)
+ .isEqualTo(RetryPolicy.RetryAction.RetryDecision.FAIL);
+ }
+
/**
* Create a no-op auditor.
*/
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
index aeea195aac..e1440a497b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
@@ -42,6 +43,7 @@
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.extractXAttrLongValue;
/**
@@ -67,6 +69,8 @@ public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles {
/** The file with the JSON data about the commit. */
private Path pendingDataFile;
+ private Path finalDirectory;
+
/**
* Use fast upload on disk.
* @return the upload buffer mechanism.
@@ -84,13 +88,18 @@ public String getTestSuiteName() {
return "ITestS3AHugeMagicCommits";
}
+ @Override
+ protected boolean expectImmediateFileVisibility() {
+ return false;
+ }
+
@Override
public void setup() throws Exception {
super.setup();
CommitUtils.verifyIsMagicCommitFS(getFileSystem());
// set up the paths for the commit operation
- Path finalDirectory = new Path(getScaleTestDir(), "commit");
+ finalDirectory = new Path(getScaleTestDir(), "commit");
magicDir = new Path(finalDirectory, MAGIC);
jobDir = new Path(magicDir, "job_001");
String filename = "commit.bin";
@@ -120,6 +129,15 @@ public void test_030_postCreationAssertions() throws Throwable {
FileStatus status = fs.getFileStatus(magicOutputFile);
assertEquals("Non empty marker file " + status,
0, status.getLen());
+ final Map<String, byte[]> xAttr = fs.getXAttrs(magicOutputFile);
+ final String header = XA_MAGIC_MARKER;
+ Assertions.assertThat(xAttr)
+ .describedAs("Header %s of %s", header, magicOutputFile)
+ .containsKey(header);
+ Assertions.assertThat(extractXAttrLongValue(xAttr.get(header)))
+ .describedAs("Decoded header %s of %s", header, magicOutputFile)
+ .get()
+ .isEqualTo(getFilesize());
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
CommitOperations operations = new CommitOperations(fs);
Path destDir = getHugefile().getParent();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index f8d47011de..c4949375b7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -22,7 +22,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntFunction;
@@ -51,17 +50,29 @@
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.Progressable;
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_COMPLETED;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
-import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS;
/**
* Scale test which creates a huge file.
@@ -76,9 +87,10 @@
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
+
private static final Logger LOG = LoggerFactory.getLogger(
AbstractSTestS3AHugeFiles.class);
- public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB;
+ public static final int DEFAULT_UPLOAD_BLOCKSIZE = 128 * _1KB;
private Path scaleTestDir;
private Path hugefile;
@@ -94,6 +106,7 @@ public void setup() throws Exception {
scaleTestDir = new Path(getTestPath(), getTestSuiteName());
hugefile = new Path(scaleTestDir, "hugefile");
hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed");
+ uploadBlockSize = uploadBlockSize();
filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE,
DEFAULT_HUGE_FILESIZE);
}
@@ -117,12 +130,22 @@ protected Configuration createScaleConfiguration() {
partitionSize = (int) getTestPropertyBytes(conf,
KEY_HUGE_PARTITION_SIZE,
DEFAULT_HUGE_PARTITION_SIZE);
- assertTrue("Partition size too small: " + partitionSize,
- partitionSize >= MULTIPART_MIN_SIZE);
+ Assertions.assertThat(partitionSize)
+ .describedAs("Partition size set in " + KEY_HUGE_PARTITION_SIZE)
+ .isGreaterThanOrEqualTo(MULTIPART_MIN_SIZE);
+ removeBaseAndBucketOverrides(conf,
+ SOCKET_SEND_BUFFER,
+ SOCKET_RECV_BUFFER,
+ MIN_MULTIPART_THRESHOLD,
+ MULTIPART_SIZE,
+ USER_AGENT_PREFIX,
+ FAST_UPLOAD_BUFFER);
+
conf.setLong(SOCKET_SEND_BUFFER, _1MB);
conf.setLong(SOCKET_RECV_BUFFER, _1MB);
conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize);
conf.setInt(MULTIPART_SIZE, partitionSize);
+ conf.setInt(AWS_S3_VECTOR_ACTIVE_RANGE_READS, 32);
conf.set(USER_AGENT_PREFIX, "STestS3AHugeFileCreate");
conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
S3ATestUtils.disableFilesystemCaching(conf);
@@ -180,6 +203,7 @@ public void test_010_CreateHugeFile() throws IOException {
IOStatistics iostats = fs.getIOStatistics();
String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol();
+ String multipartBlockUploads = Statistic.MULTIPART_UPLOAD_PART_PUT.getSymbol();
String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol();
Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE;
Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING;
@@ -192,13 +216,8 @@ public void test_010_CreateHugeFile() throws IOException {
true,
uploadBlockSize,
progress)) {
- try {
- streamStatistics = getOutputStreamStatistics(out);
- } catch (ClassCastException e) {
- LOG.info("Wrapped output stream is not block stream: {}",
- out.getWrappedStream());
- streamStatistics = null;
- }
+ streamStatistics = requireNonNull(getOutputStreamStatistics(out),
+ () -> "No iostatistics in " + out);
for (long block = 1; block <= blocks; block++) {
out.write(data);
@@ -222,6 +241,13 @@ public void test_010_CreateHugeFile() throws IOException {
writtenMB / elapsedTime));
}
}
+ if (!expectMultipartUpload()) {
+ // it is required that no data has been uploaded at this point
+ // on a non-multipart upload
+ Assertions.assertThat(progress.getUploadEvents())
+ .describedAs("upload events in %s", progress)
+ .isEqualTo(0);
+ }
// now close the file
LOG.info("Closing stream {}", out);
LOG.info("Statistics : {}", streamStatistics);
@@ -235,34 +261,51 @@ public void test_010_CreateHugeFile() throws IOException {
filesizeMB, uploadBlockSize);
logFSState();
bandwidth(timer, filesize);
- LOG.info("Statistics after stream closed: {}", streamStatistics);
- LOG.info("IOStatistics after upload: {}",
- demandStringifyIOStatistics(iostats));
- long putRequestCount = lookupCounterStatistic(iostats, putRequests);
+ final IOStatistics streamIOstats = streamStatistics.getIOStatistics();
+ LOG.info("Stream IOStatistics after stream closed: {}",
+ ioStatisticsToPrettyString(streamIOstats));
+
+ LOG.info("FileSystem IOStatistics after upload: {}",
+ ioStatisticsToPrettyString(iostats));
+ final String requestKey;
long putByteCount = lookupCounterStatistic(iostats, putBytes);
- Assertions.assertThat(putRequestCount)
- .describedAs("Put request count from filesystem stats %s",
- iostats)
- .isGreaterThan(0);
+ long putRequestCount;
+
+ if (expectMultipartUpload()) {
+ requestKey = multipartBlockUploads;
+ putRequestCount = lookupCounterStatistic(streamIOstats, requestKey);
+ assertThatStatisticCounter(streamIOstats, multipartBlockUploads)
+ .isGreaterThanOrEqualTo(1);
+ verifyStatisticCounterValue(streamIOstats, STREAM_WRITE_BLOCK_UPLOADS, putRequestCount);
+ // non-magic uploads will have completed
+ verifyStatisticCounterValue(streamIOstats, MULTIPART_UPLOAD_COMPLETED.getSymbol(),
+ expectImmediateFileVisibility() ? 1 : 0);
+ } else {
+ // single put
+ requestKey = putRequests;
+ putRequestCount = lookupCounterStatistic(streamIOstats, requestKey);
+ verifyStatisticCounterValue(streamIOstats, putRequests, 1);
+ verifyStatisticCounterValue(streamIOstats, STREAM_WRITE_BLOCK_UPLOADS, 1);
+ verifyStatisticCounterValue(streamIOstats, MULTIPART_UPLOAD_COMPLETED.getSymbol(), 0);
+ }
Assertions.assertThat(putByteCount)
- .describedAs("%s count from filesystem stats %s",
- putBytes, iostats)
+ .describedAs("%s count from stream stats %s",
+ putBytes, streamStatistics)
.isGreaterThan(0);
+
LOG.info("PUT {} bytes in {} operations; {} MB/operation",
putByteCount, putRequestCount,
putByteCount / (putRequestCount * _1MB));
LOG.info("Time per PUT {} nS",
toHuman(timer.nanosPerOperation(putRequestCount)));
verifyStatisticGaugeValue(iostats, putRequestsActive.getSymbol(), 0);
- verifyStatisticGaugeValue(iostats,
- STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0);
+ verifyStatisticGaugeValue(iostats, STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0);
+
progress.verifyNoFailures(
"Put file " + fileToCreate + " of size " + filesize);
- if (streamStatistics != null) {
- assertEquals("actively allocated blocks in " + streamStatistics,
- 0, streamStatistics.getBlocksActivelyAllocated());
- }
+ assertEquals("actively allocated blocks in " + streamStatistics,
+ 0, streamStatistics.getBlocksActivelyAllocated());
}
/**
@@ -290,10 +333,45 @@ protected Path getHugefileRenamed() {
return hugefileRenamed;
}
- protected int getUploadBlockSize() {
+ public int getUploadBlockSize() {
return uploadBlockSize;
}
+ /**
+ * Get the desired upload block size for this test run.
+ * @return the block size
+ */
+ protected int uploadBlockSize() {
+ return DEFAULT_UPLOAD_BLOCKSIZE;
+ }
+
+ /**
+ * Get the size of the file.
+ * @return file size
+ */
+ public long getFilesize() {
+ return filesize;
+ }
+
+ /**
+ * Is this expected to be a multipart upload?
+ * Assertions will change if not.
+ * @return true by default.
+ */
+ protected boolean expectMultipartUpload() {
+ return true;
+ }
+
+ /**
+ * Is this expected to be a normal file creation with
+ * the output immediately visible?
+ * Assertions will change if not.
+ * @return true by default.
+ */
+ protected boolean expectImmediateFileVisibility() {
+ return true;
+ }
+
protected int getPartitionSize() {
return partitionSize;
}
@@ -304,6 +382,7 @@ protected int getPartitionSize() {
private final class ProgressCallback implements Progressable,
ProgressListener {
private AtomicLong bytesTransferred = new AtomicLong(0);
+ private AtomicLong uploadEvents = new AtomicLong(0);
private AtomicInteger failures = new AtomicInteger(0);
private final ContractTestUtils.NanoTimer timer;
@@ -339,10 +418,11 @@ public void progressChanged(ProgressEvent progressEvent) {
progressEvent,
writtenMB, elapsedTimeS, writtenMB / elapsedTimeS));
break;
+ case REQUEST_BYTE_TRANSFER_EVENT:
+ uploadEvents.incrementAndGet();
+ break;
default:
- if (eventType.isByteCountEvent()) {
- LOG.debug("Event {}", progressEvent);
- } else {
+ if (!eventType.isByteCountEvent()) {
LOG.info("Event {}", progressEvent);
}
break;
@@ -352,12 +432,29 @@ public void progressChanged(ProgressEvent progressEvent) {
@Override
public String toString() {
String sb = "ProgressCallback{"
- + "bytesTransferred=" + bytesTransferred +
- ", failures=" + failures +
+ + "bytesTransferred=" + bytesTransferred.get() +
+ ", uploadEvents=" + uploadEvents.get() +
+ ", failures=" + failures.get() +
'}';
return sb;
}
+ /**
+ * Get the number of bytes transferred.
+ * @return byte count
+ */
+ private long getBytesTransferred() {
+ return bytesTransferred.get();
+ }
+
+ /**
+ * Get the number of event callbacks.
+ * @return count of byte transferred events.
+ */
+ private long getUploadEvents() {
+ return uploadEvents.get();
+ }
+
private void verifyNoFailures(String operation) {
assertEquals("Failures in " + operation + ": " + this, 0, failures.get());
}
@@ -467,15 +564,42 @@ public void test_045_vectoredIOHugeFile() throws Throwable {
rangeList.add(FileRange.createFileRange(2820861, 156770));
IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
FileSystem fs = getFileSystem();
- CompletableFuture<FSDataInputStream> builder =
- fs.openFile(hugefile).build();
- try (FSDataInputStream in = builder.get()) {
- in.readVectored(rangeList, allocate);
- byte[] readFullRes = new byte[(int)filesize];
+
+ // read into a buffer first
+ // using sequential IO
+
+ int validateSize = (int) Math.min(filesize, 10 * _1MB);
+ byte[] readFullRes;
+ IOStatistics sequentialIOStats, vectorIOStats;
+ try (FSDataInputStream in = fs.openFile(hugefile)
+ .opt(FS_OPTION_OPENFILE_LENGTH, validateSize) // lets us actually force a shorter read
+ .opt(FS_OPTION_OPENFILE_SPLIT_START, 0)
+ .opt(FS_OPTION_OPENFILE_SPLIT_END, validateSize)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY, "sequential")
+ .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize)
+ .build().get();
+ DurationInfo ignored = new DurationInfo(LOG, "Sequential read of %,d bytes",
+ validateSize)) {
+ readFullRes = new byte[validateSize];
in.readFully(0, readFullRes);
+ sequentialIOStats = in.getIOStatistics();
+ }
+
+ // now do a vector IO read
+ try (FSDataInputStream in = fs.openFile(hugefile)
+ .opt(FS_OPTION_OPENFILE_LENGTH, filesize)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY, "vector, random")
+ .build().get();
+ DurationInfo ignored = new DurationInfo(LOG, "Vector Read")) {
+
+ in.readVectored(rangeList, allocate);
// Comparing vectored read results with read fully.
validateVectoredReadResult(rangeList, readFullRes);
+ vectorIOStats = in.getIOStatistics();
}
+
+ LOG.info("Bulk read IOStatistics={}", ioStatisticsToPrettyString(sequentialIOStats));
+ LOG.info("Vector IOStatistics={}", ioStatisticsToPrettyString(vectorIOStats));
}
/**
@@ -493,7 +617,12 @@ public void test_050_readHugeFile() throws Throwable {
byte[] data = new byte[uploadBlockSize];
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
- try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) {
+ try (FSDataInputStream in = fs.openFile(hugefile)
+ .withFileStatus(status)
+ .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+ .build().get();
+ DurationInfo ignored = new DurationInfo(LOG, "Whole file read")) {
for (long block = 0; block < blocks; block++) {
in.readFully(data);
}
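The reads now go through the `openFile()` builder so read policy, split range, file length and buffer size travel as hints; `.opt()` hints are advisory (unknown keys are ignored) where `.must()` would be mandatory, and supplying the length or a `FileStatus` lets S3A skip the HEAD probe on open. A hedged sketch of the whole-file pattern; `fs`, `path` and `knownLength` are assumptions:

```java
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;

final class OpenFileSketch {
  // Sketch: sequential whole-file read with hints; the length hint saves
  // a HEAD request, the read policy disables random-IO optimisations.
  static void read(FileSystem fs, Path path, long knownLength)
      throws Exception {
    try (FSDataInputStream in = fs.openFile(path)
        .opt(FS_OPTION_OPENFILE_READ_POLICY,
            FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
        .opt(FS_OPTION_OPENFILE_LENGTH, knownLength)
        .build().get()) {
      // ... sequential readFully() loop as in test_050 above
    }
  }
}
```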
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
index 91ea0c8e62..de903b3d75 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
@@ -260,7 +260,7 @@ public void testMultiPagesListingPerformanceAndCorrectness()
.newPutObjectRequest(fs.pathToKey(file), om,
null, new FailingInputStream());
futures.add(submit(executorService, () ->
- writeOperationHelper.putObject(put, PutObjectOptions.keepingDirs())));
+ writeOperationHelper.putObject(put, PutObjectOptions.keepingDirs(), null)));
}
LOG.info("Waiting for PUTs to complete");
waitForCompletion(futures);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java
deleted file mode 100644
index 08192969e2..0000000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.scale;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Test;
-
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.apache.hadoop.fs.s3a.Constants;
-
-import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
-import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
-import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK;
-import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
-import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
-import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
-import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS;
-import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
-
-/**
- * Test a file upload using a single PUT operation. Multipart uploads will
- * be disabled in the test.
- */
-public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase {
-
- public static final Logger LOG = LoggerFactory.getLogger(
- ITestS3AHugeFileUploadSinglePut.class);
-
- private long fileSize;
-
- @Override
- protected Configuration createScaleConfiguration() {
- Configuration conf = super.createScaleConfiguration();
- removeBucketOverrides(getTestBucketName(conf), conf,
- FAST_UPLOAD_BUFFER,
- IO_CHUNK_BUFFER_SIZE,
- KEY_HUGE_FILESIZE,
- MULTIPART_UPLOADS_ENABLED,
- MULTIPART_SIZE,
- REQUEST_TIMEOUT);
- conf.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false);
- fileSize = getTestPropertyBytes(conf, KEY_HUGE_FILESIZE,
- DEFAULT_HUGE_FILESIZE);
- // set a small part size to verify it does not impact block allocation size
- conf.setLong(MULTIPART_SIZE, 10_000);
- conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK);
- conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
- conf.set(REQUEST_TIMEOUT, "1h");
- return conf;
- }
-
- @Test
- public void uploadFileSinglePut() throws IOException {
- LOG.info("Creating file with size : {}", fileSize);
- S3AFileSystem fs = getFileSystem();
- ContractTestUtils.createAndVerifyFile(fs,
- methodPath(), fileSize);
- // Exactly three put requests should be made during the upload of the file
- // First one being the creation of the directory marker
- // Second being the creation of the test file
- // Third being the creation of directory marker on the file delete
- assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol())
- .isEqualTo(3);
- }
-}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
new file mode 100644
index 0000000000..ed300dba01
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Use a single PUT for the whole upload/rename/delete workflow; include verification
+ * that the transfer manager will fail fast unless the multipart threshold is huge.
+ */
+public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {
+
+ /**
+ * Size to ensure MPUs don't happen in transfer manager.
+ */
+ public static final String S_1T = "1T";
+
+ public static final String SINGLE_PUT_REQUEST_TIMEOUT = "1h";
+
+ /**
+ * Always use disk storage.
+ * @return disk block store always.
+ */
+ protected String getBlockOutputBufferName() {
+ return Constants.FAST_UPLOAD_BUFFER_DISK;
+ }
+
+ @Override
+ protected boolean expectMultipartUpload() {
+ return false;
+ }
+
+ /**
+ * Create a configuration without multipart upload,
+ * and a long request timeout to allow for a very slow
+ * PUT in close.
+ * @return the configuration to create the test FS with.
+ */
+ @Override
+ protected Configuration createScaleConfiguration() {
+ Configuration conf = super.createScaleConfiguration();
+ removeBaseAndBucketOverrides(conf,
+ IO_CHUNK_BUFFER_SIZE,
+ MIN_MULTIPART_THRESHOLD,
+ MULTIPART_UPLOADS_ENABLED,
+ MULTIPART_SIZE,
+ REQUEST_TIMEOUT);
+ conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
+ conf.set(MIN_MULTIPART_THRESHOLD, S_1T);
+ conf.set(MULTIPART_SIZE, S_1T);
+ conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
+ conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
+ return conf;
+ }
+
+ /**
+ * After the file is created, attempt a rename with an FS
+ * instance with a small multipart threshold;
+ * this MUST be rejected.
+ */
+ @Override
+ public void test_030_postCreationAssertions() throws Throwable {
+ assumeHugeFileExists();
+ final Path hugefile = getHugefile();
+ final Path hugefileRenamed = getHugefileRenamed();
+ describe("renaming %s to %s", hugefile, hugefileRenamed);
+ S3AFileSystem fs = getFileSystem();
+ fs.delete(hugefileRenamed, false);
+ // create a new fs with a small multipart threshold; expect rename failure.
+ final Configuration conf = new Configuration(fs.getConf());
+ conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+ conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+ S3ATestUtils.disableFilesystemCaching(conf);
+
+ try (FileSystem fs2 = FileSystem.get(fs.getUri(), conf)) {
+ intercept(UnsupportedRequestException.class, () ->
+ fs2.rename(hugefile, hugefileRenamed));
+ }
+ }
+}
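Taken together, the new test shows the end-to-end contract when multipart uploads are switched off: writes buffer the whole object and upload it in a single PUT on `close()`, while any operation that needs the multipart API (such as a rename whose copy exceeds the transfer manager's threshold) fails fast with `UnsupportedRequestException`. A hedged usage sketch:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;

final class SinglePutUsageSketch {
  // Sketch: disable multipart uploads; give the single PUT a long timeout,
  // as the whole object is uploaded in one request on close().
  static void demo() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
    conf.set(REQUEST_TIMEOUT, "1h");
    try (FileSystem fs = FileSystem.newInstance(
        new Path("s3a://bucket/").toUri(), conf)) {
      // create()/close() now performs exactly one object PUT per file
    }
  }
}
```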