diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 193f898fc7..80e07f0ac1 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -384,6 +384,13 @@ ${hadoop.version} + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + test-jar + + org.apache.hadoop hadoop-mapreduce-client-jobclient diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index a9f58e39c6..40aeec0702 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -219,6 +219,67 @@ test + + + org.apache.hadoop + hadoop-mapreduce-client-core + provided + + + + org.apache.hadoop + hadoop-mapreduce-client-core + test + test-jar + + + org.apache.hadoop + hadoop-yarn-server-tests + test + test-jar + + + org.apache.hadoop + hadoop-mapreduce-client-hs + test + + + org.apache.hadoop + hadoop-mapreduce-examples + test + jar + + + + org.apache.hadoop + hadoop-mapreduce-client-app + test + + + org.apache.hadoop + hadoop-mapreduce-client-app + test-jar + test + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + test + test-jar + org.apache.hadoop hadoop-distcp @@ -319,7 +380,7 @@ ${test.build.data}/${surefire.forkNumber} ${test.build.dir}/${surefire.forkNumber} ${hadoop.tmp.dir}/${surefire.forkNumber} - fork-${surefire.forkNumber} + fork-000${surefire.forkNumber} ${fs.azure.scale.test.enabled} ${fs.azure.scale.test.huge.filesize} ${fs.azure.scale.test.huge.partitionsize} @@ -350,7 +411,7 @@ ${test.build.data}/${surefire.forkNumber} ${test.build.dir}/${surefire.forkNumber} ${hadoop.tmp.dir}/${surefire.forkNumber} - fork-${surefire.forkNumber} + fork-000${surefire.forkNumber} ${fs.azure.scale.test.enabled} ${fs.azure.scale.test.huge.filesize} ${fs.azure.scale.test.huge.partitionsize} @@ -392,7 +453,7 @@ - fork-${surefire.forkNumber} + fork-000${surefire.forkNumber} ${fs.azure.scale.test.enabled} ${fs.azure.scale.test.huge.filesize} @@ -482,7 +543,7 @@ ${test.build.data}/${surefire.forkNumber} ${test.build.dir}/${surefire.forkNumber} ${hadoop.tmp.dir}/${surefire.forkNumber} - fork-${surefire.forkNumber} + fork-000${surefire.forkNumber} ${fs.azure.scale.test.enabled} ${fs.azure.scale.test.huge.filesize} ${fs.azure.scale.test.huge.partitionsize} @@ -526,7 +587,7 @@ - fork-${surefire.forkNumber} + fork-000${surefire.forkNumber} ${fs.azure.scale.test.enabled} ${fs.azure.scale.test.timeout} @@ -544,6 +605,7 @@ **/azurebfs/extensions/ITestAbfsDelegationTokens.java **/azurebfs/ITestSmallWriteOptimization.java **/azurebfs/services/ITestReadBufferManager.java + **/azurebfs/commit/*.java @@ -572,7 +634,7 @@ - fork-${surefire.forkNumber} + fork-000${surefire.forkNumber} ${fs.azure.scale.test.enabled} ${fs.azure.scale.test.timeout} @@ -585,6 +647,7 @@ **/azurebfs/extensions/ITestAbfsDelegationTokens.java **/azurebfs/ITestSmallWriteOptimization.java **/azurebfs/services/ITestReadBufferManager.java + **/azurebfs/commit/*.java @@ -634,7 +697,7 @@ ${test.build.data}/${surefire.forkNumber} ${test.build.dir}/${surefire.forkNumber} ${hadoop.tmp.dir}/${surefire.forkNumber} - fork-${surefire.forkNumber} + fork-000${surefire.forkNumber} ${fs.azure.scale.test.enabled} ${fs.azure.scale.test.huge.filesize} ${fs.azure.scale.test.huge.partitionsize} @@ -664,7 +727,7 @@ ${test.build.data}/${surefire.forkNumber} ${test.build.dir}/${surefire.forkNumber} ${hadoop.tmp.dir}/${surefire.forkNumber} - fork-${surefire.forkNumber} + fork-000${surefire.forkNumber} ${fs.azure.scale.test.enabled} ${fs.azure.scale.test.huge.filesize} ${fs.azure.scale.test.huge.partitionsize} @@ -706,7 +769,7 @@ - fork-${surefire.forkNumber} + fork-000${surefire.forkNumber} ${fs.azure.scale.test.enabled} ${fs.azure.scale.test.huge.filesize} diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml index 070c8c1fe8..fd2a7c210e 100644 --- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml +++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml @@ -48,4 +48,7 @@ files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/> + + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 9719da7dc1..fafc30372b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -260,6 +260,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING) private boolean enableAutoThrottling; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT, + MinValue = 0, + DefaultValue = RATE_LIMIT_DEFAULT) + private int rateLimit; + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY, DefaultValue = DEFAULT_FS_AZURE_USER_AGENT_PREFIX) private String userAgentId; @@ -726,6 +731,10 @@ public boolean isAutoThrottlingEnabled() { return this.enableAutoThrottling; } + public int getRateLimit() { + return rateLimit; + } + public String getCustomUserAgentPrefix() { return this.userAgentId; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index ae70b8dc53..46141e7c4a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -27,6 +27,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.file.AccessDeniedException; +import java.time.Duration; import java.util.Hashtable; import java.util.List; import java.util.ArrayList; @@ -42,13 +43,17 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.hadoop.io.IOUtils; +import javax.annotation.Nullable; + import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept; import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator; @@ -94,9 +99,12 @@ import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.store.DataBlocks; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.RateLimiting; +import org.apache.hadoop.util.RateLimitingFactory; import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.LambdaUtils; @@ -143,6 +151,9 @@ public class AzureBlobFileSystem extends FileSystem /** Maximum Active blocks per OutputStream. */ private int blockOutputActiveBlocks; + /** Rate limiting for operations which use it to throttle their IO. */ + private RateLimiting rateLimiting; + @Override public void initialize(URI uri, Configuration configuration) throws IOException { @@ -215,7 +226,7 @@ public void initialize(URI uri, Configuration configuration) } AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled()); - + rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit()); LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri); } @@ -261,7 +272,7 @@ private FSDataInputStream open(final Path path, InputStream inputStream = abfsStore .openFileForRead(qualifiedPath, parameters, statistics, tracingContext); return new FSDataInputStream(inputStream); - } catch(AzureBlobFileSystemException ex) { + } catch (AzureBlobFileSystemException ex) { checkException(path, ex); return null; } @@ -290,8 +301,13 @@ protected CompletableFuture openFileWithOptions( } @Override - public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, - final short replication, final long blockSize, final Progressable progress) throws IOException { + public FSDataOutputStream create(final Path f, + final FsPermission permission, + final boolean overwrite, + final int bufferSize, + final short replication, + final long blockSize, + final Progressable progress) throws IOException { LOG.debug("AzureBlobFileSystem.create path: {} permission: {} overwrite: {} bufferSize: {}", f, permission, @@ -311,7 +327,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi FsPermission.getUMask(getConf()), tracingContext); statIncrement(FILES_CREATED); return new FSDataOutputStream(outputStream, statistics); - } catch(AzureBlobFileSystemException ex) { + } catch (AzureBlobFileSystemException ex) { checkException(f, ex); return null; } @@ -340,8 +356,12 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe @Override @SuppressWarnings("deprecation") - public FSDataOutputStream createNonRecursive(final Path f, final FsPermission permission, - final EnumSet flags, final int bufferSize, final short replication, final long blockSize, + public FSDataOutputStream createNonRecursive(final Path f, + final FsPermission permission, + final EnumSet flags, + final int bufferSize, + final short replication, + final long blockSize, final Progressable progress) throws IOException { // Check if file should be appended or overwritten. Assume that the file @@ -365,7 +385,8 @@ public FSDataOutputStream createNonRecursive(final Path f, } @Override - public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException { + public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) + throws IOException { LOG.debug( "AzureBlobFileSystem.append path: {} bufferSize: {}", f.toString(), @@ -380,7 +401,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr OutputStream outputStream = abfsStore .openFileForWrite(qualifiedPath, statistics, false, tracingContext); return new FSDataOutputStream(outputStream, statistics); - } catch(AzureBlobFileSystemException ex) { + } catch (AzureBlobFileSystemException ex) { checkException(f, ex); return null; } @@ -403,7 +424,7 @@ public boolean rename(final Path src, final Path dst) throws IOException { fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat, listener); // rename under same folder; - if(makeQualified(parentFolder).equals(qualifiedDstPath)) { + if (makeQualified(parentFolder).equals(qualifiedDstPath)) { return tryGetFileStatus(qualifiedSrcPath, tracingContext) != null; } @@ -438,24 +459,99 @@ public boolean rename(final Path src, final Path dst) throws IOException { qualifiedDstPath = makeQualified(adjustedDst); - abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext); + abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null); return true; - } catch(AzureBlobFileSystemException ex) { + } catch (AzureBlobFileSystemException ex) { LOG.debug("Rename operation failed. ", ex); checkException( - src, - ex, - AzureServiceErrorCode.PATH_ALREADY_EXISTS, - AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH, - AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND, - AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE, - AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND, - AzureServiceErrorCode.INTERNAL_OPERATION_ABORT); + src, + ex, + AzureServiceErrorCode.PATH_ALREADY_EXISTS, + AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH, + AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND, + AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE, + AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND, + AzureServiceErrorCode.INTERNAL_OPERATION_ABORT); return false; } } + /** + * Private method to create resilient commit support. + * @return a new instance + * @param path destination path + * @throws IOException problem probing store capabilities + * @throws UnsupportedOperationException if the store lacks this support + */ + @InterfaceAudience.Private + public ResilientCommitByRename createResilientCommitSupport(final Path path) + throws IOException { + + if (!hasPathCapability(path, + CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME)) { + throw new UnsupportedOperationException( + "Resilient commit support not available for " + path); + } + return new ResilientCommitByRenameImpl(); + } + + /** + * Resilient commit support. + * Provided as a nested class to avoid contaminating the + * FS instance with too many private methods which end up + * being used widely (as has happened to the S3A FS) + */ + public class ResilientCommitByRenameImpl implements ResilientCommitByRename { + + /** + * Perform the rename. + * This will be rate limited, as well as able to recover + * from rename errors if the etag was passed in. + * @param source path to source file + * @param dest destination of rename. + * @param sourceEtag etag of source file. may be null or empty + * @return the outcome of the operation + * @throws IOException any rename failure which was not recovered from. + */ + public Pair commitSingleFileByRename( + final Path source, + final Path dest, + @Nullable final String sourceEtag) throws IOException { + + LOG.debug("renameFileWithEtag source: {} dest: {} etag {}", source, dest, sourceEtag); + statIncrement(CALL_RENAME); + + trailingPeriodCheck(dest); + Path qualifiedSrcPath = makeQualified(source); + Path qualifiedDstPath = makeQualified(dest); + + TracingContext tracingContext = new TracingContext(clientCorrelationId, + fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat, + listener); + + if (qualifiedSrcPath.equals(qualifiedDstPath)) { + // rename to itself is forbidden + throw new PathIOException(qualifiedSrcPath.toString(), "cannot rename object onto self"); + } + + // acquire one IO permit + final Duration waitTime = rateLimiting.acquire(1); + + try { + final boolean recovered = abfsStore.rename(qualifiedSrcPath, + qualifiedDstPath, tracingContext, sourceEtag); + return Pair.of(recovered, waitTime); + } catch (AzureBlobFileSystemException ex) { + LOG.debug("Rename operation failed. ", ex); + checkException(source, ex); + // never reached + return null; + } + + } + } + @Override public boolean delete(final Path f, final boolean recursive) throws IOException { LOG.debug( @@ -533,7 +629,7 @@ private void incrementStatistic(AbfsStatistic statistic) { * @throws IllegalArgumentException if the path has a trailing period (.) */ private void trailingPeriodCheck(Path path) throws IllegalArgumentException { - while (!path.isRoot()){ + while (!path.isRoot()) { String pathToString = path.toString(); if (pathToString.length() != 0) { if (pathToString.charAt(pathToString.length() - 1) == '.') { @@ -541,8 +637,7 @@ private void trailingPeriodCheck(Path path) throws IllegalArgumentException { "ABFS does not allow files or directories to end with a dot."); } path = path.getParent(); - } - else { + } else { break; } } @@ -601,10 +696,10 @@ public synchronized void close() throws IOException { @Override public FileStatus getFileStatus(final Path f) throws IOException { - TracingContext tracingContext = new TracingContext(clientCorrelationId, - fileSystemId, FSOperationType.GET_FILESTATUS, tracingHeaderFormat, - listener); - return getFileStatus(f, tracingContext); + TracingContext tracingContext = new TracingContext(clientCorrelationId, + fileSystemId, FSOperationType.GET_FILESTATUS, tracingHeaderFormat, + listener); + return getFileStatus(f, tracingContext); } private FileStatus getFileStatus(final Path path, @@ -615,7 +710,7 @@ private FileStatus getFileStatus(final Path path, try { return abfsStore.getFileStatus(qualifiedPath, tracingContext); - } catch(AzureBlobFileSystemException ex) { + } catch (AzureBlobFileSystemException ex) { checkException(path, ex); return null; } @@ -639,7 +734,7 @@ public void breakLease(final Path f) throws IOException { fileSystemId, FSOperationType.BREAK_LEASE, tracingHeaderFormat, listener); abfsStore.breakLease(qualifiedPath, tracingContext); - } catch(AzureBlobFileSystemException ex) { + } catch (AzureBlobFileSystemException ex) { checkException(f, ex); } } @@ -666,7 +761,6 @@ public Path makeQualified(Path path) { return super.makeQualified(path); } - @Override public Path getWorkingDirectory() { return this.workingDir; @@ -689,8 +783,8 @@ public String getScheme() { @Override public Path getHomeDirectory() { return makeQualified(new Path( - FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX - + "/" + abfsStore.getUser())); + FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX + + "/" + abfsStore.getUser())); } /** @@ -714,8 +808,8 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, } final String blobLocationHost = abfsStore.getAbfsConfiguration().getAzureBlockLocationHost(); - final String[] name = { blobLocationHost }; - final String[] host = { blobLocationHost }; + final String[] name = {blobLocationHost}; + final String[] host = {blobLocationHost}; long blockSize = file.getBlockSize(); if (blockSize <= 0) { throw new IllegalArgumentException( @@ -790,15 +884,14 @@ public Void call() throws Exception { } }); } - } - finally { + } finally { executorService.shutdownNow(); } return true; } - /** + /** * Set owner of a path (i.e. a file or a directory). * The parameters owner and group cannot both be null. * @@ -828,9 +921,9 @@ public void setOwner(final Path path, final String owner, final String group) try { abfsStore.setOwner(qualifiedPath, - owner, - group, - tracingContext); + owner, + group, + tracingContext); } catch (AzureBlobFileSystemException ex) { checkException(path, ex); } @@ -847,7 +940,10 @@ public void setOwner(final Path path, final String owner, final String group) * @throws IllegalArgumentException If name is null or empty or if value is null */ @Override - public void setXAttr(final Path path, final String name, final byte[] value, final EnumSet flag) + public void setXAttr(final Path path, + final String name, + final byte[] value, + final EnumSet flag) throws IOException { LOG.debug("AzureBlobFileSystem.setXAttr path: {}", path); @@ -971,7 +1067,7 @@ public void modifyAclEntries(final Path path, final List aclSpec) if (!getIsNamespaceEnabled(tracingContext)) { throw new UnsupportedOperationException( "modifyAclEntries is only supported by storage accounts with the " - + "hierarchical namespace enabled."); + + "hierarchical namespace enabled."); } if (aclSpec == null || aclSpec.isEmpty()) { @@ -1006,7 +1102,7 @@ public void removeAclEntries(final Path path, final List aclSpec) if (!getIsNamespaceEnabled(tracingContext)) { throw new UnsupportedOperationException( "removeAclEntries is only supported by storage accounts with the " - + "hierarchical namespace enabled."); + + "hierarchical namespace enabled."); } if (aclSpec == null || aclSpec.isEmpty()) { @@ -1038,7 +1134,7 @@ public void removeDefaultAcl(final Path path) throws IOException { if (!getIsNamespaceEnabled(tracingContext)) { throw new UnsupportedOperationException( "removeDefaultAcl is only supported by storage accounts with the " - + "hierarchical namespace enabled."); + + "hierarchical namespace enabled."); } Path qualifiedPath = makeQualified(path); @@ -1068,7 +1164,7 @@ public void removeAcl(final Path path) throws IOException { if (!getIsNamespaceEnabled(tracingContext)) { throw new UnsupportedOperationException( "removeAcl is only supported by storage accounts with the " - + "hierarchical namespace enabled."); + + "hierarchical namespace enabled."); } Path qualifiedPath = makeQualified(path); @@ -1101,7 +1197,7 @@ public void setAcl(final Path path, final List aclSpec) if (!getIsNamespaceEnabled(tracingContext)) { throw new UnsupportedOperationException( "setAcl is only supported by storage accounts with the hierarchical " - + "namespace enabled."); + + "namespace enabled."); } if (aclSpec == null || aclSpec.size() == 0) { @@ -1133,7 +1229,7 @@ public AclStatus getAclStatus(final Path path) throws IOException { if (!getIsNamespaceEnabled(tracingContext)) { throw new UnsupportedOperationException( "getAclStatus is only supported by storage account with the " - + "hierarchical namespace enabled."); + + "hierarchical namespace enabled."); } Path qualifiedPath = makeQualified(path); @@ -1243,7 +1339,7 @@ private FileStatus tryGetFileStatus(final Path f, TracingContext tracingContext) private boolean fileSystemExists() throws IOException { LOG.debug( - "AzureBlobFileSystem.fileSystemExists uri: {}", uri); + "AzureBlobFileSystem.fileSystemExists uri: {}", uri); try { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.TEST_OP, tracingHeaderFormat, listener); @@ -1534,8 +1630,9 @@ public boolean hasPathCapability(final Path path, final String capability) case CommonPathCapabilities.FS_PERMISSIONS: case CommonPathCapabilities.FS_APPEND: case CommonPathCapabilities.ETAGS_AVAILABLE: - case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME: return true; + + case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME: case CommonPathCapabilities.FS_ACLS: return getIsNamespaceEnabled( new TracingContext(clientCorrelationId, fileSystemId, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index f4f8959964..046f9f0b56 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -62,6 +62,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -878,7 +879,22 @@ public void breakLease(final Path path, final TracingContext tracingContext) thr client.breakLease(getRelativePath(path), tracingContext); } - public void rename(final Path source, final Path destination, TracingContext tracingContext) throws + /** + * Rename a file or directory. + * If a source etag is passed in, the operation will attempt to recover + * from a missing source file by probing the destination for + * existence and comparing etags. + * @param source path to source file + * @param destination destination of rename. + * @param tracingContext trace context + * @param sourceEtag etag of source file. may be null or empty + * @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures. + * @return true if recovery was needed and succeeded. + */ + public boolean rename(final Path source, + final Path destination, + final TracingContext tracingContext, + final String sourceEtag) throws AzureBlobFileSystemException { final Instant startAggregate = abfsPerfTracker.getLatencyInstant(); long countAggregate = 0; @@ -898,23 +914,29 @@ public void rename(final Path source, final Path destination, TracingContext tra String sourceRelativePath = getRelativePath(source); String destinationRelativePath = getRelativePath(destination); + // was any operation recovered from? + boolean recovered = false; do { try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) { - AbfsRestOperation op = client - .renamePath(sourceRelativePath, destinationRelativePath, - continuation, tracingContext); + final Pair pair = + client.renamePath(sourceRelativePath, destinationRelativePath, + continuation, tracingContext, sourceEtag); + + AbfsRestOperation op = pair.getLeft(); perfInfo.registerResult(op.getResult()); continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); perfInfo.registerSuccess(true); countAggregate++; shouldContinue = continuation != null && !continuation.isEmpty(); - + // update the recovery flag. + recovered |= pair.getRight(); if (!shouldContinue) { perfInfo.registerAggregates(startAggregate, countAggregate); } } } while (shouldContinue); + return recovered; } public void delete(final Path path, final boolean recursive, @@ -1932,7 +1954,7 @@ boolean areLeasesFreed() { * @param result response to process. * @return the quote-unwrapped etag. */ - private static String extractEtagHeader(AbfsHttpOperation result) { + public static String extractEtagHeader(AbfsHttpOperation result) { String etag = result.getResponseHeader(HttpHeaderConfigurations.ETAG); if (etag != null) { // strip out any wrapper "" quotes which come back, for consistency with diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java new file mode 100644 index 0000000000..efba9244af --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import java.io.IOException; +import java.time.Duration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem; + +/** + * Extension of StoreOperationsThroughFileSystem with ABFS awareness. + * Purely for use by jobs committing work through the manifest committer. + * The {@link AzureManifestCommitterFactory} will configure + * this as the binding to the FS. + * + * ADLS Gen2 stores support etag-recovery on renames, but not WASB + * stores. + */ +@InterfaceAudience.LimitedPrivate("mapreduce") +@InterfaceStability.Unstable +public class AbfsManifestStoreOperations extends + ManifestStoreOperationsThroughFileSystem { + + private static final Logger LOG = LoggerFactory.getLogger( + AbfsManifestStoreOperations.class); + + /** + * Classname, which can be declared in jpb configurations. + */ + public static final String NAME = AbfsManifestStoreOperations.class.getName(); + + /** + * Resilient rename calls; only available on an ADLS Gen2 store. + * Will be null after binding if the FS isn't compatible. + */ + private ResilientCommitByRename resilientCommitByRename; + + @Override + public AzureBlobFileSystem getFileSystem() { + return (AzureBlobFileSystem) super.getFileSystem(); + } + + /** + * Bind to the store. + * + * @param filesystem FS. + * @param path path to work under + * @throws IOException binding problems. + */ + @Override + public void bindToFileSystem(FileSystem filesystem, Path path) throws IOException { + if (!(filesystem instanceof AzureBlobFileSystem)) { + throw new PathIOException(path.toString(), + "Not an abfs filesystem: " + filesystem.getClass()); + } + super.bindToFileSystem(filesystem, path); + try { + resilientCommitByRename = getFileSystem().createResilientCommitSupport(path); + LOG.debug("Bonded to filesystem with resilient commits under path {}", path); + } catch (UnsupportedOperationException e) { + LOG.debug("No resilient commit support under path {}", path); + } + } + + @Override + public boolean storePreservesEtagsThroughRenames(final Path path) { + return true; + } + + /** + * Resilient commits available on hierarchical stores. + * @return true if the FS can use etags on renames. + */ + @Override + public boolean storeSupportsResilientCommit() { + return resilientCommitByRename != null; + } + + /** + * Commit a file through an internal ABFS operation. + * If resilient commit is unavailable, invokes the superclass, which + * will raise an UnsupportedOperationException + * @param entry entry to commit + * @return the outcome + * @throws IOException any failure in resilient commit. + * @throws UnsupportedOperationException if not available. + */ + @Override + public CommitFileResult commitFile(final FileEntry entry) throws IOException { + + if (resilientCommitByRename != null) { + final Pair result = + resilientCommitByRename.commitSingleFileByRename( + entry.getSourcePath(), + entry.getDestPath(), + entry.getEtag()); + return CommitFileResult.fromResilientCommit(result.getLeft(), + result.getRight()); + } else { + return super.commitFile(entry); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java new file mode 100644 index 0000000000..b760fa7a4a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory; + +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS; + +/** + * A Committer for the manifest committer which performs all bindings needed + * to work best with abfs. + * This includes, at a minimum, switching to the abfs-specific manifest store operations. + * + * This classname is referenced in configurations, so MUST NOT change. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class AzureManifestCommitterFactory extends ManifestCommitterFactory { + + /** + * Classname, which can be declared in job configurations. + */ + public static final String NAME = ManifestCommitterFactory.class.getName(); + + @Override + public ManifestCommitter createOutputCommitter(final Path outputPath, + final TaskAttemptContext context) throws IOException { + final Configuration conf = context.getConfiguration(); + // use ABFS Store operations + conf.set(OPT_STORE_OPERATIONS_CLASS, + AbfsManifestStoreOperations.NAME); + return super.createOutputCommitter(outputPath, context); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java new file mode 100644 index 0000000000..2e91392a66 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Duration; +import javax.annotation.Nullable; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; + +/** + * API exclusively for committing files. + * + * This is only for use by (@link {@link AbfsManifestStoreOperations}, + * and is intended to be implemented by ABFS. + * To ensure that there is no need to add mapreduce JARs to the + * classpath just to work with ABFS, this interface + * MUST NOT refer to anything in the + * {@code org.apache.hadoop.mapreduce} package. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface ResilientCommitByRename extends IOStatisticsSource { + + /** + * Rename source file to dest path *Exactly*; no subdirectory games here. + * if the method does not raise an exception,then + * the data at dest is the data which was at source. + * + * Requirements + * + *
+   *   exists(FS, source) else raise FileNotFoundException
+   *   source != dest else raise PathIOException
+   *   not exists(FS, dest)
+   *   isDir(FS, dest.getParent)
+   * 
+ *
    + *
  1. source != dest else raise PathIOException
  2. + *
  3. source must exist else raise FileNotFoundException
  4. + *
  5. source must exist and be a file
  6. + *
  7. dest must not exist;
  8. + *
  9. dest.getParent() must be a dir
  10. + *
  11. if sourceEtag is non-empty, it MAY be used to qualify/validate the rename.
  12. + *
+ * + * The outcome of the operation is undefined if source is not a file, dest exists, + * dest.getParent() doesn't exist/is a file. + * That is: implementations SHOULD assume that the code calling this method has + * set up the destination directory tree and is only invoking this call on a file. + * Accordingly: implementations MAY skip validation checks + * + * Post Conditions on a successful operation: + *
+   * FS' where:
+   *     not exists(FS', source)
+   *     and exists(FS', dest)
+   *     and data(FS', dest) == data (FS, source)
+   * 
+ * This is exactly the same outcome as `FileSystem.rename()` when the same preconditions + * are met. This API call simply restricts the operation to file rename with strict + * conditions, (no need to be 'clever' about dest path calculation) and the ability + * to pass in etags, modtimes and file status values. + * + * @param source path to source file + * @param dest destination of rename. + * @param sourceEtag etag of source file. may be null or empty + * @return true if recovery was needed. + * @throws FileNotFoundException source file not found + * @throws PathIOException failure, including source and dest being the same path + * @throws IOException any other exception + */ + Pair commitSingleFileByRename( + Path source, + Path dest, + @Nullable String sourceEtag) throws IOException; + + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java new file mode 100644 index 0000000000..3567377350 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Support for manifest committer. + * Unless otherwise stated: classes are private. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.fs.azurebfs.commit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 12beb5a9bb..9d3b2d5e82 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -220,6 +220,9 @@ public final class ConfigurationKeys { /** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */ public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track"; + /** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */ + public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit"; + public static String accountProperty(String property, String account) { return property + "." + account; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index f58c61e890..63d62a33b1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -133,5 +133,10 @@ public final class FileSystemConfigurations { public static final String DATA_BLOCKS_BUFFER_DEFAULT = DATA_BLOCKS_BUFFER_DISK; + /** + * IO rate limit. Value: {@value} + */ + public static final int RATE_LIMIT_DEFAULT = 10_000; + private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 08142a17a8..b701037d0f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -51,6 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; @@ -67,6 +68,8 @@ import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.util.concurrent.HadoopExecutors; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM; @@ -478,8 +481,30 @@ public AbfsRestOperation breakLease(final String path, return op; } - public AbfsRestOperation renamePath(String source, final String destination, - final String continuation, TracingContext tracingContext) + + /** + * Rename a file or directory. + * If a source etag is passed in, the operation will attempt to recover + * from a missing source file by probing the destination for + * existence and comparing etags. + * The second value in the result will be true to indicate that this + * took place. + * As rename recovery is only attempted if the source etag is non-empty, + * in normal rename operations rename recovery will never happen. + * @param source path to source file + * @param destination destination of rename. + * @param continuation continuation. + * @param tracingContext trace context + * @param sourceEtag etag of source file. may be null or empty + * @return pair of (the rename operation, flag indicating recovery took place) + * @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures. + */ + public Pair renamePath( + final String source, + final String destination, + final String continuation, + final TracingContext tracingContext, + final String sourceEtag) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); @@ -505,9 +530,73 @@ public AbfsRestOperation renamePath(String source, final String destination, HTTP_METHOD_PUT, url, requestHeaders); - // no attempt at recovery using timestamps as it was not reliable. - op.execute(tracingContext); - return op; + try { + op.execute(tracingContext); + return Pair.of(op, false); + } catch (AzureBlobFileSystemException e) { + // If we have no HTTP response, throw the original exception. + if (!op.hasResult()) { + throw e; + } + boolean etagCheckSucceeded = renameIdempotencyCheckOp( + source, + sourceEtag, op, destination, tracingContext); + if (!etagCheckSucceeded) { + // idempotency did not return different result + // throw back the exception + throw e; + } + return Pair.of(op, true); + } + } + + /** + * Check if the rename request failure is post a retry and if earlier rename + * request might have succeeded at back-end. + * + * If a source etag was passed in, and the error was 404, get the + * etag of any file at the destination. + * If it matches the source etag, then the rename is considered + * a success. + * Exceptions raised in the probe of the destination are swallowed, + * so that they do not interfere with the original rename failures. + * @param source source path + * @param op Rename request REST operation response with non-null HTTP response + * @param destination rename destination path + * @param sourceEtag etag of source file. may be null or empty + * @param tracingContext Tracks identifiers for request header + * @return true if the file was successfully copied + */ + public boolean renameIdempotencyCheckOp( + final String source, + final String sourceEtag, + final AbfsRestOperation op, + final String destination, + TracingContext tracingContext) { + Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response"); + + if ((op.isARetriedRequest()) + && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) + && isNotEmpty(sourceEtag)) { + + // Server has returned HTTP 404, which means rename source no longer + // exists. Check on destination status and if its etag matches + // that of the source, consider it to be a success. + LOG.debug("rename {} to {} failed, checking etag of destination", + source, destination); + + try { + final AbfsRestOperation destStatusOp = getPathStatus(destination, + false, tracingContext); + final AbfsHttpOperation result = destStatusOp.getResult(); + + return result.getStatusCode() == HttpURLConnection.HTTP_OK + && sourceEtag.equals(extractEtagHeader(result)); + } catch (AzureBlobFileSystemException ignored) { + // GetFileStatus on the destination failed, the rename did not take place + } + } + return false; } public AbfsRestOperation append(final String path, final byte[] buffer, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java index bb1ec9e4a3..e3adc59afa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java @@ -48,9 +48,15 @@ public String getStorageAccountKey(String accountName, Configuration rawConfig) // Validating the key. validateStorageAccountKey(key); } catch (IllegalAccessException | InvalidConfigurationValueException e) { - throw new KeyProviderException("Failure to initialize configuration", e); + LOG.debug("Failure to retrieve storage account key for {}", accountName, + e); + throw new KeyProviderException("Failure to initialize configuration for " + + accountName + + " key =\"" + key + "\"" + + ": " + e, e); } catch(IOException ioe) { - LOG.warn("Unable to get key from credential providers. {}", ioe); + LOG.warn("Unable to get key for {} from credential providers. {}", + accountName, ioe, ioe); } return key; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 56d553819f..4a5507526c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -271,11 +271,12 @@ protected void createFilesystemForSASTests() throws Exception { // The SAS tests do not have permission to create a filesystem // so first create temporary instance of the filesystem using SharedKey // then re-use the filesystem it creates with SAS auth instead of SharedKey. - AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig); - ContractTestUtils.assertPathExists(tempFs, "This path should exist", - new Path("/")); - abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name()); - usingFilesystemForSASTests = true; + try (AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig)){ + ContractTestUtils.assertPathExists(tempFs, "This path should exist", + new Path("/")); + abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name()); + usingFilesystemForSASTests = true; + } } public AzureBlobFileSystem getFileSystem() throws IOException { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index b0e82444af..5bd6eaff42 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -32,7 +32,10 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; +import org.apache.hadoop.fs.statistics.IOStatisticsLogging; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; @@ -76,13 +79,19 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { new Random().nextBytes(b); Path testPath = path(TEST_PATH); - try (FSDataOutputStream stream = fs.create(testPath)) { + FSDataOutputStream stream = fs.create(testPath); + try { stream.write(b); + } finally{ + stream.close(); } + IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream); final byte[] readBuffer = new byte[2 * bufferSize]; int result; + IOStatisticsSource statisticsSource = null; try (FSDataInputStream inputStream = fs.open(testPath)) { + statisticsSource = inputStream; ((AbfsInputStream) inputStream.getWrappedStream()).registerListener( new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(), fs.getFileSystemId(), FSOperationType.READ, true, 0, @@ -100,6 +109,8 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); } + IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource); + assertNotEquals("data read in final read()", -1, result); assertArrayEquals(readBuffer, b); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java index ea9fba6257..965e02a0a3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java @@ -401,7 +401,8 @@ public void testSignatureMask() throws Exception { fs.create(new Path(src)).close(); AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient() .renamePath(src, "/testABC" + "/abc.txt", null, - getTestTracingContext(fs, false)); + getTestTracingContext(fs, false), null) + .getLeft(); AbfsHttpOperation result = abfsHttpRestOperation.getResult(); String url = result.getMaskedUrl(); String encodedUrl = result.getMaskedEncodedUrl(); @@ -418,7 +419,7 @@ public void testSignatureMaskOnExceptionMessage() throws Exception { intercept(IOException.class, "sig=XXXX", () -> getFileSystem().getAbfsClient() .renamePath("testABC/test.xt", "testABC/abc.txt", null, - getTestTracingContext(getFileSystem(), false))); + getTestTracingContext(getFileSystem(), false), null)); } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java index 2198a6ab35..02260310bb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java @@ -526,7 +526,8 @@ private void testRenamePath(final boolean isWithCPK) throws Exception { AbfsClient abfsClient = fs.getAbfsClient(); AbfsRestOperation abfsRestOperation = abfsClient .renamePath(testFileName, newName, null, - getTestTracingContext(fs, false)); + getTestTracingContext(fs, false), null) + .getLeft(); assertCPKHeaders(abfsRestOperation, false); assertNoCPKResponseHeadersPresent(abfsRestOperation); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java new file mode 100644 index 0000000000..8160cdc64c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; + +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS; + +/** + * Helper methods for committer tests on ABFS. + */ +final class AbfsCommitTestHelper { + private AbfsCommitTestHelper() { + } + + /** + * Prepare the test configuration. + * @param contractTestBinding test binding + * @return an extracted and patched configuration. + */ + static Configuration prepareTestConfiguration( + ABFSContractTestBinding contractTestBinding) { + final Configuration conf = + contractTestBinding.getRawConfiguration(); + + // use ABFS Store operations + conf.set(OPT_STORE_OPERATIONS_CLASS, + AbfsManifestStoreOperations.NAME); + + return conf; + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java new file mode 100644 index 0000000000..55752055f0 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azure.integration.AzureTestConstants; +import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; +import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.util.DurationInfo; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled; +import static org.apache.hadoop.io.IOUtils.closeStream; + +/** + * Tests which create a yarn minicluster. + * These are all considered scale tests; the probe for + * scale tests being enabled is executed before the cluster + * is set up to avoid wasting time on non-scale runs. + */ +public abstract class AbstractAbfsClusterITest extends + AbstractManifestCommitterTest { + + public static final int NO_OF_NODEMANAGERS = 2; + + private final ABFSContractTestBinding binding; + + + /** + * The static cluster binding with the lifecycle of this test; served + * through instance-level methods for sharing across methods in the + * suite. + */ + @SuppressWarnings("StaticNonFinalField") + private static ClusterBinding clusterBinding; + + protected AbstractAbfsClusterITest() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + protected int getTestTimeoutMillis() { + return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS; + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + requireScaleTestsEnabled(); + if (getClusterBinding() == null) { + clusterBinding = demandCreateClusterBinding(); + } + assertNotNull("cluster is not bound", getClusterBinding()); + } + + @AfterClass + public static void teardownClusters() throws IOException { + terminateCluster(clusterBinding); + clusterBinding = null; + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } + + @Override + protected Configuration createConfiguration() { + return AbfsCommitTestHelper.prepareTestConfiguration(binding); + } + + /** + * This is the cluster binding which every subclass must create. + */ + protected static final class ClusterBinding { + + private String clusterName; + + private final MiniMRYarnCluster yarn; + + public ClusterBinding( + final String clusterName, + final MiniMRYarnCluster yarn) { + this.clusterName = clusterName; + this.yarn = requireNonNull(yarn); + } + + + /** + * Get the cluster FS, which will either be HDFS or the local FS. + * @return a filesystem. + * @throws IOException failure + */ + public FileSystem getClusterFS() throws IOException { + return FileSystem.getLocal(yarn.getConfig()); + } + + public MiniMRYarnCluster getYarn() { + return yarn; + } + + public Configuration getConf() { + return getYarn().getConfig(); + } + + public String getClusterName() { + return clusterName; + } + + public void terminate() { + closeStream(getYarn()); + } + } + + /** + * Create the cluster binding. + * The configuration will be patched by propagating down options + * from the maven build (S3Guard binding etc) and turning off unwanted + * YARN features. + * + * If an HDFS cluster is requested, + * the HDFS and YARN clusters will share the same configuration, so + * the HDFS cluster binding is implicitly propagated to YARN. + * If one is not requested, the local filesystem is used as the cluster FS. + * @param conf configuration to start with. + * @return the cluster binding. + * @throws IOException failure. + */ + protected static ClusterBinding createCluster( + final JobConf conf) throws IOException { + try (DurationInfo d = new DurationInfo(LOG, "Creating YARN MiniCluster")) { + conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false); + // create a unique cluster name based on the current time in millis. + String timestamp = LocalDateTime.now().format( + DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS")); + String clusterName = "yarn-" + timestamp; + MiniMRYarnCluster yarnCluster = + new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS); + yarnCluster.init(conf); + yarnCluster.start(); + return new ClusterBinding(clusterName, yarnCluster); + } + } + + /** + * Terminate the cluster if it is not null. + * @param cluster the cluster + */ + protected static void terminateCluster(ClusterBinding cluster) { + if (cluster != null) { + cluster.terminate(); + } + } + + /** + * Get the cluster binding for this subclass. + * @return the cluster binding + */ + protected ClusterBinding getClusterBinding() { + return clusterBinding; + } + + protected MiniMRYarnCluster getYarn() { + return getClusterBinding().getYarn(); + } + + + /** + * We stage work into a temporary directory rather than directly under + * the user's home directory, as that is often rejected by CI test + * runners. + */ + @Rule + public final TemporaryFolder stagingFilesDir = new TemporaryFolder(); + + + /** + * binding on demand rather than in a BeforeClass static method. + * Subclasses can override this to change the binding options. + * @return the cluster binding + */ + protected ClusterBinding demandCreateClusterBinding() throws Exception { + return createCluster(new JobConf()); + } + + /** + * Create a job configuration. + * This creates a new job conf from the yarn + * cluster configuration then calls + * {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized. + * @return the new job configuration. + * @throws IOException failure + */ + protected JobConf newJobConf() throws IOException { + JobConf jobConf = new JobConf(getYarn().getConfig()); + jobConf.addResource(getConfiguration()); + applyCustomConfigOptions(jobConf); + return jobConf; + } + + /** + * Patch the (job) configuration for this committer. + * @param jobConf configuration to patch + * @return a configuration which will run this configuration. + */ + protected Configuration patchConfigurationForCommitter( + final Configuration jobConf) { + enableManifestCommitter(jobConf); + return jobConf; + } + + /** + * Override point to let implementations tune the MR Job conf. + * @param jobConf configuration + */ + protected void applyCustomConfigOptions(JobConf jobConf) throws IOException { + + } + + + /** + * Assume that scale tests are enabled. + */ + protected void requireScaleTestsEnabled() { + assumeScaleTestsEnabled(getConfiguration()); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java new file mode 100644 index 0000000000..a597c35376 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; +import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCleanupStage; + +/** + * Cleanup logic on ABFS. + */ +public class ITestAbfsCleanupStage extends TestCleanupStage { + + private final ABFSContractTestBinding binding; + + public ITestAbfsCleanupStage() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + } + + @Override + protected Configuration createConfiguration() { + return AbfsCommitTestHelper.prepareTestConfiguration(binding); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java new file mode 100644 index 0000000000..a0aaec8532 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; +import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCommitTaskStage; + +/** + * ABFS storage test of task committer. + */ +public class ITestAbfsCommitTaskStage extends TestCommitTaskStage { + + private final ABFSContractTestBinding binding; + + public ITestAbfsCommitTaskStage() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + } + + @Override + protected Configuration createConfiguration() { + return AbfsCommitTestHelper.prepareTestConfiguration(binding); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java new file mode 100644 index 0000000000..6621b80da0 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; +import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCreateOutputDirectoriesStage; + +/** + * ABFS storage test of directory creation. + */ +public class ITestAbfsCreateOutputDirectoriesStage extends TestCreateOutputDirectoriesStage { + + private final ABFSContractTestBinding binding; + + public ITestAbfsCreateOutputDirectoriesStage() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + } + + @Override + protected Configuration createConfiguration() { + return AbfsCommitTestHelper.prepareTestConfiguration(binding); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java new file mode 100644 index 0000000000..4e4c4f5996 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import java.io.IOException; +import java.util.List; + +import org.assertj.core.api.Assertions; +import org.junit.FixMethodOrder; +import org.junit.runners.MethodSorters; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; +import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestJobThroughManifestCommitter; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations; + +import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration; + +/** + * Test the Manifest committer stages against ABFS. + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class ITestAbfsJobThroughManifestCommitter + extends TestJobThroughManifestCommitter { + + private final ABFSContractTestBinding binding; + + public ITestAbfsJobThroughManifestCommitter() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + } + + @Override + protected Configuration createConfiguration() { + return enableManifestCommitter(prepareTestConfiguration(binding)); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } + + @Override + protected boolean shouldDeleteTestRootAtEndOfTestRun() { + return true; + } + + /** + * Add read of manifest and validate of output's etags. + * @param attemptId attempt ID + * @param files files which were created. + * @param manifest manifest + * @throws IOException failure + */ + @Override + protected void validateTaskAttemptManifest(String attemptId, + List files, + TaskManifest manifest) throws IOException { + super.validateTaskAttemptManifest(attemptId, files, manifest); + final List commit = manifest.getFilesToCommit(); + final ManifestStoreOperations operations = getStoreOperations(); + for (FileEntry entry : commit) { + Assertions.assertThat(entry.getEtag()) + .describedAs("Etag of %s", entry) + .isNotEmpty(); + final FileStatus sourceStatus = operations.getFileStatus(entry.getSourcePath()); + final String etag = ManifestCommitterSupport.getEtag(sourceStatus); + Assertions.assertThat(etag) + .describedAs("Etag of %s", sourceStatus) + .isEqualTo(entry.getEtag()); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java new file mode 100644 index 0000000000..acd693e39a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; +import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestLoadManifestsStage; + +/** + * ABFS storage test of saving and loading a large number + * of manifests. + */ +public class ITestAbfsLoadManifestsStage extends TestLoadManifestsStage { + + private final ABFSContractTestBinding binding; + + public ITestAbfsLoadManifestsStage() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + } + + @Override + protected Configuration createConfiguration() { + return AbfsCommitTestHelper.prepareTestConfiguration(binding); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java new file mode 100644 index 0000000000..aac06f952d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; +import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestManifestCommitProtocol; + +import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration; + +/** + * Test the Manifest protocol against ABFS. + */ +public class ITestAbfsManifestCommitProtocol extends + TestManifestCommitProtocol { + + private final ABFSContractTestBinding binding; + + public ITestAbfsManifestCommitProtocol() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + } + + @Override + protected Configuration createConfiguration() { + return enableManifestCommitter(prepareTestConfiguration(binding)); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } + + + @Override + protected String suitename() { + return "ITestAbfsManifestCommitProtocol"; + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java new file mode 100644 index 0000000000..922782da29 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import java.nio.charset.StandardCharsets; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; +import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations; + +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME; +import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration; +import static org.junit.Assume.assumeTrue; + +/** + * Test {@link AbfsManifestStoreOperations}. + * As this looks at etag handling through FS operations, it's actually testing how etags work + * in ABFS (preservation across renames) and in the client (are they consistent + * in LIST and HEAD calls). + * + * Skipped when tested against wasb-compatible stores. + */ +public class ITestAbfsManifestStoreOperations extends AbstractManifestCommitterTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestAbfsManifestStoreOperations.class); + + private final ABFSContractTestBinding binding; + + public ITestAbfsManifestStoreOperations() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + + // skip tests on non-HNS stores + assumeTrue("Resilient rename not available", + getFileSystem().hasPathCapability(getContract().getTestPath(), + ETAGS_PRESERVED_IN_RENAME)); + + } + + @Override + protected Configuration createConfiguration() { + return enableManifestCommitter(prepareTestConfiguration(binding)); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } + + /** + * basic consistency across operations, as well as being non-empty. + */ + @Test + public void testEtagConsistencyAcrossListAndHead() throws Throwable { + describe("Etag values must be non-empty and consistent across LIST and HEAD Calls."); + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + ContractTestUtils.touch(fs, path); + final ManifestStoreOperations operations = createManifestStoreOperations(); + Assertions.assertThat(operations) + .describedAs("Store operations class loaded via Configuration") + .isInstanceOf(AbfsManifestStoreOperations.class); + + final FileStatus st = operations.getFileStatus(path); + final String etag = operations.getEtag(st); + Assertions.assertThat(etag) + .describedAs("Etag of %s", st) + .isNotBlank(); + LOG.info("etag of empty file is \"{}\"", etag); + + final FileStatus[] statuses = fs.listStatus(path); + Assertions.assertThat(statuses) + .describedAs("List(%s)", path) + .hasSize(1); + final FileStatus lsStatus = statuses[0]; + Assertions.assertThat(operations.getEtag(lsStatus)) + .describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st) + .isEqualTo(etag); + } + + @Test + public void testEtagsOfDifferentDataDifferent() throws Throwable { + describe("Verify that two different blocks of data written have different tags"); + + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + Path src = new Path(path, "src"); + + ContractTestUtils.createFile(fs, src, true, + "data1234".getBytes(StandardCharsets.UTF_8)); + final ManifestStoreOperations operations = createManifestStoreOperations(); + final FileStatus srcStatus = operations.getFileStatus(src); + final String srcTag = operations.getEtag(srcStatus); + LOG.info("etag of file 1 is \"{}\"", srcTag); + + // now overwrite with data of same length + // (ensure that path or length aren't used exclusively as tag) + ContractTestUtils.createFile(fs, src, true, + "1234data".getBytes(StandardCharsets.UTF_8)); + + // validate + final String tag2 = operations.getEtag(operations.getFileStatus(src)); + LOG.info("etag of file 2 is \"{}\"", tag2); + + Assertions.assertThat(tag2) + .describedAs("etag of updated file") + .isNotEqualTo(srcTag); + } + + @Test + public void testEtagConsistencyAcrossRename() throws Throwable { + describe("Verify that when a file is renamed, the etag remains unchanged"); + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + Path src = new Path(path, "src"); + Path dest = new Path(path, "dest"); + + ContractTestUtils.createFile(fs, src, true, + "sample data".getBytes(StandardCharsets.UTF_8)); + final ManifestStoreOperations operations = createManifestStoreOperations(); + final FileStatus srcStatus = operations.getFileStatus(src); + final String srcTag = operations.getEtag(srcStatus); + LOG.info("etag of short file is \"{}\"", srcTag); + + Assertions.assertThat(srcTag) + .describedAs("Etag of %s", srcStatus) + .isNotBlank(); + + // rename + operations.commitFile(new FileEntry(src, dest, 0, srcTag)); + + // validate + FileStatus destStatus = operations.getFileStatus(dest); + final String destTag = operations.getEtag(destStatus); + Assertions.assertThat(destTag) + .describedAs("etag of list status (%s) compared to HEAD value of %s", destStatus, srcStatus) + .isEqualTo(srcTag); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java new file mode 100644 index 0000000000..5547d081c9 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; +import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestRenameStageFailure; + +/** + * Rename failure logic on ABFS. + * This will go through the resilient rename operation. + */ +public class ITestAbfsRenameStageFailure extends TestRenameStageFailure { + + /** + * How many files to create. + */ + private static final int FILES_TO_CREATE = 20; + + private final ABFSContractTestBinding binding; + + public ITestAbfsRenameStageFailure() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + } + + @Override + protected Configuration createConfiguration() { + return AbfsCommitTestHelper.prepareTestConfiguration(binding); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } + + @Override + protected boolean requireRenameResilience() { + return true; + } + + @Override + protected int filesToCreate() { + return FILES_TO_CREATE; + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java new file mode 100644 index 0000000000..d2fe9de115 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding; +import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestTaskManifestFileIO; + +/** + * Test Reading/writing manifest file through ABFS. + */ +public class ITestAbfsTaskManifestFileIO extends TestTaskManifestFileIO { + + private final ABFSContractTestBinding binding; + + public ITestAbfsTaskManifestFileIO() throws Exception { + binding = new ABFSContractTestBinding(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + } + + @Override + protected Configuration createConfiguration() { + return AbfsCommitTestHelper.prepareTestConfiguration(binding); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, binding.isSecureMode()); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java new file mode 100644 index 0000000000..4b21b838de --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.commit; + +import java.io.File; +import java.io.FileNotFoundException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; + +import org.junit.Assume; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.examples.terasort.TeraGen; +import org.apache.hadoop.examples.terasort.TeraSort; +import org.apache.hadoop.examples.terasort.TeraSortConfigKeys; +import org.apache.hadoop.examples.terasort.TeraValidate; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.statistics.IOStatisticsLogging; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData; +import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.util.functional.RemoteIterators; + +import static java.util.Optional.empty; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO; +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.loadSuccessFile; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile; + +/** + * Runs Terasort against ABFS using the manifest committer. + * The tests run in sequence, so each operation is isolated. + * Scale test only (it is big and slow) + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +@SuppressWarnings({"StaticNonFinalField", "OptionalUsedAsFieldOrParameterType"}) +public class ITestAbfsTerasort extends AbstractAbfsClusterITest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestAbfsTerasort.class); + + public static final int EXPECTED_PARTITION_COUNT = 10; + + public static final int PARTITION_SAMPLE_SIZE = 1000; + + public static final int ROW_COUNT = 1000; + + /** + * This has to be common across all test methods. + */ + private static final Path TERASORT_PATH = new Path("/ITestAbfsTerasort"); + + /** + * Duration tracker created in the first of the test cases and closed + * in {@link #test_140_teracomplete()}. + */ + private static Optional terasortDuration = empty(); + + /** + * Tracker of which stages are completed and how long they took. + */ + private static final Map COMPLETED_STAGES = new HashMap<>(); + + /** + * FileSystem statistics are collected from the _SUCCESS markers. + */ + protected static final IOStatisticsSnapshot JOB_IOSTATS = + snapshotIOStatistics(); + + /** Base path for all the terasort input and output paths. */ + private Path terasortPath; + + /** Input (teragen) path. */ + private Path sortInput; + + /** Path where sorted data goes. */ + private Path sortOutput; + + /** Path for validated job's output. */ + private Path sortValidate; + + public ITestAbfsTerasort() throws Exception { + } + + + @Override + public void setup() throws Exception { + // superclass calls requireScaleTestsEnabled(); + super.setup(); + prepareToTerasort(); + } + + /** + * Set up the job conf with the options for terasort chosen by the scale + * options. + * @param conf configuration + */ + @Override + protected void applyCustomConfigOptions(JobConf conf) { + // small sample size for faster runs + conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), + getSampleSizeForEachPartition()); + conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), + getExpectedPartitionCount()); + conf.setBoolean( + TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), + false); + } + + private int getExpectedPartitionCount() { + return EXPECTED_PARTITION_COUNT; + } + + private int getSampleSizeForEachPartition() { + return PARTITION_SAMPLE_SIZE; + } + + protected int getRowCount() { + return ROW_COUNT; + } + + /** + * Set up the terasort by initializing paths variables + * The paths used must be unique across parameterized runs but + * common across all test cases in a single parameterized run. + */ + private void prepareToTerasort() { + terasortPath = getFileSystem().makeQualified(TERASORT_PATH); + sortInput = new Path(terasortPath, "sortin"); + sortOutput = new Path(terasortPath, "sortout"); + sortValidate = new Path(terasortPath, "validate"); + } + + /** + * Declare that a stage has completed. + * @param stage stage name/key in the map + * @param d duration. + */ + private static void completedStage(final String stage, + final DurationInfo d) { + COMPLETED_STAGES.put(stage, d); + } + + /** + * Declare a stage which is required for this test case. + * @param stage stage name + */ + private static void requireStage(final String stage) { + Assume.assumeTrue( + "Required stage was not completed: " + stage, + COMPLETED_STAGES.get(stage) != null); + } + + /** + * Execute a single stage in the terasort. + * Updates the completed stages map with the stage duration -if successful. + * @param stage Stage name for the stages map. + * @param jobConf job conf + * @param dest destination directory -the _SUCCESS file will be expected here. + * @param tool tool to run. + * @param args args for the tool. + * @param minimumFileCount minimum number of files to have been created + * @throws Exception any failure + */ + private void executeStage( + final String stage, + final JobConf jobConf, + final Path dest, + final Tool tool, + final String[] args, + final int minimumFileCount) throws Exception { + int result; + + // the duration info is created outside a try-with-resources + // clause as it is used later. + DurationInfo d = new DurationInfo(LOG, stage); + try { + result = ToolRunner.run(jobConf, tool, args); + } finally { + d.close(); + } + dumpOutputTree(dest); + assertEquals(stage + + "(" + StringUtils.join(", ", args) + ")" + + " failed", 0, result); + final ManifestSuccessData successFile = validateSuccessFile(getFileSystem(), dest, + minimumFileCount, ""); + JOB_IOSTATS.aggregate(successFile.getIOStatistics()); + + completedStage(stage, d); + } + + /** + * Set up terasort by cleaning out the destination, and note the initial + * time before any of the jobs are executed. + * + * This is executed first for each parameterized run. + * It is where all variables which need to be reset for each run need + * to be reset. + */ + @Test + public void test_100_terasort_setup() throws Throwable { + describe("Setting up for a terasort"); + + getFileSystem().delete(terasortPath, true); + terasortDuration = Optional.of(new DurationInfo(LOG, false, "Terasort")); + } + + @Test + public void test_110_teragen() throws Throwable { + describe("Teragen to %s", sortInput); + getFileSystem().delete(sortInput, true); + + JobConf jobConf = newJobConf(); + patchConfigurationForCommitter(jobConf); + executeStage("teragen", + jobConf, + sortInput, + new TeraGen(), + new String[]{Integer.toString(getRowCount()), sortInput.toString()}, + 1); + } + + + @Test + public void test_120_terasort() throws Throwable { + describe("Terasort from %s to %s", sortInput, sortOutput); + requireStage("teragen"); + getFileSystem().delete(sortOutput, true); + + loadSuccessFile(getFileSystem(), sortInput); + JobConf jobConf = newJobConf(); + patchConfigurationForCommitter(jobConf); + executeStage("terasort", + jobConf, + sortOutput, + new TeraSort(), + new String[]{sortInput.toString(), sortOutput.toString()}, + 1); + } + + @Test + public void test_130_teravalidate() throws Throwable { + describe("TeraValidate from %s to %s", sortOutput, sortValidate); + requireStage("terasort"); + getFileSystem().delete(sortValidate, true); + loadSuccessFile(getFileSystem(), sortOutput); + JobConf jobConf = newJobConf(); + patchConfigurationForCommitter(jobConf); + executeStage("teravalidate", + jobConf, + sortValidate, + new TeraValidate(), + new String[]{sortOutput.toString(), sortValidate.toString()}, + 1); + } + + /** + * Print the results, and save to the base dir as a CSV file. + * Why there? Makes it easy to list and compare. + */ + @Test + public void test_140_teracomplete() throws Throwable { + terasortDuration.ifPresent(d -> { + d.close(); + completedStage("overall", d); + }); + + // IO Statistics + IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, JOB_IOSTATS); + + // and the summary + final StringBuilder results = new StringBuilder(); + results.append("\"Operation\"\t\"Duration\"\n"); + + // this is how you dynamically create a function in a method + // for use afterwards. + // Works because there's no IOEs being raised in this sequence. + Consumer stage = (s) -> { + DurationInfo duration = COMPLETED_STAGES.get(s); + results.append(String.format("\"%s\"\t\"%s\"\n", + s, + duration == null ? "" : duration)); + }; + + stage.accept("teragen"); + stage.accept("terasort"); + stage.accept("teravalidate"); + stage.accept("overall"); + String text = results.toString(); + File resultsFile = File.createTempFile("results", ".csv"); + FileUtils.write(resultsFile, text, StandardCharsets.UTF_8); + LOG.info("Results are in {}\n{}", resultsFile, text); + } + + /** + * Reset the duration so if two committer tests are run sequentially. + * Without this the total execution time is reported as from the start of + * the first test suite to the end of the second. + */ + @Test + public void test_150_teracleanup() throws Throwable { + terasortDuration = Optional.empty(); + } + + @Test + public void test_200_directory_deletion() throws Throwable { + getFileSystem().delete(terasortPath, true); + } + + /** + * Dump the files under a path -but not fail if the path is not present., + * @param path path to dump + * @throws Exception any failure. + */ + protected void dumpOutputTree(Path path) throws Exception { + LOG.info("Files under output directory {}", path); + try { + RemoteIterators.foreach(getFileSystem().listFiles(path, true), + (status) -> LOG.info("{}", status)); + } catch (FileNotFoundException e) { + LOG.info("Output directory {} not found", path); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java new file mode 100644 index 0000000000..3d49d62eaa --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Unit and integration tests for the manifest committer. + * JSON job reports will be saved to + * {@code target/reports} + */ +package org.apache.hadoop.fs.azurebfs.commit; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java index 62bcca174e..1319ea44c7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java @@ -34,7 +34,7 @@ public class AbfsFileSystemContract extends AbstractBondedFSContract { public static final String CONTRACT_XML = "abfs.xml"; private final boolean isSecure; - protected AbfsFileSystemContract(final Configuration conf, boolean secure) { + public AbfsFileSystemContract(final Configuration conf, boolean secure) { super(conf); //insert the base features addConfResource(CONTRACT_XML); diff --git a/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml b/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml new file mode 100644 index 0000000000..7d2d11c04e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml @@ -0,0 +1,25 @@ + + + + + + + + +