HADOOP-18163. hadoop-azure support for the Manifest Committer of MAPREDUCE-7341

Follow-on patch to MAPREDUCE-7341, adding ABFS support and tests

* resilient rename
* tests for job commit through the manifest committer.

contains
- HADOOP-17976. ABFS etag extraction inconsistent between LIST and HEAD calls
- HADOOP-16204. ABFS tests to include terasort

Contributed by Steve Loughran.

Change-Id: I0a7d4043bdf19bcb00c033fc389730109b93b77f
Steve Loughran 2022-03-16 15:41:03 +00:00
parent 1cc83f0f45
commit 3238bdab89
33 changed files with 2108 additions and 84 deletions

View File

@ -376,6 +376,13 @@
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>

View File

@ -231,6 +231,67 @@
<scope>test</scope>
</dependency>
<!--
the mapreduce-client-core module is compiled against for the
manifest committer support. It is not needed on the classpath
except when using this committer, so it is only tagged as
"provided".
It is not exported as a transitive dependency of the JAR.
Applications which wish to use the manifest committer
will need to explicitly add the mapreduce JAR to their classpath.
This is already done by MapReduce and Spark.
-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
<!--
These are only added to the classpath for tests,
and are not exported by the test JAR as transitive
dependencies.
-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-examples</artifactId>
<scope>test</scope>
<type>jar</type>
</dependency>
<!-- artifacts needed to bring up a Mini MR Yarn cluster-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
@ -331,7 +392,7 @@
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@ -362,7 +423,7 @@
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@ -404,7 +465,7 @@
<!-- surefire.forkNumber won't do the parameter -->
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<!-- Propagate scale parameters -->
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
@ -494,7 +555,7 @@
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@ -538,7 +599,7 @@
<!-- surefire.forkNumber won't do the parameter -->
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<!-- Propagate scale parameters -->
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
@ -556,6 +617,7 @@
<exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
<exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
<exclude>**/azurebfs/commit/*.java</exclude>
</excludes>
</configuration>
@ -584,7 +646,7 @@
<!-- surefire.forkNumber won't do the parameter -->
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<!-- Propagate scale parameters -->
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
@ -597,6 +659,7 @@
<include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
<include>**/azurebfs/services/ITestReadBufferManager.java</include>
<include>**/azurebfs/commit/*.java</include>
</includes>
</configuration>
</execution>
@ -646,7 +709,7 @@
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@ -676,7 +739,7 @@
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@ -718,7 +781,7 @@
<!-- surefire.forkNumber won't do the parameter -->
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<!-- Propagate scale parameters -->
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>

View File

@ -48,4 +48,7 @@
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
<suppress checks="ParameterNumber|VisibilityModifier"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
<!-- allow tests to use _ for ordering. -->
<suppress checks="MethodName"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
</suppressions>

View File

@ -260,6 +260,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT,
MinValue = 0,
DefaultValue = RATE_LIMIT_DEFAULT)
private int rateLimit;
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
DefaultValue = DEFAULT_FS_AZURE_USER_AGENT_PREFIX)
private String userAgentId;
@ -726,6 +731,10 @@ public boolean isAutoThrottlingEnabled() {
return this.enableAutoThrottling;
}
public int getRateLimit() {
return rateLimit;
}
public String getCustomUserAgentPrefix() {
return this.userAgentId;
}

View File

@ -27,6 +27,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.AccessDeniedException;
import java.time.Duration;
import java.util.Hashtable;
import java.util.List;
import java.util.ArrayList;
@ -42,13 +43,17 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import javax.annotation.Nullable;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
@ -94,9 +99,12 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.RateLimiting;
import org.apache.hadoop.util.RateLimitingFactory;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
@ -143,6 +151,9 @@ public class AzureBlobFileSystem extends FileSystem
/** Maximum Active blocks per OutputStream. */
private int blockOutputActiveBlocks;
/** Rate limiting for operations which use it to throttle their IO. */
private RateLimiting rateLimiting;
@Override
public void initialize(URI uri, Configuration configuration)
throws IOException {
@ -215,7 +226,7 @@ public void initialize(URI uri, Configuration configuration)
}
AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
}
@ -282,8 +293,13 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
}
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize,
final short replication, final long blockSize, final Progressable progress) throws IOException {
public FSDataOutputStream create(final Path f,
final FsPermission permission,
final boolean overwrite,
final int bufferSize,
final short replication,
final long blockSize,
final Progressable progress) throws IOException {
LOG.debug("AzureBlobFileSystem.create path: {} permission: {} overwrite: {} bufferSize: {}",
f,
permission,
@ -332,8 +348,12 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe
@Override
@SuppressWarnings("deprecation")
public FSDataOutputStream createNonRecursive(final Path f, final FsPermission permission,
final EnumSet<CreateFlag> flags, final int bufferSize, final short replication, final long blockSize,
public FSDataOutputStream createNonRecursive(final Path f,
final FsPermission permission,
final EnumSet<CreateFlag> flags,
final int bufferSize,
final short replication,
final long blockSize,
final Progressable progress) throws IOException {
// Check if file should be appended or overwritten. Assume that the file
@ -357,7 +377,8 @@ public FSDataOutputStream createNonRecursive(final Path f,
}
@Override
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress)
throws IOException {
LOG.debug(
"AzureBlobFileSystem.append path: {} bufferSize: {}",
f.toString(),
@ -430,7 +451,7 @@ public boolean rename(final Path src, final Path dst) throws IOException {
qualifiedDstPath = makeQualified(adjustedDst);
abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext);
abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null);
return true;
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Rename operation failed. ", ex);
@ -448,6 +469,81 @@ public boolean rename(final Path src, final Path dst) throws IOException {
}
/**
* Private method to create resilient commit support.
* @return a new instance
* @param path destination path
* @throws IOException problem probing store capabilities
* @throws UnsupportedOperationException if the store lacks this support
*/
@InterfaceAudience.Private
public ResilientCommitByRename createResilientCommitSupport(final Path path)
throws IOException {
if (!hasPathCapability(path,
CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME)) {
throw new UnsupportedOperationException(
"Resilient commit support not available for " + path);
}
return new ResilientCommitByRenameImpl();
}
/**
* Resilient commit support.
* Provided as a nested class to avoid contaminating the
* FS instance with too many private methods which end up
* being used widely (as has happened to the S3A FS)
*/
public class ResilientCommitByRenameImpl implements ResilientCommitByRename {
/**
* Perform the rename.
* This will be rate limited, as well as able to recover
* from rename errors if the etag was passed in.
* @param source path to source file
* @param dest destination of rename.
* @param sourceEtag etag of source file. may be null or empty
* @return the outcome of the operation
* @throws IOException any rename failure which was not recovered from.
*/
public Pair<Boolean, Duration> commitSingleFileByRename(
final Path source,
final Path dest,
@Nullable final String sourceEtag) throws IOException {
LOG.debug("renameFileWithEtag source: {} dest: {} etag {}", source, dest, sourceEtag);
statIncrement(CALL_RENAME);
trailingPeriodCheck(dest);
Path qualifiedSrcPath = makeQualified(source);
Path qualifiedDstPath = makeQualified(dest);
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat,
listener);
if (qualifiedSrcPath.equals(qualifiedDstPath)) {
// rename to itself is forbidden
throw new PathIOException(qualifiedSrcPath.toString(), "cannot rename object onto self");
}
// acquire one IO permit
final Duration waitTime = rateLimiting.acquire(1);
try {
final boolean recovered = abfsStore.rename(qualifiedSrcPath,
qualifiedDstPath, tracingContext, sourceEtag);
return Pair.of(recovered, waitTime);
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Rename operation failed. ", ex);
checkException(source, ex);
// never reached
return null;
}
}
}
@Override
public boolean delete(final Path f, final boolean recursive) throws IOException {
LOG.debug(
@ -533,8 +629,7 @@ private void trailingPeriodCheck(Path path) throws IllegalArgumentException {
"ABFS does not allow files or directories to end with a dot.");
}
path = path.getParent();
}
else {
} else {
break;
}
}
@ -658,7 +753,6 @@ public Path makeQualified(Path path) {
return super.makeQualified(path);
}
@Override
public Path getWorkingDirectory() {
return this.workingDir;
@ -782,8 +876,7 @@ public Void call() throws Exception {
}
});
}
}
finally {
} finally {
executorService.shutdownNow();
}
@ -839,7 +932,10 @@ public void setOwner(final Path path, final String owner, final String group)
* @throws IllegalArgumentException If name is null or empty or if value is null
*/
@Override
public void setXAttr(final Path path, final String name, final byte[] value, final EnumSet<XAttrSetFlag> flag)
public void setXAttr(final Path path,
final String name,
final byte[] value,
final EnumSet<XAttrSetFlag> flag)
throws IOException {
LOG.debug("AzureBlobFileSystem.setXAttr path: {}", path);
@ -1526,8 +1622,9 @@ public boolean hasPathCapability(final Path path, final String capability)
case CommonPathCapabilities.FS_PERMISSIONS:
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.ETAGS_AVAILABLE:
case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME:
return true;
case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME:
case CommonPathCapabilities.FS_ACLS:
return getIsNamespaceEnabled(
new TracingContext(clientCorrelationId, fileSystemId,

View File

@ -62,6 +62,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -855,7 +856,22 @@ public void breakLease(final Path path, final TracingContext tracingContext) thr
client.breakLease(getRelativePath(path), tracingContext);
}
public void rename(final Path source, final Path destination, TracingContext tracingContext) throws
/**
* Rename a file or directory.
* If a source etag is passed in, the operation will attempt to recover
* from a missing source file by probing the destination for
* existence and comparing etags.
* @param source path to source file
* @param destination destination of rename.
* @param tracingContext trace context
* @param sourceEtag etag of source file. may be null or empty
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
* @return true if recovery was needed and succeeded.
*/
public boolean rename(final Path source,
final Path destination,
final TracingContext tracingContext,
final String sourceEtag) throws
AzureBlobFileSystemException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
long countAggregate = 0;
@ -875,23 +891,29 @@ public void rename(final Path source, final Path destination, TracingContext tra
String sourceRelativePath = getRelativePath(source);
String destinationRelativePath = getRelativePath(destination);
// was any operation recovered from?
boolean recovered = false;
do {
try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
AbfsRestOperation op = client
.renamePath(sourceRelativePath, destinationRelativePath,
continuation, tracingContext);
final Pair<AbfsRestOperation, Boolean> pair =
client.renamePath(sourceRelativePath, destinationRelativePath,
continuation, tracingContext, sourceEtag);
AbfsRestOperation op = pair.getLeft();
perfInfo.registerResult(op.getResult());
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
perfInfo.registerSuccess(true);
countAggregate++;
shouldContinue = continuation != null && !continuation.isEmpty();
// update the recovery flag.
recovered |= pair.getRight();
if (!shouldContinue) {
perfInfo.registerAggregates(startAggregate, countAggregate);
}
}
} while (shouldContinue);
return recovered;
}
public void delete(final Path path, final boolean recursive,
@ -1909,7 +1931,7 @@ boolean areLeasesFreed() {
* @param result response to process.
* @return the quote-unwrapped etag.
*/
private static String extractEtagHeader(AbfsHttpOperation result) {
public static String extractEtagHeader(AbfsHttpOperation result) {
String etag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
if (etag != null) {
// strip out any wrapper "" quotes which come back, for consistency with

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import java.io.IOException;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem;
/**
* Extension of ManifestStoreOperationsThroughFileSystem with ABFS awareness.
* Purely for use by jobs committing work through the manifest committer.
* The {@link AzureManifestCommitterFactory} will configure
* this as the binding to the FS.
*
* ADLS Gen2 stores support etag-recovery on renames, but not WASB
* stores.
*/
@InterfaceAudience.LimitedPrivate("mapreduce")
@InterfaceStability.Unstable
public class AbfsManifestStoreOperations extends
ManifestStoreOperationsThroughFileSystem {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsManifestStoreOperations.class);
/**
* Classname, which can be declared in job configurations.
*/
public static final String NAME = AbfsManifestStoreOperations.class.getName();
/**
* Resilient rename calls; only available on an ADLS Gen2 store.
* Will be null after binding if the FS isn't compatible.
*/
private ResilientCommitByRename resilientCommitByRename;
@Override
public AzureBlobFileSystem getFileSystem() {
return (AzureBlobFileSystem) super.getFileSystem();
}
/**
* Bind to the store.
*
* @param filesystem FS.
* @param path path to work under
* @throws IOException binding problems.
*/
@Override
public void bindToFileSystem(FileSystem filesystem, Path path) throws IOException {
if (!(filesystem instanceof AzureBlobFileSystem)) {
throw new PathIOException(path.toString(),
"Not an abfs filesystem: " + filesystem.getClass());
}
super.bindToFileSystem(filesystem, path);
try {
resilientCommitByRename = getFileSystem().createResilientCommitSupport(path);
LOG.debug("Bonded to filesystem with resilient commits under path {}", path);
} catch (UnsupportedOperationException e) {
LOG.debug("No resilient commit support under path {}", path);
}
}
@Override
public boolean storePreservesEtagsThroughRenames(final Path path) {
return true;
}
/**
* Resilient commits available on hierarchical stores.
* @return true if the FS can use etags on renames.
*/
@Override
public boolean storeSupportsResilientCommit() {
return resilientCommitByRename != null;
}
/**
* Commit a file through an internal ABFS operation.
* If resilient commit is unavailable, invokes the superclass, which
* will raise an UnsupportedOperationException.
* @param entry entry to commit
* @return the outcome
* @throws IOException any failure in resilient commit.
* @throws UnsupportedOperationException if not available.
*/
@Override
public CommitFileResult commitFile(final FileEntry entry) throws IOException {
if (resilientCommitByRename != null) {
final Pair<Boolean, Duration> result =
resilientCommitByRename.commitSingleFileByRename(
entry.getSourcePath(),
entry.getDestPath(),
entry.getEtag());
return CommitFileResult.fromResilientCommit(result.getLeft(),
result.getRight());
} else {
return super.commitFile(entry);
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
/**
* A Committer for the manifest committer which performs all bindings needed
* to work best with abfs.
* This includes, at a minimum, switching to the abfs-specific manifest store operations.
*
* This classname is referenced in configurations, so MUST NOT change.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AzureManifestCommitterFactory extends ManifestCommitterFactory {
/**
* Classname, which can be declared in job configurations.
*/
public static final String NAME = AzureManifestCommitterFactory.class.getName();
@Override
public ManifestCommitter createOutputCommitter(final Path outputPath,
final TaskAttemptContext context) throws IOException {
final Configuration conf = context.getConfiguration();
// use ABFS Store operations
conf.set(OPT_STORE_OPERATIONS_CLASS,
AbfsManifestStoreOperations.NAME);
return super.createOutputCommitter(outputPath, context);
}
}
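
Illustrative note (not part of the patch): a job selects this factory for abfs:// output
through the per-scheme committer factory option. A minimal sketch, assuming the standard
"mapreduce.outputcommitter.factory.scheme.<scheme>" convention applies to abfs; verify the
exact key against the Hadoop release in use.

import org.apache.hadoop.conf.Configuration;

public final class AbfsCommitterJobSetup {

  private AbfsCommitterJobSetup() {
  }

  /** Route all abfs:// output through the Azure manifest committer factory (assumed key name). */
  public static Configuration bindManifestCommitter(Configuration conf) {
    conf.set("mapreduce.outputcommitter.factory.scheme.abfs",
        "org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory");
    return conf;
  }
}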

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
/**
* API exclusively for committing files.
*
* This is only for use by {@link AbfsManifestStoreOperations},
* and is intended to be implemented by ABFS.
* To ensure that there is no need to add mapreduce JARs to the
* classpath just to work with ABFS, this interface
* MUST NOT refer to anything in the
* {@code org.apache.hadoop.mapreduce} package.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface ResilientCommitByRename extends IOStatisticsSource {
/**
* Rename source file to dest path *Exactly*; no subdirectory games here.
* If the method does not raise an exception, then
* the data at dest is the data which was at source.
*
* Requirements
*
* <pre>
* exists(FS, source) else raise FileNotFoundException
* source != dest else raise PathIOException
* not exists(FS, dest)
* isDir(FS, dest.getParent)
* </pre>
* <ol>
* <li>source != dest else raise PathIOException</li>
* <li>source must exist else raise FileNotFoundException</li>
* <li>source must exist and be a file</li>
* <li>dest must not exist; </li>
* <li>dest.getParent() must be a dir</li>
* <li>if sourceEtag is non-empty, it MAY be used to qualify/validate the rename.</li>
* </ol>
*
* The outcome of the operation is undefined if source is not a file, dest exists,
* dest.getParent() doesn't exist/is a file.
* That is: implementations SHOULD assume that the code calling this method has
* set up the destination directory tree and is only invoking this call on a file.
* Accordingly: <i>implementations MAY skip validation checks</i>
*
* Post Conditions on a successful operation:
* <pre>
* FS' where:
* not exists(FS', source)
* and exists(FS', dest)
* and data(FS', dest) == data (FS, source)
* </pre>
* This is exactly the same outcome as `FileSystem.rename()` when the same preconditions
* are met. This API call simply restricts the operation to file rename with strict
* conditions (no need to be 'clever' about dest path calculation) and the ability
* to pass in etags, modtimes and file status values.
*
* @param source path to source file
* @param dest destination of rename.
* @param sourceEtag etag of source file. may be null or empty
* @return true if recovery was needed.
* @throws FileNotFoundException source file not found
* @throws PathIOException failure, including source and dest being the same path
* @throws IOException any other exception
*/
Pair<Boolean, Duration> commitSingleFileByRename(
Path source,
Path dest,
@Nullable String sourceEtag) throws IOException;
}
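
Illustrative usage sketch (not part of the patch), showing the two calls this change adds:
AzureBlobFileSystem.createResilientCommitSupport() and commitSingleFileByRename(). The paths
and etag value are hypothetical; the etag is assumed to have come from an earlier listing.

import java.io.IOException;
import java.time.Duration;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;

public final class ResilientCommitSketch {

  private ResilientCommitSketch() {
  }

  static void commitOneFile(AzureBlobFileSystem fs,
      Path taskOutput, Path finalDest, String sourceEtag) throws IOException {
    // raises UnsupportedOperationException if the store cannot preserve etags in rename
    ResilientCommitByRename commit = fs.createResilientCommitSupport(finalDest);
    Pair<Boolean, Duration> outcome =
        commit.commitSingleFileByRename(taskOutput, finalDest, sourceEtag);
    boolean recovered = outcome.getLeft();        // true if etag-based recovery was needed
    Duration rateLimitWait = outcome.getRight();  // time spent acquiring an IO permit
  }
}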

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Support for manifest committer.
* Unless otherwise stated: classes are private.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.fs.azurebfs.commit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -220,6 +220,9 @@ public final class ConfigurationKeys {
/** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */
public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track";
/** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */
public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit";
public static String accountProperty(String property, String account) {
return property + "." + account;
}

View File

@ -133,5 +133,10 @@ public final class FileSystemConfigurations {
public static final String DATA_BLOCKS_BUFFER_DEFAULT =
DATA_BLOCKS_BUFFER_DISK;
/**
* IO rate limit. Value: {@value}
*/
public static final int RATE_LIMIT_DEFAULT = 10_000;
private FileSystemConfigurations() {}
}
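
Illustrative sketch (not part of the patch) of overriding the new rate limit option. The key
"fs.azure.io.rate.limit" and its default of 10_000 are defined in this change; the value set
below is purely an example.

import org.apache.hadoop.conf.Configuration;

public final class RateLimitConfigSketch {

  private RateLimitConfigSketch() {
  }

  /** Build a configuration with a reduced IO rate limit for rename-heavy commit phases. */
  public static Configuration withReducedRateLimit() {
    Configuration conf = new Configuration();
    // example value only; the patch's default is 10_000
    conf.setInt("fs.azure.io.rate.limit", 1_000);
    return conf;
  }
}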

View File

@ -51,6 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
@ -67,6 +68,8 @@
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@ -478,8 +481,30 @@ public AbfsRestOperation breakLease(final String path,
return op;
}
public AbfsRestOperation renamePath(String source, final String destination,
final String continuation, TracingContext tracingContext)
/**
* Rename a file or directory.
* If a source etag is passed in, the operation will attempt to recover
* from a missing source file by probing the destination for
* existence and comparing etags.
* The second value in the result will be true to indicate that this
* took place.
* As rename recovery is only attempted if the source etag is non-empty,
* in normal rename operations rename recovery will never happen.
* @param source path to source file
* @param destination destination of rename.
* @param continuation continuation.
* @param tracingContext trace context
* @param sourceEtag etag of source file. may be null or empty
* @return pair of (the rename operation, flag indicating recovery took place)
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
*/
public Pair<AbfsRestOperation, Boolean> renamePath(
final String source,
final String destination,
final String continuation,
final TracingContext tracingContext,
final String sourceEtag)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
@ -505,9 +530,73 @@ public AbfsRestOperation renamePath(String source, final String destination,
HTTP_METHOD_PUT,
url,
requestHeaders);
// no attempt at recovery using timestamps as it was not reliable.
try {
op.execute(tracingContext);
return op;
return Pair.of(op, false);
} catch (AzureBlobFileSystemException e) {
// If we have no HTTP response, throw the original exception.
if (!op.hasResult()) {
throw e;
}
boolean etagCheckSucceeded = renameIdempotencyCheckOp(
source,
sourceEtag, op, destination, tracingContext);
if (!etagCheckSucceeded) {
// the idempotency check did not establish that the rename succeeded;
// rethrow the original exception
throw e;
}
return Pair.of(op, true);
}
}
/**
* Check whether a rename request failed after a retry and whether an earlier
* rename request might already have succeeded at the back-end.
*
* If a source etag was passed in, and the error was 404, get the
* etag of any file at the destination.
* If it matches the source etag, then the rename is considered
* a success.
* Exceptions raised in the probe of the destination are swallowed,
* so that they do not interfere with the original rename failures.
* @param source source path
* @param op Rename request REST operation response with non-null HTTP response
* @param destination rename destination path
* @param sourceEtag etag of source file. may be null or empty
* @param tracingContext Tracks identifiers for request header
* @return true if the rename is deemed to have already succeeded (destination etag matches source)
*/
public boolean renameIdempotencyCheckOp(
final String source,
final String sourceEtag,
final AbfsRestOperation op,
final String destination,
TracingContext tracingContext) {
Preconditions.checkArgument(op.hasResult(), "Operation has null HTTP response");
if ((op.isARetriedRequest())
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
&& isNotEmpty(sourceEtag)) {
// Server has returned HTTP 404, which means rename source no longer
// exists. Check on destination status and if its etag matches
// that of the source, consider it to be a success.
LOG.debug("rename {} to {} failed, checking etag of destination",
source, destination);
try {
final AbfsRestOperation destStatusOp = getPathStatus(destination,
false, tracingContext);
final AbfsHttpOperation result = destStatusOp.getResult();
return result.getStatusCode() == HttpURLConnection.HTTP_OK
&& sourceEtag.equals(extractEtagHeader(result));
} catch (AzureBlobFileSystemException ignored) {
// GetFileStatus on the destination failed, the rename did not take place
}
}
return false;
}
public AbfsRestOperation append(final String path, final byte[] buffer,

View File

@ -48,9 +48,15 @@ public String getStorageAccountKey(String accountName, Configuration rawConfig)
// Validating the key.
validateStorageAccountKey(key);
} catch (IllegalAccessException | InvalidConfigurationValueException e) {
throw new KeyProviderException("Failure to initialize configuration", e);
LOG.debug("Failure to retrieve storage account key for {}", accountName,
e);
throw new KeyProviderException("Failure to initialize configuration for "
+ accountName
+ " key =\"" + key + "\""
+ ": " + e, e);
} catch(IOException ioe) {
LOG.warn("Unable to get key from credential providers. {}", ioe);
LOG.warn("Unable to get key for {} from credential providers. {}",
accountName, ioe, ioe);
}
return key;

View File

@ -270,12 +270,13 @@ protected void createFilesystemForSASTests() throws Exception {
// The SAS tests do not have permission to create a filesystem
// so first create temporary instance of the filesystem using SharedKey
// then re-use the filesystem it creates with SAS auth instead of SharedKey.
AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
try (AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig)) {
ContractTestUtils.assertPathExists(tempFs, "This path should exist",
new Path("/"));
abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
usingFilesystemForSASTests = true;
}
}
public AzureBlobFileSystem getFileSystem() throws IOException {
return abfs;

View File

@ -32,7 +32,10 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
@ -76,13 +79,19 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
new Random().nextBytes(b);
Path testPath = path(TEST_PATH);
try (FSDataOutputStream stream = fs.create(testPath)) {
FSDataOutputStream stream = fs.create(testPath);
try {
stream.write(b);
} finally {
stream.close();
}
IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
final byte[] readBuffer = new byte[2 * bufferSize];
int result;
IOStatisticsSource statisticsSource = null;
try (FSDataInputStream inputStream = fs.open(testPath)) {
statisticsSource = inputStream;
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.READ, true, 0,
@ -100,6 +109,8 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
inputStream.seek(0);
result = inputStream.read(readBuffer, 0, bufferSize);
}
IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
assertNotEquals("data read in final read()", -1, result);
assertArrayEquals(readBuffer, b);
}

View File

@ -401,7 +401,8 @@ public void testSignatureMask() throws Exception {
fs.create(new Path(src)).close();
AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
.renamePath(src, "/testABC" + "/abc.txt", null,
getTestTracingContext(fs, false));
getTestTracingContext(fs, false), null)
.getLeft();
AbfsHttpOperation result = abfsHttpRestOperation.getResult();
String url = result.getMaskedUrl();
String encodedUrl = result.getMaskedEncodedUrl();
@ -418,7 +419,7 @@ public void testSignatureMaskOnExceptionMessage() throws Exception {
intercept(IOException.class, "sig=XXXX",
() -> getFileSystem().getAbfsClient()
.renamePath("testABC/test.xt", "testABC/abc.txt", null,
getTestTracingContext(getFileSystem(), false)));
getTestTracingContext(getFileSystem(), false), null));
}
@Test

View File

@ -526,7 +526,8 @@ private void testRenamePath(final boolean isWithCPK) throws Exception {
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.renamePath(testFileName, newName, null,
getTestTracingContext(fs, false));
getTestTracingContext(fs, false), null)
.getLeft();
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
/**
* Helper methods for committer tests on ABFS.
*/
final class AbfsCommitTestHelper {
private AbfsCommitTestHelper() {
}
/**
* Prepare the test configuration.
* @param contractTestBinding test binding
* @return an extracted and patched configuration.
*/
static Configuration prepareTestConfiguration(
ABFSContractTestBinding contractTestBinding) {
final Configuration conf =
contractTestBinding.getRawConfiguration();
// use ABFS Store operations
conf.set(OPT_STORE_OPERATIONS_CLASS,
AbfsManifestStoreOperations.NAME);
return conf;
}
}

View File

@ -0,0 +1,260 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.util.DurationInfo;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
import static org.apache.hadoop.io.IOUtils.closeStream;
/**
* Tests which create a yarn minicluster.
* These are all considered scale tests; the probe for
* scale tests being enabled is executed before the cluster
* is set up to avoid wasting time on non-scale runs.
*/
public abstract class AbstractAbfsClusterITest extends
AbstractManifestCommitterTest {
public static final int NO_OF_NODEMANAGERS = 2;
private final ABFSContractTestBinding binding;
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
protected AbstractAbfsClusterITest() throws Exception {
binding = new ABFSContractTestBinding();
}
@Override
protected int getTestTimeoutMillis() {
return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
requireScaleTestsEnabled();
if (getClusterBinding() == null) {
clusterBinding = demandCreateClusterBinding();
}
assertNotNull("cluster is not bound", getClusterBinding());
}
@AfterClass
public static void teardownClusters() throws IOException {
terminateCluster(clusterBinding);
clusterBinding = null;
}
@Override
protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
@Override
protected Configuration createConfiguration() {
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
}
/**
* This is the cluster binding which every subclass must create.
*/
protected static final class ClusterBinding {
private String clusterName;
private final MiniMRYarnCluster yarn;
public ClusterBinding(
final String clusterName,
final MiniMRYarnCluster yarn) {
this.clusterName = clusterName;
this.yarn = requireNonNull(yarn);
}
/**
* Get the cluster FS, which will either be HDFS or the local FS.
* @return a filesystem.
* @throws IOException failure
*/
public FileSystem getClusterFS() throws IOException {
return FileSystem.getLocal(yarn.getConfig());
}
public MiniMRYarnCluster getYarn() {
return yarn;
}
public Configuration getConf() {
return getYarn().getConfig();
}
public String getClusterName() {
return clusterName;
}
public void terminate() {
closeStream(getYarn());
}
}
/**
* Create the cluster binding.
* The configuration will be patched by propagating down options
* from the maven build (scale test options etc.) and turning off unwanted
* YARN features.
*
* If an HDFS cluster is requested,
* the HDFS and YARN clusters will share the same configuration, so
* the HDFS cluster binding is implicitly propagated to YARN.
* If one is not requested, the local filesystem is used as the cluster FS.
* @param conf configuration to start with.
* @return the cluster binding.
* @throws IOException failure.
*/
protected static ClusterBinding createCluster(
final JobConf conf) throws IOException {
try (DurationInfo d = new DurationInfo(LOG, "Creating YARN MiniCluster")) {
conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
// create a unique cluster name based on the current timestamp.
String timestamp = LocalDateTime.now().format(
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS"));
String clusterName = "yarn-" + timestamp;
MiniMRYarnCluster yarnCluster =
new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS);
yarnCluster.init(conf);
yarnCluster.start();
return new ClusterBinding(clusterName, yarnCluster);
}
}
/**
* Terminate the cluster if it is not null.
* @param cluster the cluster
*/
protected static void terminateCluster(ClusterBinding cluster) {
if (cluster != null) {
cluster.terminate();
}
}
/**
* Get the cluster binding for this subclass.
* @return the cluster binding
*/
protected ClusterBinding getClusterBinding() {
return clusterBinding;
}
protected MiniMRYarnCluster getYarn() {
return getClusterBinding().getYarn();
}
/**
* We stage work into a temporary directory rather than directly under
* the user's home directory, as that is often rejected by CI test
* runners.
*/
@Rule
public final TemporaryFolder stagingFilesDir = new TemporaryFolder();
/**
* Create the cluster binding on demand rather than in a BeforeClass static method.
* Subclasses can override this to change the binding options.
* @return the cluster binding
*/
protected ClusterBinding demandCreateClusterBinding() throws Exception {
return createCluster(new JobConf());
}
/**
* Create a job configuration.
* This creates a new job conf from the yarn
* cluster configuration then calls
* {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized.
* @return the new job configuration.
* @throws IOException failure
*/
protected JobConf newJobConf() throws IOException {
JobConf jobConf = new JobConf(getYarn().getConfig());
jobConf.addResource(getConfiguration());
applyCustomConfigOptions(jobConf);
return jobConf;
}
/**
* Patch the (job) configuration for this committer.
* @param jobConf configuration to patch
* @return a configuration which will run this configuration.
*/
protected Configuration patchConfigurationForCommitter(
final Configuration jobConf) {
enableManifestCommitter(jobConf);
return jobConf;
}
/**
* Override point to let implementations tune the MR Job conf.
* @param jobConf configuration
*/
protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
}
/**
* Assume that scale tests are enabled.
*/
protected void requireScaleTestsEnabled() {
assumeScaleTestsEnabled(getConfiguration());
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCleanupStage;
/**
* Cleanup logic on ABFS.
*/
public class ITestAbfsCleanupStage extends TestCleanupStage {
private final ABFSContractTestBinding binding;
public ITestAbfsCleanupStage() throws Exception {
binding = new ABFSContractTestBinding();
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
}
@Override
protected Configuration createConfiguration() {
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
}
@Override
protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCommitTaskStage;
/**
* ABFS storage test of task committer.
*/
public class ITestAbfsCommitTaskStage extends TestCommitTaskStage {
private final ABFSContractTestBinding binding;
public ITestAbfsCommitTaskStage() throws Exception {
binding = new ABFSContractTestBinding();
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
}
@Override
protected Configuration createConfiguration() {
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
}
@Override
protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCreateOutputDirectoriesStage;
/**
* ABFS storage test of directory creation.
*/
public class ITestAbfsCreateOutputDirectoriesStage extends TestCreateOutputDirectoriesStage {
private final ABFSContractTestBinding binding;
public ITestAbfsCreateOutputDirectoriesStage() throws Exception {
binding = new ABFSContractTestBinding();
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
}
@Override
protected Configuration createConfiguration() {
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
}
@Override
protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import java.io.IOException;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.FixMethodOrder;
import org.junit.runners.MethodSorters;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestJobThroughManifestCommitter;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
/**
* Test the Manifest committer stages against ABFS.
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ITestAbfsJobThroughManifestCommitter
extends TestJobThroughManifestCommitter {
private final ABFSContractTestBinding binding;
public ITestAbfsJobThroughManifestCommitter() throws Exception {
binding = new ABFSContractTestBinding();
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
}
@Override
protected Configuration createConfiguration() {
return enableManifestCommitter(prepareTestConfiguration(binding));
}
@Override
protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
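/**
* Always delete the shared test root at the end of the test run.
*/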
@Override
protected boolean shouldDeleteTestRootAtEndOfTestRun() {
return true;
}
/**
* In addition to the superclass validation, read the manifest and verify
* the etags of the files it commits.
* @param attemptId attempt ID
* @param files files which were created.
* @param manifest manifest
* @throws IOException failure
*/
@Override
protected void validateTaskAttemptManifest(String attemptId,
List<Path> files,
TaskManifest manifest) throws IOException {
super.validateTaskAttemptManifest(attemptId, files, manifest);
final List<FileEntry> commit = manifest.getFilesToCommit();
final ManifestStoreOperations operations = getStoreOperations();
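// every entry must have a non-empty etag which matches the etag of the
// source file as reported by the store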
for (FileEntry entry : commit) {
Assertions.assertThat(entry.getEtag())
.describedAs("Etag of %s", entry)
.isNotEmpty();
final FileStatus sourceStatus = operations.getFileStatus(entry.getSourcePath());
final String etag = ManifestCommitterSupport.getEtag(sourceStatus);
Assertions.assertThat(etag)
.describedAs("Etag of %s", sourceStatus)
.isEqualTo(entry.getEtag());
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestLoadManifestsStage;
/**
* ABFS storage test of saving and loading a large number
* of manifests.
*/
public class ITestAbfsLoadManifestsStage extends TestLoadManifestsStage {
private final ABFSContractTestBinding binding;
public ITestAbfsLoadManifestsStage() throws Exception {
binding = new ABFSContractTestBinding();
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
}
@Override
protected Configuration createConfiguration() {
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
}
@Override
protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestManifestCommitProtocol;
import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
/**
* Test the Manifest protocol against ABFS.
*/
public class ITestAbfsManifestCommitProtocol extends
TestManifestCommitProtocol {
private final ABFSContractTestBinding binding;
public ITestAbfsManifestCommitProtocol() throws Exception {
binding = new ABFSContractTestBinding();
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
}
@Override
protected Configuration createConfiguration() {
return enableManifestCommitter(prepareTestConfiguration(binding));
}
@Override
protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
@Override
protected String suitename() {
return "ITestAbfsManifestCommitProtocol";
}
}

View File

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import java.nio.charset.StandardCharsets;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
import static org.junit.Assume.assumeTrue;
/**
* Test {@link AbfsManifestStoreOperations}.
* As this looks at etag handling through FS operations, it's actually testing how etags work
* in ABFS (preservation across renames) and in the client (are they consistent
* in LIST and HEAD calls).
*
* Skipped when tested against wasb-compatible stores.
*/
public class ITestAbfsManifestStoreOperations extends AbstractManifestCommitterTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestAbfsManifestStoreOperations.class);
private final ABFSContractTestBinding binding;
public ITestAbfsManifestStoreOperations() throws Exception {
binding = new ABFSContractTestBinding();
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
// skip tests on non-HNS stores
assumeTrue("Resilient rename not available",
getFileSystem().hasPathCapability(getContract().getTestPath(),
ETAGS_PRESERVED_IN_RENAME));
}
@Override
protected Configuration createConfiguration() {
return enableManifestCommitter(prepareTestConfiguration(binding));
}
@Override
protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
/**
* Etag values must be non-empty and consistent across LIST and HEAD calls.
*/
@Test
public void testEtagConsistencyAcrossListAndHead() throws Throwable {
describe("Etag values must be non-empty and consistent across LIST and HEAD Calls.");
final Path path = methodPath();
final FileSystem fs = getFileSystem();
ContractTestUtils.touch(fs, path);
final ManifestStoreOperations operations = createManifestStoreOperations();
Assertions.assertThat(operations)
.describedAs("Store operations class loaded via Configuration")
.isInstanceOf(AbfsManifestStoreOperations.class);
final FileStatus st = operations.getFileStatus(path);
final String etag = operations.getEtag(st);
Assertions.assertThat(etag)
.describedAs("Etag of %s", st)
.isNotBlank();
LOG.info("etag of empty file is \"{}\"", etag);
final FileStatus[] statuses = fs.listStatus(path);
Assertions.assertThat(statuses)
.describedAs("List(%s)", path)
.hasSize(1);
final FileStatus lsStatus = statuses[0];
Assertions.assertThat(operations.getEtag(lsStatus))
.describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st)
.isEqualTo(etag);
}
@Test
public void testEtagsOfDifferentDataDifferent() throws Throwable {
describe("Verify that two different blocks of data written have different tags");
final Path path = methodPath();
final FileSystem fs = getFileSystem();
Path src = new Path(path, "src");
ContractTestUtils.createFile(fs, src, true,
"data1234".getBytes(StandardCharsets.UTF_8));
final ManifestStoreOperations operations = createManifestStoreOperations();
final FileStatus srcStatus = operations.getFileStatus(src);
final String srcTag = operations.getEtag(srcStatus);
LOG.info("etag of file 1 is \"{}\"", srcTag);
// now overwrite with data of same length
// (ensure that path or length aren't used exclusively as tag)
ContractTestUtils.createFile(fs, src, true,
"1234data".getBytes(StandardCharsets.UTF_8));
// validate
final String tag2 = operations.getEtag(operations.getFileStatus(src));
LOG.info("etag of file 2 is \"{}\"", tag2);
Assertions.assertThat(tag2)
.describedAs("etag of updated file")
.isNotEqualTo(srcTag);
}
@Test
public void testEtagConsistencyAcrossRename() throws Throwable {
describe("Verify that when a file is renamed, the etag remains unchanged");
final Path path = methodPath();
final FileSystem fs = getFileSystem();
Path src = new Path(path, "src");
Path dest = new Path(path, "dest");
ContractTestUtils.createFile(fs, src, true,
"sample data".getBytes(StandardCharsets.UTF_8));
final ManifestStoreOperations operations = createManifestStoreOperations();
final FileStatus srcStatus = operations.getFileStatus(src);
final String srcTag = operations.getEtag(srcStatus);
LOG.info("etag of short file is \"{}\"", srcTag);
Assertions.assertThat(srcTag)
.describedAs("Etag of %s", srcStatus)
.isNotBlank();
// rename
operations.commitFile(new FileEntry(src, dest, 0, srcTag));
// validate
FileStatus destStatus = operations.getFileStatus(dest);
final String destTag = operations.getEtag(destStatus);
Assertions.assertThat(destTag)
.describedAs("etag of list status (%s) compared to HEAD value of %s", destStatus, srcStatus)
.isEqualTo(srcTag);
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestRenameStageFailure;
/**
* Rename failure logic on ABFS.
* This will go through the resilient rename operation.
*/
public class ITestAbfsRenameStageFailure extends TestRenameStageFailure {
/**
* How many files to create.
*/
private static final int FILES_TO_CREATE = 20;
private final ABFSContractTestBinding binding;
public ITestAbfsRenameStageFailure() throws Exception {
binding = new ABFSContractTestBinding();
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
}
@Override
protected Configuration createConfiguration() {
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
}
@Override
protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
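/**
* Rename resilience is required for this test suite.
*/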
@Override
protected boolean requireRenameResilience() {
return true;
}
@Override
protected int filesToCreate() {
return FILES_TO_CREATE;
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestTaskManifestFileIO;
/**
* Test Reading/writing manifest file through ABFS.
*/
public class ITestAbfsTaskManifestFileIO extends TestTaskManifestFileIO {
private final ABFSContractTestBinding binding;
public ITestAbfsTaskManifestFileIO() throws Exception {
binding = new ABFSContractTestBinding();
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
}
@Override
protected Configuration createConfiguration() {
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
}
@Override
protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
}

View File

@ -0,0 +1,353 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import java.io.File;
import java.io.FileNotFoundException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.junit.Assume;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.examples.terasort.TeraGen;
import org.apache.hadoop.examples.terasort.TeraSort;
import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
import org.apache.hadoop.examples.terasort.TeraValidate;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.functional.RemoteIterators;
import static java.util.Optional.empty;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.loadSuccessFile;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile;
/**
* Runs Terasort against ABFS using the manifest committer.
* The tests run in sequence, each stage building on the output of the
* previous one.
* Scale test only (it is big and slow).
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@SuppressWarnings({"StaticNonFinalField", "OptionalUsedAsFieldOrParameterType"})
public class ITestAbfsTerasort extends AbstractAbfsClusterITest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestAbfsTerasort.class);
public static final int EXPECTED_PARTITION_COUNT = 10;
public static final int PARTITION_SAMPLE_SIZE = 1000;
public static final int ROW_COUNT = 1000;
/**
* This has to be common across all test methods.
*/
private static final Path TERASORT_PATH = new Path("/ITestAbfsTerasort");
/**
* Duration tracker created in the first of the test cases and closed
* in {@link #test_140_teracomplete()}.
*/
private static Optional<DurationInfo> terasortDuration = empty();
/**
* Tracker of which stages are completed and how long they took.
*/
private static final Map<String, DurationInfo> COMPLETED_STAGES = new HashMap<>();
/**
* FileSystem statistics are collected from the _SUCCESS markers.
*/
protected static final IOStatisticsSnapshot JOB_IOSTATS =
snapshotIOStatistics();
/** Base path for all the terasort input and output paths. */
private Path terasortPath;
/** Input (teragen) path. */
private Path sortInput;
/** Path where sorted data goes. */
private Path sortOutput;
/** Path for validated job's output. */
private Path sortValidate;
public ITestAbfsTerasort() throws Exception {
}
@Override
public void setup() throws Exception {
// superclass calls requireScaleTestsEnabled();
super.setup();
prepareToTerasort();
}
/**
* Set up the job conf with the terasort options used by this test.
* @param conf configuration
*/
@Override
protected void applyCustomConfigOptions(JobConf conf) {
// small sample size for faster runs
conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(),
getSampleSizeForEachPartition());
conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
getExpectedPartitionCount());
conf.setBoolean(
TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
false);
}
private int getExpectedPartitionCount() {
return EXPECTED_PARTITION_COUNT;
}
private int getSampleSizeForEachPartition() {
return PARTITION_SAMPLE_SIZE;
}
protected int getRowCount() {
return ROW_COUNT;
}
/**
* Set up the terasort by initializing the path variables.
* The paths used must be unique across parameterized runs but
* common across all test cases in a single parameterized run.
*/
private void prepareToTerasort() {
terasortPath = getFileSystem().makeQualified(TERASORT_PATH);
sortInput = new Path(terasortPath, "sortin");
sortOutput = new Path(terasortPath, "sortout");
sortValidate = new Path(terasortPath, "validate");
}
/**
* Declare that a stage has completed.
* @param stage stage name/key in the map
* @param d duration.
*/
private static void completedStage(final String stage,
final DurationInfo d) {
COMPLETED_STAGES.put(stage, d);
}
/**
* Declare a stage which is required for this test case.
* @param stage stage name
*/
private static void requireStage(final String stage) {
Assume.assumeTrue(
"Required stage was not completed: " + stage,
COMPLETED_STAGES.get(stage) != null);
}
/**
* Execute a single stage in the terasort.
* Updates the completed stages map with the stage duration, if successful.
* @param stage Stage name for the stages map.
* @param jobConf job conf
* @param dest destination directory; the _SUCCESS file is expected here.
* @param tool tool to run.
* @param args args for the tool.
* @param minimumFileCount minimum number of files to have been created
* @throws Exception any failure
*/
private void executeStage(
final String stage,
final JobConf jobConf,
final Path dest,
final Tool tool,
final String[] args,
final int minimumFileCount) throws Exception {
int result;
// the duration info is created outside a try-with-resources
// clause as it is used later.
DurationInfo d = new DurationInfo(LOG, stage);
try {
result = ToolRunner.run(jobConf, tool, args);
} finally {
d.close();
}
dumpOutputTree(dest);
assertEquals(stage
+ "(" + StringUtils.join(", ", args) + ")"
+ " failed", 0, result);
final ManifestSuccessData successFile = validateSuccessFile(getFileSystem(), dest,
minimumFileCount, "");
JOB_IOSTATS.aggregate(successFile.getIOStatistics());
completedStage(stage, d);
}
/**
* Set up terasort by cleaning out the destination, and note the initial
* time before any of the jobs are executed.
*
* This is executed first <i>for each parameterized run</i>.
* It is where all variables which must be reset for each run are reset.
*/
@Test
public void test_100_terasort_setup() throws Throwable {
describe("Setting up for a terasort");
getFileSystem().delete(terasortPath, true);
terasortDuration = Optional.of(new DurationInfo(LOG, false, "Terasort"));
}
@Test
public void test_110_teragen() throws Throwable {
describe("Teragen to %s", sortInput);
getFileSystem().delete(sortInput, true);
JobConf jobConf = newJobConf();
patchConfigurationForCommitter(jobConf);
executeStage("teragen",
jobConf,
sortInput,
new TeraGen(),
new String[]{Integer.toString(getRowCount()), sortInput.toString()},
1);
}
@Test
public void test_120_terasort() throws Throwable {
describe("Terasort from %s to %s", sortInput, sortOutput);
requireStage("teragen");
getFileSystem().delete(sortOutput, true);
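// load the _SUCCESS file of the teragen stage to confirm its output is present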
loadSuccessFile(getFileSystem(), sortInput);
JobConf jobConf = newJobConf();
patchConfigurationForCommitter(jobConf);
executeStage("terasort",
jobConf,
sortOutput,
new TeraSort(),
new String[]{sortInput.toString(), sortOutput.toString()},
1);
}
@Test
public void test_130_teravalidate() throws Throwable {
describe("TeraValidate from %s to %s", sortOutput, sortValidate);
requireStage("terasort");
getFileSystem().delete(sortValidate, true);
loadSuccessFile(getFileSystem(), sortOutput);
JobConf jobConf = newJobConf();
patchConfigurationForCommitter(jobConf);
executeStage("teravalidate",
jobConf,
sortValidate,
new TeraValidate(),
new String[]{sortOutput.toString(), sortValidate.toString()},
1);
}
/**
* Print the results and save them to a local CSV file,
* which makes runs easy to list and compare.
*/
@Test
public void test_140_teracomplete() throws Throwable {
terasortDuration.ifPresent(d -> {
d.close();
completedStage("overall", d);
});
// IO Statistics
IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, JOB_IOSTATS);
// and the summary
final StringBuilder results = new StringBuilder();
results.append("\"Operation\"\t\"Duration\"\n");
// build a small function to append each stage's name and duration to the results.
// Works because no IOExceptions are raised in this sequence.
Consumer<String> stage = (s) -> {
DurationInfo duration = COMPLETED_STAGES.get(s);
results.append(String.format("\"%s\"\t\"%s\"\n",
s,
duration == null ? "" : duration));
};
stage.accept("teragen");
stage.accept("terasort");
stage.accept("teravalidate");
stage.accept("overall");
String text = results.toString();
File resultsFile = File.createTempFile("results", ".csv");
FileUtils.write(resultsFile, text, StandardCharsets.UTF_8);
LOG.info("Results are in {}\n{}", resultsFile, text);
}
/**
* Reset the duration tracker so that if two committer test suites are run
* sequentially, the total execution time is not reported as spanning from
* the start of the first suite to the end of the second.
*/
@Test
public void test_150_teracleanup() throws Throwable {
terasortDuration = Optional.empty();
}
@Test
public void test_200_directory_deletion() throws Throwable {
getFileSystem().delete(terasortPath, true);
}
/**
* Dump the files under a path, but do not fail if the path is not present.
* @param path path to dump
* @throws Exception any failure.
*/
protected void dumpOutputTree(Path path) throws Exception {
LOG.info("Files under output directory {}", path);
try {
RemoteIterators.foreach(getFileSystem().listFiles(path, true),
(status) -> LOG.info("{}", status));
} catch (FileNotFoundException e) {
LOG.info("Output directory {} not found", path);
}
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Unit and integration tests for the manifest committer.
* JSON job reports will be saved to
* {@code target/reports}
*/
package org.apache.hadoop.fs.azurebfs.commit;

View File

@ -34,7 +34,7 @@ public class AbfsFileSystemContract extends AbstractBondedFSContract {
public static final String CONTRACT_XML = "abfs.xml";
private final boolean isSecure;
protected AbfsFileSystemContract(final Configuration conf, boolean secure) {
public AbfsFileSystemContract(final Configuration conf, boolean secure) {
super(conf);
//insert the base features
addConfResource(CONTRACT_XML);

View File

@ -0,0 +1,25 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<include xmlns="http://www.w3.org/2001/XInclude" href="azure-test.xml">
<fallback/>
</include>
</configuration>