HADOOP-18163. hadoop-azure support for the Manifest Committer of MAPREDUCE-7341
Follow-on patch to MAPREDUCE-7341, adding ABFS support and tests * resilient rename * tests for job commit through the manifest committer. contains - HADOOP-17976. ABFS etag extraction inconsistent between LIST and HEAD calls - HADOOP-16204. ABFS tests to include terasort Contributed by Steve Loughran. Change-Id: I0a7d4043bdf19bcb00c033fc389730109b93b77f
This commit is contained in:
parent
7328c34ba5
commit
8294bd5a37
@ -384,6 +384,13 @@
|
||||
<version>${hadoop.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
<version>${hadoop.version}</version>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
|
||||
|
@ -219,6 +219,67 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!--
|
||||
the mapreduce-client-core module is compiled against for the
|
||||
manifest committer support. It is not needed on the classpath
|
||||
except when using this committer, so it only tagged as
|
||||
"provided".
|
||||
It is not exported as a transitive dependency of the JAR.
|
||||
Applications which wish to to use the manifest committer
|
||||
will need to explicity add the mapreduce JAR to their classpath.
|
||||
This is already done by MapReduce and Spark.
|
||||
-->
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<!--
|
||||
These are only added to the classpath for tests,
|
||||
and are not exported by the test JAR as transitive
|
||||
dependencies.
|
||||
-->
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-tests</artifactId>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-hs</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-examples</artifactId>
|
||||
<scope>test</scope>
|
||||
<type>jar</type>
|
||||
</dependency>
|
||||
<!-- artifacts needed to bring up a Mini MR Yarn cluster-->
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-app</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-app</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-distcp</artifactId>
|
||||
@ -319,7 +380,7 @@
|
||||
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
|
||||
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
|
||||
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
|
||||
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
|
||||
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
|
||||
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
|
||||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
|
||||
@ -350,7 +411,7 @@
|
||||
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
|
||||
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
|
||||
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
|
||||
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
|
||||
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
|
||||
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
|
||||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
|
||||
@ -392,7 +453,7 @@
|
||||
<!-- surefire.forkNumber won't do the parameter -->
|
||||
<!-- substitution. Putting a prefix in front of it like -->
|
||||
<!-- "fork-" makes it work. -->
|
||||
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
|
||||
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
|
||||
<!-- Propagate scale parameters -->
|
||||
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
|
||||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
@ -482,7 +543,7 @@
|
||||
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
|
||||
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
|
||||
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
|
||||
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
|
||||
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
|
||||
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
|
||||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
|
||||
@ -526,7 +587,7 @@
|
||||
<!-- surefire.forkNumber won't do the parameter -->
|
||||
<!-- substitution. Putting a prefix in front of it like -->
|
||||
<!-- "fork-" makes it work. -->
|
||||
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
|
||||
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
|
||||
<!-- Propagate scale parameters -->
|
||||
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
|
||||
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
|
||||
@ -544,6 +605,7 @@
|
||||
<exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
|
||||
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
|
||||
<exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
|
||||
<exclude>**/azurebfs/commit/*.java</exclude>
|
||||
</excludes>
|
||||
|
||||
</configuration>
|
||||
@ -572,7 +634,7 @@
|
||||
<!-- surefire.forkNumber won't do the parameter -->
|
||||
<!-- substitution. Putting a prefix in front of it like -->
|
||||
<!-- "fork-" makes it work. -->
|
||||
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
|
||||
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
|
||||
<!-- Propagate scale parameters -->
|
||||
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
|
||||
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
|
||||
@ -585,6 +647,7 @@
|
||||
<include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
|
||||
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
|
||||
<include>**/azurebfs/services/ITestReadBufferManager.java</include>
|
||||
<include>**/azurebfs/commit/*.java</include>
|
||||
</includes>
|
||||
</configuration>
|
||||
</execution>
|
||||
@ -634,7 +697,7 @@
|
||||
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
|
||||
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
|
||||
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
|
||||
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
|
||||
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
|
||||
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
|
||||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
|
||||
@ -664,7 +727,7 @@
|
||||
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
|
||||
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
|
||||
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
|
||||
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
|
||||
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
|
||||
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
|
||||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
|
||||
@ -706,7 +769,7 @@
|
||||
<!-- surefire.forkNumber won't do the parameter -->
|
||||
<!-- substitution. Putting a prefix in front of it like -->
|
||||
<!-- "fork-" makes it work. -->
|
||||
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
|
||||
<test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
|
||||
<!-- Propagate scale parameters -->
|
||||
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
|
||||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
|
@ -48,4 +48,7 @@
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
|
||||
<suppress checks="ParameterNumber|VisibilityModifier"
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
|
||||
<!-- allow tests to use _ for ordering. -->
|
||||
<suppress checks="MethodName"
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
|
||||
</suppressions>
|
||||
|
@ -260,6 +260,11 @@ public class AbfsConfiguration{
|
||||
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
|
||||
private boolean enableAutoThrottling;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT,
|
||||
MinValue = 0,
|
||||
DefaultValue = RATE_LIMIT_DEFAULT)
|
||||
private int rateLimit;
|
||||
|
||||
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
|
||||
DefaultValue = DEFAULT_FS_AZURE_USER_AGENT_PREFIX)
|
||||
private String userAgentId;
|
||||
@ -726,6 +731,10 @@ public boolean isAutoThrottlingEnabled() {
|
||||
return this.enableAutoThrottling;
|
||||
}
|
||||
|
||||
public int getRateLimit() {
|
||||
return rateLimit;
|
||||
}
|
||||
|
||||
public String getCustomUserAgentPrefix() {
|
||||
return this.userAgentId;
|
||||
}
|
||||
|
@ -27,6 +27,7 @@
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.AccessDeniedException;
|
||||
import java.time.Duration;
|
||||
import java.util.Hashtable;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
@ -42,13 +43,17 @@
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.util.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
|
||||
@ -94,9 +99,12 @@
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
import org.apache.hadoop.fs.store.DataBlocks;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.RateLimiting;
|
||||
import org.apache.hadoop.util.RateLimitingFactory;
|
||||
import org.apache.hadoop.util.functional.RemoteIterators;
|
||||
import org.apache.hadoop.util.DurationInfo;
|
||||
import org.apache.hadoop.util.LambdaUtils;
|
||||
@ -143,6 +151,9 @@ public class AzureBlobFileSystem extends FileSystem
|
||||
/** Maximum Active blocks per OutputStream. */
|
||||
private int blockOutputActiveBlocks;
|
||||
|
||||
/** Rate limiting for operations which use it to throttle their IO. */
|
||||
private RateLimiting rateLimiting;
|
||||
|
||||
@Override
|
||||
public void initialize(URI uri, Configuration configuration)
|
||||
throws IOException {
|
||||
@ -215,7 +226,7 @@ public void initialize(URI uri, Configuration configuration)
|
||||
}
|
||||
|
||||
AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
|
||||
|
||||
rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());
|
||||
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
|
||||
}
|
||||
|
||||
@ -261,7 +272,7 @@ private FSDataInputStream open(final Path path,
|
||||
InputStream inputStream = abfsStore
|
||||
.openFileForRead(qualifiedPath, parameters, statistics, tracingContext);
|
||||
return new FSDataInputStream(inputStream);
|
||||
} catch(AzureBlobFileSystemException ex) {
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
return null;
|
||||
}
|
||||
@ -290,8 +301,13 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize,
|
||||
final short replication, final long blockSize, final Progressable progress) throws IOException {
|
||||
public FSDataOutputStream create(final Path f,
|
||||
final FsPermission permission,
|
||||
final boolean overwrite,
|
||||
final int bufferSize,
|
||||
final short replication,
|
||||
final long blockSize,
|
||||
final Progressable progress) throws IOException {
|
||||
LOG.debug("AzureBlobFileSystem.create path: {} permission: {} overwrite: {} bufferSize: {}",
|
||||
f,
|
||||
permission,
|
||||
@ -311,7 +327,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi
|
||||
FsPermission.getUMask(getConf()), tracingContext);
|
||||
statIncrement(FILES_CREATED);
|
||||
return new FSDataOutputStream(outputStream, statistics);
|
||||
} catch(AzureBlobFileSystemException ex) {
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(f, ex);
|
||||
return null;
|
||||
}
|
||||
@ -340,8 +356,12 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
public FSDataOutputStream createNonRecursive(final Path f, final FsPermission permission,
|
||||
final EnumSet<CreateFlag> flags, final int bufferSize, final short replication, final long blockSize,
|
||||
public FSDataOutputStream createNonRecursive(final Path f,
|
||||
final FsPermission permission,
|
||||
final EnumSet<CreateFlag> flags,
|
||||
final int bufferSize,
|
||||
final short replication,
|
||||
final long blockSize,
|
||||
final Progressable progress) throws IOException {
|
||||
|
||||
// Check if file should be appended or overwritten. Assume that the file
|
||||
@ -365,7 +385,8 @@ public FSDataOutputStream createNonRecursive(final Path f,
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
|
||||
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress)
|
||||
throws IOException {
|
||||
LOG.debug(
|
||||
"AzureBlobFileSystem.append path: {} bufferSize: {}",
|
||||
f.toString(),
|
||||
@ -380,7 +401,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr
|
||||
OutputStream outputStream = abfsStore
|
||||
.openFileForWrite(qualifiedPath, statistics, false, tracingContext);
|
||||
return new FSDataOutputStream(outputStream, statistics);
|
||||
} catch(AzureBlobFileSystemException ex) {
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(f, ex);
|
||||
return null;
|
||||
}
|
||||
@ -403,7 +424,7 @@ public boolean rename(final Path src, final Path dst) throws IOException {
|
||||
fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat,
|
||||
listener);
|
||||
// rename under same folder;
|
||||
if(makeQualified(parentFolder).equals(qualifiedDstPath)) {
|
||||
if (makeQualified(parentFolder).equals(qualifiedDstPath)) {
|
||||
return tryGetFileStatus(qualifiedSrcPath, tracingContext) != null;
|
||||
}
|
||||
|
||||
@ -438,24 +459,99 @@ public boolean rename(final Path src, final Path dst) throws IOException {
|
||||
|
||||
qualifiedDstPath = makeQualified(adjustedDst);
|
||||
|
||||
abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext);
|
||||
abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null);
|
||||
return true;
|
||||
} catch(AzureBlobFileSystemException ex) {
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
LOG.debug("Rename operation failed. ", ex);
|
||||
checkException(
|
||||
src,
|
||||
ex,
|
||||
AzureServiceErrorCode.PATH_ALREADY_EXISTS,
|
||||
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
|
||||
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
|
||||
AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
|
||||
AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
|
||||
AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
|
||||
src,
|
||||
ex,
|
||||
AzureServiceErrorCode.PATH_ALREADY_EXISTS,
|
||||
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
|
||||
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
|
||||
AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
|
||||
AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
|
||||
AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Private method to create resilient commit support.
|
||||
* @return a new instance
|
||||
* @param path destination path
|
||||
* @throws IOException problem probing store capabilities
|
||||
* @throws UnsupportedOperationException if the store lacks this support
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public ResilientCommitByRename createResilientCommitSupport(final Path path)
|
||||
throws IOException {
|
||||
|
||||
if (!hasPathCapability(path,
|
||||
CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME)) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Resilient commit support not available for " + path);
|
||||
}
|
||||
return new ResilientCommitByRenameImpl();
|
||||
}
|
||||
|
||||
/**
|
||||
* Resilient commit support.
|
||||
* Provided as a nested class to avoid contaminating the
|
||||
* FS instance with too many private methods which end up
|
||||
* being used widely (as has happened to the S3A FS)
|
||||
*/
|
||||
public class ResilientCommitByRenameImpl implements ResilientCommitByRename {
|
||||
|
||||
/**
|
||||
* Perform the rename.
|
||||
* This will be rate limited, as well as able to recover
|
||||
* from rename errors if the etag was passed in.
|
||||
* @param source path to source file
|
||||
* @param dest destination of rename.
|
||||
* @param sourceEtag etag of source file. may be null or empty
|
||||
* @return the outcome of the operation
|
||||
* @throws IOException any rename failure which was not recovered from.
|
||||
*/
|
||||
public Pair<Boolean, Duration> commitSingleFileByRename(
|
||||
final Path source,
|
||||
final Path dest,
|
||||
@Nullable final String sourceEtag) throws IOException {
|
||||
|
||||
LOG.debug("renameFileWithEtag source: {} dest: {} etag {}", source, dest, sourceEtag);
|
||||
statIncrement(CALL_RENAME);
|
||||
|
||||
trailingPeriodCheck(dest);
|
||||
Path qualifiedSrcPath = makeQualified(source);
|
||||
Path qualifiedDstPath = makeQualified(dest);
|
||||
|
||||
TracingContext tracingContext = new TracingContext(clientCorrelationId,
|
||||
fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat,
|
||||
listener);
|
||||
|
||||
if (qualifiedSrcPath.equals(qualifiedDstPath)) {
|
||||
// rename to itself is forbidden
|
||||
throw new PathIOException(qualifiedSrcPath.toString(), "cannot rename object onto self");
|
||||
}
|
||||
|
||||
// acquire one IO permit
|
||||
final Duration waitTime = rateLimiting.acquire(1);
|
||||
|
||||
try {
|
||||
final boolean recovered = abfsStore.rename(qualifiedSrcPath,
|
||||
qualifiedDstPath, tracingContext, sourceEtag);
|
||||
return Pair.of(recovered, waitTime);
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
LOG.debug("Rename operation failed. ", ex);
|
||||
checkException(source, ex);
|
||||
// never reached
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(final Path f, final boolean recursive) throws IOException {
|
||||
LOG.debug(
|
||||
@ -533,7 +629,7 @@ private void incrementStatistic(AbfsStatistic statistic) {
|
||||
* @throws IllegalArgumentException if the path has a trailing period (.)
|
||||
*/
|
||||
private void trailingPeriodCheck(Path path) throws IllegalArgumentException {
|
||||
while (!path.isRoot()){
|
||||
while (!path.isRoot()) {
|
||||
String pathToString = path.toString();
|
||||
if (pathToString.length() != 0) {
|
||||
if (pathToString.charAt(pathToString.length() - 1) == '.') {
|
||||
@ -541,8 +637,7 @@ private void trailingPeriodCheck(Path path) throws IllegalArgumentException {
|
||||
"ABFS does not allow files or directories to end with a dot.");
|
||||
}
|
||||
path = path.getParent();
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -601,10 +696,10 @@ public synchronized void close() throws IOException {
|
||||
|
||||
@Override
|
||||
public FileStatus getFileStatus(final Path f) throws IOException {
|
||||
TracingContext tracingContext = new TracingContext(clientCorrelationId,
|
||||
fileSystemId, FSOperationType.GET_FILESTATUS, tracingHeaderFormat,
|
||||
listener);
|
||||
return getFileStatus(f, tracingContext);
|
||||
TracingContext tracingContext = new TracingContext(clientCorrelationId,
|
||||
fileSystemId, FSOperationType.GET_FILESTATUS, tracingHeaderFormat,
|
||||
listener);
|
||||
return getFileStatus(f, tracingContext);
|
||||
}
|
||||
|
||||
private FileStatus getFileStatus(final Path path,
|
||||
@ -615,7 +710,7 @@ private FileStatus getFileStatus(final Path path,
|
||||
|
||||
try {
|
||||
return abfsStore.getFileStatus(qualifiedPath, tracingContext);
|
||||
} catch(AzureBlobFileSystemException ex) {
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
return null;
|
||||
}
|
||||
@ -639,7 +734,7 @@ public void breakLease(final Path f) throws IOException {
|
||||
fileSystemId, FSOperationType.BREAK_LEASE, tracingHeaderFormat,
|
||||
listener);
|
||||
abfsStore.breakLease(qualifiedPath, tracingContext);
|
||||
} catch(AzureBlobFileSystemException ex) {
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(f, ex);
|
||||
}
|
||||
}
|
||||
@ -666,7 +761,6 @@ public Path makeQualified(Path path) {
|
||||
return super.makeQualified(path);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Path getWorkingDirectory() {
|
||||
return this.workingDir;
|
||||
@ -689,8 +783,8 @@ public String getScheme() {
|
||||
@Override
|
||||
public Path getHomeDirectory() {
|
||||
return makeQualified(new Path(
|
||||
FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX
|
||||
+ "/" + abfsStore.getUser()));
|
||||
FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX
|
||||
+ "/" + abfsStore.getUser()));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -714,8 +808,8 @@ public BlockLocation[] getFileBlockLocations(FileStatus file,
|
||||
}
|
||||
final String blobLocationHost = abfsStore.getAbfsConfiguration().getAzureBlockLocationHost();
|
||||
|
||||
final String[] name = { blobLocationHost };
|
||||
final String[] host = { blobLocationHost };
|
||||
final String[] name = {blobLocationHost};
|
||||
final String[] host = {blobLocationHost};
|
||||
long blockSize = file.getBlockSize();
|
||||
if (blockSize <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
@ -790,15 +884,14 @@ public Void call() throws Exception {
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
finally {
|
||||
} finally {
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
/**
|
||||
* Set owner of a path (i.e. a file or a directory).
|
||||
* The parameters owner and group cannot both be null.
|
||||
*
|
||||
@ -828,9 +921,9 @@ public void setOwner(final Path path, final String owner, final String group)
|
||||
|
||||
try {
|
||||
abfsStore.setOwner(qualifiedPath,
|
||||
owner,
|
||||
group,
|
||||
tracingContext);
|
||||
owner,
|
||||
group,
|
||||
tracingContext);
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
}
|
||||
@ -847,7 +940,10 @@ public void setOwner(final Path path, final String owner, final String group)
|
||||
* @throws IllegalArgumentException If name is null or empty or if value is null
|
||||
*/
|
||||
@Override
|
||||
public void setXAttr(final Path path, final String name, final byte[] value, final EnumSet<XAttrSetFlag> flag)
|
||||
public void setXAttr(final Path path,
|
||||
final String name,
|
||||
final byte[] value,
|
||||
final EnumSet<XAttrSetFlag> flag)
|
||||
throws IOException {
|
||||
LOG.debug("AzureBlobFileSystem.setXAttr path: {}", path);
|
||||
|
||||
@ -971,7 +1067,7 @@ public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec)
|
||||
if (!getIsNamespaceEnabled(tracingContext)) {
|
||||
throw new UnsupportedOperationException(
|
||||
"modifyAclEntries is only supported by storage accounts with the "
|
||||
+ "hierarchical namespace enabled.");
|
||||
+ "hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
if (aclSpec == null || aclSpec.isEmpty()) {
|
||||
@ -1006,7 +1102,7 @@ public void removeAclEntries(final Path path, final List<AclEntry> aclSpec)
|
||||
if (!getIsNamespaceEnabled(tracingContext)) {
|
||||
throw new UnsupportedOperationException(
|
||||
"removeAclEntries is only supported by storage accounts with the "
|
||||
+ "hierarchical namespace enabled.");
|
||||
+ "hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
if (aclSpec == null || aclSpec.isEmpty()) {
|
||||
@ -1038,7 +1134,7 @@ public void removeDefaultAcl(final Path path) throws IOException {
|
||||
if (!getIsNamespaceEnabled(tracingContext)) {
|
||||
throw new UnsupportedOperationException(
|
||||
"removeDefaultAcl is only supported by storage accounts with the "
|
||||
+ "hierarchical namespace enabled.");
|
||||
+ "hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
Path qualifiedPath = makeQualified(path);
|
||||
@ -1068,7 +1164,7 @@ public void removeAcl(final Path path) throws IOException {
|
||||
if (!getIsNamespaceEnabled(tracingContext)) {
|
||||
throw new UnsupportedOperationException(
|
||||
"removeAcl is only supported by storage accounts with the "
|
||||
+ "hierarchical namespace enabled.");
|
||||
+ "hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
Path qualifiedPath = makeQualified(path);
|
||||
@ -1101,7 +1197,7 @@ public void setAcl(final Path path, final List<AclEntry> aclSpec)
|
||||
if (!getIsNamespaceEnabled(tracingContext)) {
|
||||
throw new UnsupportedOperationException(
|
||||
"setAcl is only supported by storage accounts with the hierarchical "
|
||||
+ "namespace enabled.");
|
||||
+ "namespace enabled.");
|
||||
}
|
||||
|
||||
if (aclSpec == null || aclSpec.size() == 0) {
|
||||
@ -1133,7 +1229,7 @@ public AclStatus getAclStatus(final Path path) throws IOException {
|
||||
if (!getIsNamespaceEnabled(tracingContext)) {
|
||||
throw new UnsupportedOperationException(
|
||||
"getAclStatus is only supported by storage account with the "
|
||||
+ "hierarchical namespace enabled.");
|
||||
+ "hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
Path qualifiedPath = makeQualified(path);
|
||||
@ -1243,7 +1339,7 @@ private FileStatus tryGetFileStatus(final Path f, TracingContext tracingContext)
|
||||
|
||||
private boolean fileSystemExists() throws IOException {
|
||||
LOG.debug(
|
||||
"AzureBlobFileSystem.fileSystemExists uri: {}", uri);
|
||||
"AzureBlobFileSystem.fileSystemExists uri: {}", uri);
|
||||
try {
|
||||
TracingContext tracingContext = new TracingContext(clientCorrelationId,
|
||||
fileSystemId, FSOperationType.TEST_OP, tracingHeaderFormat, listener);
|
||||
@ -1534,8 +1630,9 @@ public boolean hasPathCapability(final Path path, final String capability)
|
||||
case CommonPathCapabilities.FS_PERMISSIONS:
|
||||
case CommonPathCapabilities.FS_APPEND:
|
||||
case CommonPathCapabilities.ETAGS_AVAILABLE:
|
||||
case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME:
|
||||
return true;
|
||||
|
||||
case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME:
|
||||
case CommonPathCapabilities.FS_ACLS:
|
||||
return getIsNamespaceEnabled(
|
||||
new TracingContext(clientCorrelationId, fileSystemId,
|
||||
|
@ -62,6 +62,7 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -878,7 +879,22 @@ public void breakLease(final Path path, final TracingContext tracingContext) thr
|
||||
client.breakLease(getRelativePath(path), tracingContext);
|
||||
}
|
||||
|
||||
public void rename(final Path source, final Path destination, TracingContext tracingContext) throws
|
||||
/**
|
||||
* Rename a file or directory.
|
||||
* If a source etag is passed in, the operation will attempt to recover
|
||||
* from a missing source file by probing the destination for
|
||||
* existence and comparing etags.
|
||||
* @param source path to source file
|
||||
* @param destination destination of rename.
|
||||
* @param tracingContext trace context
|
||||
* @param sourceEtag etag of source file. may be null or empty
|
||||
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
|
||||
* @return true if recovery was needed and succeeded.
|
||||
*/
|
||||
public boolean rename(final Path source,
|
||||
final Path destination,
|
||||
final TracingContext tracingContext,
|
||||
final String sourceEtag) throws
|
||||
AzureBlobFileSystemException {
|
||||
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
|
||||
long countAggregate = 0;
|
||||
@ -898,23 +914,29 @@ public void rename(final Path source, final Path destination, TracingContext tra
|
||||
|
||||
String sourceRelativePath = getRelativePath(source);
|
||||
String destinationRelativePath = getRelativePath(destination);
|
||||
// was any operation recovered from?
|
||||
boolean recovered = false;
|
||||
|
||||
do {
|
||||
try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
|
||||
AbfsRestOperation op = client
|
||||
.renamePath(sourceRelativePath, destinationRelativePath,
|
||||
continuation, tracingContext);
|
||||
final Pair<AbfsRestOperation, Boolean> pair =
|
||||
client.renamePath(sourceRelativePath, destinationRelativePath,
|
||||
continuation, tracingContext, sourceEtag);
|
||||
|
||||
AbfsRestOperation op = pair.getLeft();
|
||||
perfInfo.registerResult(op.getResult());
|
||||
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
|
||||
perfInfo.registerSuccess(true);
|
||||
countAggregate++;
|
||||
shouldContinue = continuation != null && !continuation.isEmpty();
|
||||
|
||||
// update the recovery flag.
|
||||
recovered |= pair.getRight();
|
||||
if (!shouldContinue) {
|
||||
perfInfo.registerAggregates(startAggregate, countAggregate);
|
||||
}
|
||||
}
|
||||
} while (shouldContinue);
|
||||
return recovered;
|
||||
}
|
||||
|
||||
public void delete(final Path path, final boolean recursive,
|
||||
@ -1932,7 +1954,7 @@ boolean areLeasesFreed() {
|
||||
* @param result response to process.
|
||||
* @return the quote-unwrapped etag.
|
||||
*/
|
||||
private static String extractEtagHeader(AbfsHttpOperation result) {
|
||||
public static String extractEtagHeader(AbfsHttpOperation result) {
|
||||
String etag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
if (etag != null) {
|
||||
// strip out any wrapper "" quotes which come back, for consistency with
|
||||
|
@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathIOException;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem;
|
||||
|
||||
/**
|
||||
* Extension of StoreOperationsThroughFileSystem with ABFS awareness.
|
||||
* Purely for use by jobs committing work through the manifest committer.
|
||||
* The {@link AzureManifestCommitterFactory} will configure
|
||||
* this as the binding to the FS.
|
||||
*
|
||||
* ADLS Gen2 stores support etag-recovery on renames, but not WASB
|
||||
* stores.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate("mapreduce")
|
||||
@InterfaceStability.Unstable
|
||||
public class AbfsManifestStoreOperations extends
|
||||
ManifestStoreOperationsThroughFileSystem {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
AbfsManifestStoreOperations.class);
|
||||
|
||||
/**
|
||||
* Classname, which can be declared in jpb configurations.
|
||||
*/
|
||||
public static final String NAME = AbfsManifestStoreOperations.class.getName();
|
||||
|
||||
/**
|
||||
* Resilient rename calls; only available on an ADLS Gen2 store.
|
||||
* Will be null after binding if the FS isn't compatible.
|
||||
*/
|
||||
private ResilientCommitByRename resilientCommitByRename;
|
||||
|
||||
@Override
|
||||
public AzureBlobFileSystem getFileSystem() {
|
||||
return (AzureBlobFileSystem) super.getFileSystem();
|
||||
}
|
||||
|
||||
/**
|
||||
* Bind to the store.
|
||||
*
|
||||
* @param filesystem FS.
|
||||
* @param path path to work under
|
||||
* @throws IOException binding problems.
|
||||
*/
|
||||
@Override
|
||||
public void bindToFileSystem(FileSystem filesystem, Path path) throws IOException {
|
||||
if (!(filesystem instanceof AzureBlobFileSystem)) {
|
||||
throw new PathIOException(path.toString(),
|
||||
"Not an abfs filesystem: " + filesystem.getClass());
|
||||
}
|
||||
super.bindToFileSystem(filesystem, path);
|
||||
try {
|
||||
resilientCommitByRename = getFileSystem().createResilientCommitSupport(path);
|
||||
LOG.debug("Bonded to filesystem with resilient commits under path {}", path);
|
||||
} catch (UnsupportedOperationException e) {
|
||||
LOG.debug("No resilient commit support under path {}", path);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean storePreservesEtagsThroughRenames(final Path path) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resilient commits available on hierarchical stores.
|
||||
* @return true if the FS can use etags on renames.
|
||||
*/
|
||||
@Override
|
||||
public boolean storeSupportsResilientCommit() {
|
||||
return resilientCommitByRename != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Commit a file through an internal ABFS operation.
|
||||
* If resilient commit is unavailable, invokes the superclass, which
|
||||
* will raise an UnsupportedOperationException
|
||||
* @param entry entry to commit
|
||||
* @return the outcome
|
||||
* @throws IOException any failure in resilient commit.
|
||||
* @throws UnsupportedOperationException if not available.
|
||||
*/
|
||||
@Override
|
||||
public CommitFileResult commitFile(final FileEntry entry) throws IOException {
|
||||
|
||||
if (resilientCommitByRename != null) {
|
||||
final Pair<Boolean, Duration> result =
|
||||
resilientCommitByRename.commitSingleFileByRename(
|
||||
entry.getSourcePath(),
|
||||
entry.getDestPath(),
|
||||
entry.getEtag());
|
||||
return CommitFileResult.fromResilientCommit(result.getLeft(),
|
||||
result.getRight());
|
||||
} else {
|
||||
return super.commitFile(entry);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory;
|
||||
|
||||
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
|
||||
|
||||
/**
|
||||
* A Committer for the manifest committer which performs all bindings needed
|
||||
* to work best with abfs.
|
||||
* This includes, at a minimum, switching to the abfs-specific manifest store operations.
|
||||
*
|
||||
* This classname is referenced in configurations, so MUST NOT change.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class AzureManifestCommitterFactory extends ManifestCommitterFactory {
|
||||
|
||||
/**
|
||||
* Classname, which can be declared in job configurations.
|
||||
*/
|
||||
public static final String NAME = ManifestCommitterFactory.class.getName();
|
||||
|
||||
@Override
|
||||
public ManifestCommitter createOutputCommitter(final Path outputPath,
|
||||
final TaskAttemptContext context) throws IOException {
|
||||
final Configuration conf = context.getConfiguration();
|
||||
// use ABFS Store operations
|
||||
conf.set(OPT_STORE_OPERATIONS_CLASS,
|
||||
AbfsManifestStoreOperations.NAME);
|
||||
return super.createOutputCommitter(outputPath, context);
|
||||
}
|
||||
}
|
@ -0,0 +1,101 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathIOException;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
|
||||
/**
|
||||
* API exclusively for committing files.
|
||||
*
|
||||
* This is only for use by (@link {@link AbfsManifestStoreOperations},
|
||||
* and is intended to be implemented by ABFS.
|
||||
* To ensure that there is no need to add mapreduce JARs to the
|
||||
* classpath just to work with ABFS, this interface
|
||||
* MUST NOT refer to anything in the
|
||||
* {@code org.apache.hadoop.mapreduce} package.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public interface ResilientCommitByRename extends IOStatisticsSource {
|
||||
|
||||
/**
|
||||
* Rename source file to dest path *Exactly*; no subdirectory games here.
|
||||
* if the method does not raise an exception,then
|
||||
* the data at dest is the data which was at source.
|
||||
*
|
||||
* Requirements
|
||||
*
|
||||
* <pre>
|
||||
* exists(FS, source) else raise FileNotFoundException
|
||||
* source != dest else raise PathIOException
|
||||
* not exists(FS, dest)
|
||||
* isDir(FS, dest.getParent)
|
||||
* </pre>
|
||||
* <ol>
|
||||
* <li>source != dest else raise PathIOException</li>
|
||||
* <li>source must exist else raise FileNotFoundException</li>
|
||||
* <li>source must exist and be a file</li>
|
||||
* <li>dest must not exist; </li>
|
||||
* <li>dest.getParent() must be a dir</li>
|
||||
* <li>if sourceEtag is non-empty, it MAY be used to qualify/validate the rename.</li>
|
||||
* </ol>
|
||||
*
|
||||
* The outcome of the operation is undefined if source is not a file, dest exists,
|
||||
* dest.getParent() doesn't exist/is a file.
|
||||
* That is: implementations SHOULD assume that the code calling this method has
|
||||
* set up the destination directory tree and is only invoking this call on a file.
|
||||
* Accordingly: <i>implementations MAY skip validation checks</i>
|
||||
*
|
||||
* Post Conditions on a successful operation:
|
||||
* <pre>
|
||||
* FS' where:
|
||||
* not exists(FS', source)
|
||||
* and exists(FS', dest)
|
||||
* and data(FS', dest) == data (FS, source)
|
||||
* </pre>
|
||||
* This is exactly the same outcome as `FileSystem.rename()` when the same preconditions
|
||||
* are met. This API call simply restricts the operation to file rename with strict
|
||||
* conditions, (no need to be 'clever' about dest path calculation) and the ability
|
||||
* to pass in etags, modtimes and file status values.
|
||||
*
|
||||
* @param source path to source file
|
||||
* @param dest destination of rename.
|
||||
* @param sourceEtag etag of source file. may be null or empty
|
||||
* @return true if recovery was needed.
|
||||
* @throws FileNotFoundException source file not found
|
||||
* @throws PathIOException failure, including source and dest being the same path
|
||||
* @throws IOException any other exception
|
||||
*/
|
||||
Pair<Boolean, Duration> commitSingleFileByRename(
|
||||
Path source,
|
||||
Path dest,
|
||||
@Nullable String sourceEtag) throws IOException;
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Support for manifest committer.
|
||||
* Unless otherwise stated: classes are private.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
@ -220,6 +220,9 @@ public final class ConfigurationKeys {
|
||||
/** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */
|
||||
public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track";
|
||||
|
||||
/** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */
|
||||
public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit";
|
||||
|
||||
public static String accountProperty(String property, String account) {
|
||||
return property + "." + account;
|
||||
}
|
||||
|
@ -133,5 +133,10 @@ public final class FileSystemConfigurations {
|
||||
public static final String DATA_BLOCKS_BUFFER_DEFAULT =
|
||||
DATA_BLOCKS_BUFFER_DISK;
|
||||
|
||||
/**
|
||||
* IO rate limit. Value: {@value}
|
||||
*/
|
||||
public static final int RATE_LIMIT_DEFAULT = 10_000;
|
||||
|
||||
private FileSystemConfigurations() {}
|
||||
}
|
||||
|
@ -51,6 +51,7 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
|
||||
@ -67,6 +68,8 @@
|
||||
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
|
||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
|
||||
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
||||
@ -478,8 +481,30 @@ public AbfsRestOperation breakLease(final String path,
|
||||
return op;
|
||||
}
|
||||
|
||||
public AbfsRestOperation renamePath(String source, final String destination,
|
||||
final String continuation, TracingContext tracingContext)
|
||||
|
||||
/**
|
||||
* Rename a file or directory.
|
||||
* If a source etag is passed in, the operation will attempt to recover
|
||||
* from a missing source file by probing the destination for
|
||||
* existence and comparing etags.
|
||||
* The second value in the result will be true to indicate that this
|
||||
* took place.
|
||||
* As rename recovery is only attempted if the source etag is non-empty,
|
||||
* in normal rename operations rename recovery will never happen.
|
||||
* @param source path to source file
|
||||
* @param destination destination of rename.
|
||||
* @param continuation continuation.
|
||||
* @param tracingContext trace context
|
||||
* @param sourceEtag etag of source file. may be null or empty
|
||||
* @return pair of (the rename operation, flag indicating recovery took place)
|
||||
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
|
||||
*/
|
||||
public Pair<AbfsRestOperation, Boolean> renamePath(
|
||||
final String source,
|
||||
final String destination,
|
||||
final String continuation,
|
||||
final TracingContext tracingContext,
|
||||
final String sourceEtag)
|
||||
throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
|
||||
@ -505,9 +530,73 @@ public AbfsRestOperation renamePath(String source, final String destination,
|
||||
HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders);
|
||||
// no attempt at recovery using timestamps as it was not reliable.
|
||||
op.execute(tracingContext);
|
||||
return op;
|
||||
try {
|
||||
op.execute(tracingContext);
|
||||
return Pair.of(op, false);
|
||||
} catch (AzureBlobFileSystemException e) {
|
||||
// If we have no HTTP response, throw the original exception.
|
||||
if (!op.hasResult()) {
|
||||
throw e;
|
||||
}
|
||||
boolean etagCheckSucceeded = renameIdempotencyCheckOp(
|
||||
source,
|
||||
sourceEtag, op, destination, tracingContext);
|
||||
if (!etagCheckSucceeded) {
|
||||
// idempotency did not return different result
|
||||
// throw back the exception
|
||||
throw e;
|
||||
}
|
||||
return Pair.of(op, true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the rename request failure is post a retry and if earlier rename
|
||||
* request might have succeeded at back-end.
|
||||
*
|
||||
* If a source etag was passed in, and the error was 404, get the
|
||||
* etag of any file at the destination.
|
||||
* If it matches the source etag, then the rename is considered
|
||||
* a success.
|
||||
* Exceptions raised in the probe of the destination are swallowed,
|
||||
* so that they do not interfere with the original rename failures.
|
||||
* @param source source path
|
||||
* @param op Rename request REST operation response with non-null HTTP response
|
||||
* @param destination rename destination path
|
||||
* @param sourceEtag etag of source file. may be null or empty
|
||||
* @param tracingContext Tracks identifiers for request header
|
||||
* @return true if the file was successfully copied
|
||||
*/
|
||||
public boolean renameIdempotencyCheckOp(
|
||||
final String source,
|
||||
final String sourceEtag,
|
||||
final AbfsRestOperation op,
|
||||
final String destination,
|
||||
TracingContext tracingContext) {
|
||||
Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response");
|
||||
|
||||
if ((op.isARetriedRequest())
|
||||
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
|
||||
&& isNotEmpty(sourceEtag)) {
|
||||
|
||||
// Server has returned HTTP 404, which means rename source no longer
|
||||
// exists. Check on destination status and if its etag matches
|
||||
// that of the source, consider it to be a success.
|
||||
LOG.debug("rename {} to {} failed, checking etag of destination",
|
||||
source, destination);
|
||||
|
||||
try {
|
||||
final AbfsRestOperation destStatusOp = getPathStatus(destination,
|
||||
false, tracingContext);
|
||||
final AbfsHttpOperation result = destStatusOp.getResult();
|
||||
|
||||
return result.getStatusCode() == HttpURLConnection.HTTP_OK
|
||||
&& sourceEtag.equals(extractEtagHeader(result));
|
||||
} catch (AzureBlobFileSystemException ignored) {
|
||||
// GetFileStatus on the destination failed, the rename did not take place
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public AbfsRestOperation append(final String path, final byte[] buffer,
|
||||
|
@ -48,9 +48,15 @@ public String getStorageAccountKey(String accountName, Configuration rawConfig)
|
||||
// Validating the key.
|
||||
validateStorageAccountKey(key);
|
||||
} catch (IllegalAccessException | InvalidConfigurationValueException e) {
|
||||
throw new KeyProviderException("Failure to initialize configuration", e);
|
||||
LOG.debug("Failure to retrieve storage account key for {}", accountName,
|
||||
e);
|
||||
throw new KeyProviderException("Failure to initialize configuration for "
|
||||
+ accountName
|
||||
+ " key =\"" + key + "\""
|
||||
+ ": " + e, e);
|
||||
} catch(IOException ioe) {
|
||||
LOG.warn("Unable to get key from credential providers. {}", ioe);
|
||||
LOG.warn("Unable to get key for {} from credential providers. {}",
|
||||
accountName, ioe, ioe);
|
||||
}
|
||||
|
||||
return key;
|
||||
|
@ -271,11 +271,12 @@ protected void createFilesystemForSASTests() throws Exception {
|
||||
// The SAS tests do not have permission to create a filesystem
|
||||
// so first create temporary instance of the filesystem using SharedKey
|
||||
// then re-use the filesystem it creates with SAS auth instead of SharedKey.
|
||||
AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
|
||||
ContractTestUtils.assertPathExists(tempFs, "This path should exist",
|
||||
new Path("/"));
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
|
||||
usingFilesystemForSASTests = true;
|
||||
try (AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig)){
|
||||
ContractTestUtils.assertPathExists(tempFs, "This path should exist",
|
||||
new Path("/"));
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
|
||||
usingFilesystemForSASTests = true;
|
||||
}
|
||||
}
|
||||
|
||||
public AzureBlobFileSystem getFileSystem() throws IOException {
|
||||
|
@ -32,7 +32,10 @@
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
|
||||
@ -76,13 +79,19 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
|
||||
new Random().nextBytes(b);
|
||||
|
||||
Path testPath = path(TEST_PATH);
|
||||
try (FSDataOutputStream stream = fs.create(testPath)) {
|
||||
FSDataOutputStream stream = fs.create(testPath);
|
||||
try {
|
||||
stream.write(b);
|
||||
} finally{
|
||||
stream.close();
|
||||
}
|
||||
IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
|
||||
|
||||
final byte[] readBuffer = new byte[2 * bufferSize];
|
||||
int result;
|
||||
IOStatisticsSource statisticsSource = null;
|
||||
try (FSDataInputStream inputStream = fs.open(testPath)) {
|
||||
statisticsSource = inputStream;
|
||||
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
|
||||
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
|
||||
fs.getFileSystemId(), FSOperationType.READ, true, 0,
|
||||
@ -100,6 +109,8 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
|
||||
inputStream.seek(0);
|
||||
result = inputStream.read(readBuffer, 0, bufferSize);
|
||||
}
|
||||
IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
|
||||
|
||||
assertNotEquals("data read in final read()", -1, result);
|
||||
assertArrayEquals(readBuffer, b);
|
||||
}
|
||||
|
@ -401,7 +401,8 @@ public void testSignatureMask() throws Exception {
|
||||
fs.create(new Path(src)).close();
|
||||
AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
|
||||
.renamePath(src, "/testABC" + "/abc.txt", null,
|
||||
getTestTracingContext(fs, false));
|
||||
getTestTracingContext(fs, false), null)
|
||||
.getLeft();
|
||||
AbfsHttpOperation result = abfsHttpRestOperation.getResult();
|
||||
String url = result.getMaskedUrl();
|
||||
String encodedUrl = result.getMaskedEncodedUrl();
|
||||
@ -418,7 +419,7 @@ public void testSignatureMaskOnExceptionMessage() throws Exception {
|
||||
intercept(IOException.class, "sig=XXXX",
|
||||
() -> getFileSystem().getAbfsClient()
|
||||
.renamePath("testABC/test.xt", "testABC/abc.txt", null,
|
||||
getTestTracingContext(getFileSystem(), false)));
|
||||
getTestTracingContext(getFileSystem(), false), null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -526,7 +526,8 @@ private void testRenamePath(final boolean isWithCPK) throws Exception {
|
||||
AbfsClient abfsClient = fs.getAbfsClient();
|
||||
AbfsRestOperation abfsRestOperation = abfsClient
|
||||
.renamePath(testFileName, newName, null,
|
||||
getTestTracingContext(fs, false));
|
||||
getTestTracingContext(fs, false), null)
|
||||
.getLeft();
|
||||
assertCPKHeaders(abfsRestOperation, false);
|
||||
assertNoCPKResponseHeadersPresent(abfsRestOperation);
|
||||
|
||||
|
@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
|
||||
|
||||
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
|
||||
|
||||
/**
|
||||
* Helper methods for committer tests on ABFS.
|
||||
*/
|
||||
final class AbfsCommitTestHelper {
|
||||
private AbfsCommitTestHelper() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare the test configuration.
|
||||
* @param contractTestBinding test binding
|
||||
* @return an extracted and patched configuration.
|
||||
*/
|
||||
static Configuration prepareTestConfiguration(
|
||||
ABFSContractTestBinding contractTestBinding) {
|
||||
final Configuration conf =
|
||||
contractTestBinding.getRawConfiguration();
|
||||
|
||||
// use ABFS Store operations
|
||||
conf.set(OPT_STORE_OPERATIONS_CLASS,
|
||||
AbfsManifestStoreOperations.NAME);
|
||||
|
||||
return conf;
|
||||
}
|
||||
}
|
@ -0,0 +1,260 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
|
||||
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.util.DurationInfo;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
|
||||
import static org.apache.hadoop.io.IOUtils.closeStream;
|
||||
|
||||
/**
|
||||
* Tests which create a yarn minicluster.
|
||||
* These are all considered scale tests; the probe for
|
||||
* scale tests being enabled is executed before the cluster
|
||||
* is set up to avoid wasting time on non-scale runs.
|
||||
*/
|
||||
public abstract class AbstractAbfsClusterITest extends
|
||||
AbstractManifestCommitterTest {
|
||||
|
||||
public static final int NO_OF_NODEMANAGERS = 2;
|
||||
|
||||
private final ABFSContractTestBinding binding;
|
||||
|
||||
|
||||
/**
|
||||
* The static cluster binding with the lifecycle of this test; served
|
||||
* through instance-level methods for sharing across methods in the
|
||||
* suite.
|
||||
*/
|
||||
@SuppressWarnings("StaticNonFinalField")
|
||||
private static ClusterBinding clusterBinding;
|
||||
|
||||
protected AbstractAbfsClusterITest() throws Exception {
|
||||
binding = new ABFSContractTestBinding();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getTestTimeoutMillis() {
|
||||
return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
binding.setup();
|
||||
super.setup();
|
||||
requireScaleTestsEnabled();
|
||||
if (getClusterBinding() == null) {
|
||||
clusterBinding = demandCreateClusterBinding();
|
||||
}
|
||||
assertNotNull("cluster is not bound", getClusterBinding());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClusters() throws IOException {
|
||||
terminateCluster(clusterBinding);
|
||||
clusterBinding = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(final Configuration conf) {
|
||||
return new AbfsFileSystemContract(conf, binding.isSecureMode());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the cluster binding which every subclass must create.
|
||||
*/
|
||||
protected static final class ClusterBinding {
|
||||
|
||||
private String clusterName;
|
||||
|
||||
private final MiniMRYarnCluster yarn;
|
||||
|
||||
public ClusterBinding(
|
||||
final String clusterName,
|
||||
final MiniMRYarnCluster yarn) {
|
||||
this.clusterName = clusterName;
|
||||
this.yarn = requireNonNull(yarn);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the cluster FS, which will either be HDFS or the local FS.
|
||||
* @return a filesystem.
|
||||
* @throws IOException failure
|
||||
*/
|
||||
public FileSystem getClusterFS() throws IOException {
|
||||
return FileSystem.getLocal(yarn.getConfig());
|
||||
}
|
||||
|
||||
public MiniMRYarnCluster getYarn() {
|
||||
return yarn;
|
||||
}
|
||||
|
||||
public Configuration getConf() {
|
||||
return getYarn().getConfig();
|
||||
}
|
||||
|
||||
public String getClusterName() {
|
||||
return clusterName;
|
||||
}
|
||||
|
||||
public void terminate() {
|
||||
closeStream(getYarn());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the cluster binding.
|
||||
* The configuration will be patched by propagating down options
|
||||
* from the maven build (S3Guard binding etc) and turning off unwanted
|
||||
* YARN features.
|
||||
*
|
||||
* If an HDFS cluster is requested,
|
||||
* the HDFS and YARN clusters will share the same configuration, so
|
||||
* the HDFS cluster binding is implicitly propagated to YARN.
|
||||
* If one is not requested, the local filesystem is used as the cluster FS.
|
||||
* @param conf configuration to start with.
|
||||
* @return the cluster binding.
|
||||
* @throws IOException failure.
|
||||
*/
|
||||
protected static ClusterBinding createCluster(
|
||||
final JobConf conf) throws IOException {
|
||||
try (DurationInfo d = new DurationInfo(LOG, "Creating YARN MiniCluster")) {
|
||||
conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
|
||||
// create a unique cluster name based on the current time in millis.
|
||||
String timestamp = LocalDateTime.now().format(
|
||||
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS"));
|
||||
String clusterName = "yarn-" + timestamp;
|
||||
MiniMRYarnCluster yarnCluster =
|
||||
new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS);
|
||||
yarnCluster.init(conf);
|
||||
yarnCluster.start();
|
||||
return new ClusterBinding(clusterName, yarnCluster);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Terminate the cluster if it is not null.
|
||||
* @param cluster the cluster
|
||||
*/
|
||||
protected static void terminateCluster(ClusterBinding cluster) {
|
||||
if (cluster != null) {
|
||||
cluster.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the cluster binding for this subclass.
|
||||
* @return the cluster binding
|
||||
*/
|
||||
protected ClusterBinding getClusterBinding() {
|
||||
return clusterBinding;
|
||||
}
|
||||
|
||||
protected MiniMRYarnCluster getYarn() {
|
||||
return getClusterBinding().getYarn();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* We stage work into a temporary directory rather than directly under
|
||||
* the user's home directory, as that is often rejected by CI test
|
||||
* runners.
|
||||
*/
|
||||
@Rule
|
||||
public final TemporaryFolder stagingFilesDir = new TemporaryFolder();
|
||||
|
||||
|
||||
/**
|
||||
* binding on demand rather than in a BeforeClass static method.
|
||||
* Subclasses can override this to change the binding options.
|
||||
* @return the cluster binding
|
||||
*/
|
||||
protected ClusterBinding demandCreateClusterBinding() throws Exception {
|
||||
return createCluster(new JobConf());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a job configuration.
|
||||
* This creates a new job conf from the yarn
|
||||
* cluster configuration then calls
|
||||
* {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized.
|
||||
* @return the new job configuration.
|
||||
* @throws IOException failure
|
||||
*/
|
||||
protected JobConf newJobConf() throws IOException {
|
||||
JobConf jobConf = new JobConf(getYarn().getConfig());
|
||||
jobConf.addResource(getConfiguration());
|
||||
applyCustomConfigOptions(jobConf);
|
||||
return jobConf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Patch the (job) configuration for this committer.
|
||||
* @param jobConf configuration to patch
|
||||
* @return a configuration which will run this configuration.
|
||||
*/
|
||||
protected Configuration patchConfigurationForCommitter(
|
||||
final Configuration jobConf) {
|
||||
enableManifestCommitter(jobConf);
|
||||
return jobConf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Override point to let implementations tune the MR Job conf.
|
||||
* @param jobConf configuration
|
||||
*/
|
||||
protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Assume that scale tests are enabled.
|
||||
*/
|
||||
protected void requireScaleTestsEnabled() {
|
||||
assumeScaleTestsEnabled(getConfiguration());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCleanupStage;
|
||||
|
||||
/**
|
||||
* Cleanup logic on ABFS.
|
||||
*/
|
||||
public class ITestAbfsCleanupStage extends TestCleanupStage {
|
||||
|
||||
private final ABFSContractTestBinding binding;
|
||||
|
||||
public ITestAbfsCleanupStage() throws Exception {
|
||||
binding = new ABFSContractTestBinding();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
binding.setup();
|
||||
super.setup();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(final Configuration conf) {
|
||||
return new AbfsFileSystemContract(conf, binding.isSecureMode());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCommitTaskStage;
|
||||
|
||||
/**
|
||||
* ABFS storage test of task committer.
|
||||
*/
|
||||
public class ITestAbfsCommitTaskStage extends TestCommitTaskStage {
|
||||
|
||||
private final ABFSContractTestBinding binding;
|
||||
|
||||
public ITestAbfsCommitTaskStage() throws Exception {
|
||||
binding = new ABFSContractTestBinding();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
binding.setup();
|
||||
super.setup();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(final Configuration conf) {
|
||||
return new AbfsFileSystemContract(conf, binding.isSecureMode());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCreateOutputDirectoriesStage;
|
||||
|
||||
/**
|
||||
* ABFS storage test of directory creation.
|
||||
*/
|
||||
public class ITestAbfsCreateOutputDirectoriesStage extends TestCreateOutputDirectoriesStage {
|
||||
|
||||
private final ABFSContractTestBinding binding;
|
||||
|
||||
public ITestAbfsCreateOutputDirectoriesStage() throws Exception {
|
||||
binding = new ABFSContractTestBinding();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
binding.setup();
|
||||
super.setup();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(final Configuration conf) {
|
||||
return new AbfsFileSystemContract(conf, binding.isSecureMode());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,101 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.runners.MethodSorters;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestJobThroughManifestCommitter;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
|
||||
|
||||
/**
|
||||
* Test the Manifest committer stages against ABFS.
|
||||
*/
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
public class ITestAbfsJobThroughManifestCommitter
|
||||
extends TestJobThroughManifestCommitter {
|
||||
|
||||
private final ABFSContractTestBinding binding;
|
||||
|
||||
public ITestAbfsJobThroughManifestCommitter() throws Exception {
|
||||
binding = new ABFSContractTestBinding();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
binding.setup();
|
||||
super.setup();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return enableManifestCommitter(prepareTestConfiguration(binding));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(final Configuration conf) {
|
||||
return new AbfsFileSystemContract(conf, binding.isSecureMode());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldDeleteTestRootAtEndOfTestRun() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add read of manifest and validate of output's etags.
|
||||
* @param attemptId attempt ID
|
||||
* @param files files which were created.
|
||||
* @param manifest manifest
|
||||
* @throws IOException failure
|
||||
*/
|
||||
@Override
|
||||
protected void validateTaskAttemptManifest(String attemptId,
|
||||
List<Path> files,
|
||||
TaskManifest manifest) throws IOException {
|
||||
super.validateTaskAttemptManifest(attemptId, files, manifest);
|
||||
final List<FileEntry> commit = manifest.getFilesToCommit();
|
||||
final ManifestStoreOperations operations = getStoreOperations();
|
||||
for (FileEntry entry : commit) {
|
||||
Assertions.assertThat(entry.getEtag())
|
||||
.describedAs("Etag of %s", entry)
|
||||
.isNotEmpty();
|
||||
final FileStatus sourceStatus = operations.getFileStatus(entry.getSourcePath());
|
||||
final String etag = ManifestCommitterSupport.getEtag(sourceStatus);
|
||||
Assertions.assertThat(etag)
|
||||
.describedAs("Etag of %s", sourceStatus)
|
||||
.isEqualTo(entry.getEtag());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestLoadManifestsStage;
|
||||
|
||||
/**
|
||||
* ABFS storage test of saving and loading a large number
|
||||
* of manifests.
|
||||
*/
|
||||
public class ITestAbfsLoadManifestsStage extends TestLoadManifestsStage {
|
||||
|
||||
private final ABFSContractTestBinding binding;
|
||||
|
||||
public ITestAbfsLoadManifestsStage() throws Exception {
|
||||
binding = new ABFSContractTestBinding();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
binding.setup();
|
||||
super.setup();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(final Configuration conf) {
|
||||
return new AbfsFileSystemContract(conf, binding.isSecureMode());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestManifestCommitProtocol;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
|
||||
|
||||
/**
|
||||
* Test the Manifest protocol against ABFS.
|
||||
*/
|
||||
public class ITestAbfsManifestCommitProtocol extends
|
||||
TestManifestCommitProtocol {
|
||||
|
||||
private final ABFSContractTestBinding binding;
|
||||
|
||||
public ITestAbfsManifestCommitProtocol() throws Exception {
|
||||
binding = new ABFSContractTestBinding();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
binding.setup();
|
||||
super.setup();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return enableManifestCommitter(prepareTestConfiguration(binding));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(final Configuration conf) {
|
||||
return new AbfsFileSystemContract(conf, binding.isSecureMode());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected String suitename() {
|
||||
return "ITestAbfsManifestCommitProtocol";
|
||||
}
|
||||
}
|
@ -0,0 +1,175 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
|
||||
import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
/**
|
||||
* Test {@link AbfsManifestStoreOperations}.
|
||||
* As this looks at etag handling through FS operations, it's actually testing how etags work
|
||||
* in ABFS (preservation across renames) and in the client (are they consistent
|
||||
* in LIST and HEAD calls).
|
||||
*
|
||||
* Skipped when tested against wasb-compatible stores.
|
||||
*/
|
||||
public class ITestAbfsManifestStoreOperations extends AbstractManifestCommitterTest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ITestAbfsManifestStoreOperations.class);
|
||||
|
||||
private final ABFSContractTestBinding binding;
|
||||
|
||||
public ITestAbfsManifestStoreOperations() throws Exception {
|
||||
binding = new ABFSContractTestBinding();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
binding.setup();
|
||||
super.setup();
|
||||
|
||||
// skip tests on non-HNS stores
|
||||
assumeTrue("Resilient rename not available",
|
||||
getFileSystem().hasPathCapability(getContract().getTestPath(),
|
||||
ETAGS_PRESERVED_IN_RENAME));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return enableManifestCommitter(prepareTestConfiguration(binding));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(final Configuration conf) {
|
||||
return new AbfsFileSystemContract(conf, binding.isSecureMode());
|
||||
}
|
||||
|
||||
/**
|
||||
* basic consistency across operations, as well as being non-empty.
|
||||
*/
|
||||
@Test
|
||||
public void testEtagConsistencyAcrossListAndHead() throws Throwable {
|
||||
describe("Etag values must be non-empty and consistent across LIST and HEAD Calls.");
|
||||
final Path path = methodPath();
|
||||
final FileSystem fs = getFileSystem();
|
||||
ContractTestUtils.touch(fs, path);
|
||||
final ManifestStoreOperations operations = createManifestStoreOperations();
|
||||
Assertions.assertThat(operations)
|
||||
.describedAs("Store operations class loaded via Configuration")
|
||||
.isInstanceOf(AbfsManifestStoreOperations.class);
|
||||
|
||||
final FileStatus st = operations.getFileStatus(path);
|
||||
final String etag = operations.getEtag(st);
|
||||
Assertions.assertThat(etag)
|
||||
.describedAs("Etag of %s", st)
|
||||
.isNotBlank();
|
||||
LOG.info("etag of empty file is \"{}\"", etag);
|
||||
|
||||
final FileStatus[] statuses = fs.listStatus(path);
|
||||
Assertions.assertThat(statuses)
|
||||
.describedAs("List(%s)", path)
|
||||
.hasSize(1);
|
||||
final FileStatus lsStatus = statuses[0];
|
||||
Assertions.assertThat(operations.getEtag(lsStatus))
|
||||
.describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st)
|
||||
.isEqualTo(etag);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEtagsOfDifferentDataDifferent() throws Throwable {
|
||||
describe("Verify that two different blocks of data written have different tags");
|
||||
|
||||
final Path path = methodPath();
|
||||
final FileSystem fs = getFileSystem();
|
||||
Path src = new Path(path, "src");
|
||||
|
||||
ContractTestUtils.createFile(fs, src, true,
|
||||
"data1234".getBytes(StandardCharsets.UTF_8));
|
||||
final ManifestStoreOperations operations = createManifestStoreOperations();
|
||||
final FileStatus srcStatus = operations.getFileStatus(src);
|
||||
final String srcTag = operations.getEtag(srcStatus);
|
||||
LOG.info("etag of file 1 is \"{}\"", srcTag);
|
||||
|
||||
// now overwrite with data of same length
|
||||
// (ensure that path or length aren't used exclusively as tag)
|
||||
ContractTestUtils.createFile(fs, src, true,
|
||||
"1234data".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
// validate
|
||||
final String tag2 = operations.getEtag(operations.getFileStatus(src));
|
||||
LOG.info("etag of file 2 is \"{}\"", tag2);
|
||||
|
||||
Assertions.assertThat(tag2)
|
||||
.describedAs("etag of updated file")
|
||||
.isNotEqualTo(srcTag);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEtagConsistencyAcrossRename() throws Throwable {
|
||||
describe("Verify that when a file is renamed, the etag remains unchanged");
|
||||
final Path path = methodPath();
|
||||
final FileSystem fs = getFileSystem();
|
||||
Path src = new Path(path, "src");
|
||||
Path dest = new Path(path, "dest");
|
||||
|
||||
ContractTestUtils.createFile(fs, src, true,
|
||||
"sample data".getBytes(StandardCharsets.UTF_8));
|
||||
final ManifestStoreOperations operations = createManifestStoreOperations();
|
||||
final FileStatus srcStatus = operations.getFileStatus(src);
|
||||
final String srcTag = operations.getEtag(srcStatus);
|
||||
LOG.info("etag of short file is \"{}\"", srcTag);
|
||||
|
||||
Assertions.assertThat(srcTag)
|
||||
.describedAs("Etag of %s", srcStatus)
|
||||
.isNotBlank();
|
||||
|
||||
// rename
|
||||
operations.commitFile(new FileEntry(src, dest, 0, srcTag));
|
||||
|
||||
// validate
|
||||
FileStatus destStatus = operations.getFileStatus(dest);
|
||||
final String destTag = operations.getEtag(destStatus);
|
||||
Assertions.assertThat(destTag)
|
||||
.describedAs("etag of list status (%s) compared to HEAD value of %s", destStatus, srcStatus)
|
||||
.isEqualTo(srcTag);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestRenameStageFailure;
|
||||
|
||||
/**
|
||||
* Rename failure logic on ABFS.
|
||||
* This will go through the resilient rename operation.
|
||||
*/
|
||||
public class ITestAbfsRenameStageFailure extends TestRenameStageFailure {
|
||||
|
||||
/**
|
||||
* How many files to create.
|
||||
*/
|
||||
private static final int FILES_TO_CREATE = 20;
|
||||
|
||||
private final ABFSContractTestBinding binding;
|
||||
|
||||
public ITestAbfsRenameStageFailure() throws Exception {
|
||||
binding = new ABFSContractTestBinding();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
binding.setup();
|
||||
super.setup();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(final Configuration conf) {
|
||||
return new AbfsFileSystemContract(conf, binding.isSecureMode());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean requireRenameResilience() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int filesToCreate() {
|
||||
return FILES_TO_CREATE;
|
||||
}
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
|
||||
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestTaskManifestFileIO;
|
||||
|
||||
/**
|
||||
* Test Reading/writing manifest file through ABFS.
|
||||
*/
|
||||
public class ITestAbfsTaskManifestFileIO extends TestTaskManifestFileIO {
|
||||
|
||||
private final ABFSContractTestBinding binding;
|
||||
|
||||
public ITestAbfsTaskManifestFileIO() throws Exception {
|
||||
binding = new ABFSContractTestBinding();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
binding.setup();
|
||||
super.setup();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return AbfsCommitTestHelper.prepareTestConfiguration(binding);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(final Configuration conf) {
|
||||
return new AbfsFileSystemContract(conf, binding.isSecureMode());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,353 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.junit.Assume;
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.MethodSorters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.examples.terasort.TeraGen;
|
||||
import org.apache.hadoop.examples.terasort.TeraSort;
|
||||
import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
|
||||
import org.apache.hadoop.examples.terasort.TeraValidate;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
|
||||
import org.apache.hadoop.util.DurationInfo;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.util.functional.RemoteIterators;
|
||||
|
||||
import static java.util.Optional.empty;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
|
||||
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.loadSuccessFile;
|
||||
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile;
|
||||
|
||||
/**
|
||||
* Runs Terasort against ABFS using the manifest committer.
|
||||
* The tests run in sequence, so each operation is isolated.
|
||||
* Scale test only (it is big and slow)
|
||||
*/
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
@SuppressWarnings({"StaticNonFinalField", "OptionalUsedAsFieldOrParameterType"})
|
||||
public class ITestAbfsTerasort extends AbstractAbfsClusterITest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ITestAbfsTerasort.class);
|
||||
|
||||
public static final int EXPECTED_PARTITION_COUNT = 10;
|
||||
|
||||
public static final int PARTITION_SAMPLE_SIZE = 1000;
|
||||
|
||||
public static final int ROW_COUNT = 1000;
|
||||
|
||||
/**
|
||||
* This has to be common across all test methods.
|
||||
*/
|
||||
private static final Path TERASORT_PATH = new Path("/ITestAbfsTerasort");
|
||||
|
||||
/**
|
||||
* Duration tracker created in the first of the test cases and closed
|
||||
* in {@link #test_140_teracomplete()}.
|
||||
*/
|
||||
private static Optional<DurationInfo> terasortDuration = empty();
|
||||
|
||||
/**
|
||||
* Tracker of which stages are completed and how long they took.
|
||||
*/
|
||||
private static final Map<String, DurationInfo> COMPLETED_STAGES = new HashMap<>();
|
||||
|
||||
/**
|
||||
* FileSystem statistics are collected from the _SUCCESS markers.
|
||||
*/
|
||||
protected static final IOStatisticsSnapshot JOB_IOSTATS =
|
||||
snapshotIOStatistics();
|
||||
|
||||
/** Base path for all the terasort input and output paths. */
|
||||
private Path terasortPath;
|
||||
|
||||
/** Input (teragen) path. */
|
||||
private Path sortInput;
|
||||
|
||||
/** Path where sorted data goes. */
|
||||
private Path sortOutput;
|
||||
|
||||
/** Path for validated job's output. */
|
||||
private Path sortValidate;
|
||||
|
||||
public ITestAbfsTerasort() throws Exception {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
// superclass calls requireScaleTestsEnabled();
|
||||
super.setup();
|
||||
prepareToTerasort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up the job conf with the options for terasort chosen by the scale
|
||||
* options.
|
||||
* @param conf configuration
|
||||
*/
|
||||
@Override
|
||||
protected void applyCustomConfigOptions(JobConf conf) {
|
||||
// small sample size for faster runs
|
||||
conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(),
|
||||
getSampleSizeForEachPartition());
|
||||
conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
|
||||
getExpectedPartitionCount());
|
||||
conf.setBoolean(
|
||||
TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
|
||||
false);
|
||||
}
|
||||
|
||||
private int getExpectedPartitionCount() {
|
||||
return EXPECTED_PARTITION_COUNT;
|
||||
}
|
||||
|
||||
private int getSampleSizeForEachPartition() {
|
||||
return PARTITION_SAMPLE_SIZE;
|
||||
}
|
||||
|
||||
protected int getRowCount() {
|
||||
return ROW_COUNT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up the terasort by initializing paths variables
|
||||
* The paths used must be unique across parameterized runs but
|
||||
* common across all test cases in a single parameterized run.
|
||||
*/
|
||||
private void prepareToTerasort() {
|
||||
terasortPath = getFileSystem().makeQualified(TERASORT_PATH);
|
||||
sortInput = new Path(terasortPath, "sortin");
|
||||
sortOutput = new Path(terasortPath, "sortout");
|
||||
sortValidate = new Path(terasortPath, "validate");
|
||||
}
|
||||
|
||||
/**
|
||||
* Declare that a stage has completed.
|
||||
* @param stage stage name/key in the map
|
||||
* @param d duration.
|
||||
*/
|
||||
private static void completedStage(final String stage,
|
||||
final DurationInfo d) {
|
||||
COMPLETED_STAGES.put(stage, d);
|
||||
}
|
||||
|
||||
/**
|
||||
* Declare a stage which is required for this test case.
|
||||
* @param stage stage name
|
||||
*/
|
||||
private static void requireStage(final String stage) {
|
||||
Assume.assumeTrue(
|
||||
"Required stage was not completed: " + stage,
|
||||
COMPLETED_STAGES.get(stage) != null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a single stage in the terasort.
|
||||
* Updates the completed stages map with the stage duration -if successful.
|
||||
* @param stage Stage name for the stages map.
|
||||
* @param jobConf job conf
|
||||
* @param dest destination directory -the _SUCCESS file will be expected here.
|
||||
* @param tool tool to run.
|
||||
* @param args args for the tool.
|
||||
* @param minimumFileCount minimum number of files to have been created
|
||||
* @throws Exception any failure
|
||||
*/
|
||||
private void executeStage(
|
||||
final String stage,
|
||||
final JobConf jobConf,
|
||||
final Path dest,
|
||||
final Tool tool,
|
||||
final String[] args,
|
||||
final int minimumFileCount) throws Exception {
|
||||
int result;
|
||||
|
||||
// the duration info is created outside a try-with-resources
|
||||
// clause as it is used later.
|
||||
DurationInfo d = new DurationInfo(LOG, stage);
|
||||
try {
|
||||
result = ToolRunner.run(jobConf, tool, args);
|
||||
} finally {
|
||||
d.close();
|
||||
}
|
||||
dumpOutputTree(dest);
|
||||
assertEquals(stage
|
||||
+ "(" + StringUtils.join(", ", args) + ")"
|
||||
+ " failed", 0, result);
|
||||
final ManifestSuccessData successFile = validateSuccessFile(getFileSystem(), dest,
|
||||
minimumFileCount, "");
|
||||
JOB_IOSTATS.aggregate(successFile.getIOStatistics());
|
||||
|
||||
completedStage(stage, d);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up terasort by cleaning out the destination, and note the initial
|
||||
* time before any of the jobs are executed.
|
||||
*
|
||||
* This is executed first <i>for each parameterized run</i>.
|
||||
* It is where all variables which need to be reset for each run need
|
||||
* to be reset.
|
||||
*/
|
||||
@Test
|
||||
public void test_100_terasort_setup() throws Throwable {
|
||||
describe("Setting up for a terasort");
|
||||
|
||||
getFileSystem().delete(terasortPath, true);
|
||||
terasortDuration = Optional.of(new DurationInfo(LOG, false, "Terasort"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_110_teragen() throws Throwable {
|
||||
describe("Teragen to %s", sortInput);
|
||||
getFileSystem().delete(sortInput, true);
|
||||
|
||||
JobConf jobConf = newJobConf();
|
||||
patchConfigurationForCommitter(jobConf);
|
||||
executeStage("teragen",
|
||||
jobConf,
|
||||
sortInput,
|
||||
new TeraGen(),
|
||||
new String[]{Integer.toString(getRowCount()), sortInput.toString()},
|
||||
1);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void test_120_terasort() throws Throwable {
|
||||
describe("Terasort from %s to %s", sortInput, sortOutput);
|
||||
requireStage("teragen");
|
||||
getFileSystem().delete(sortOutput, true);
|
||||
|
||||
loadSuccessFile(getFileSystem(), sortInput);
|
||||
JobConf jobConf = newJobConf();
|
||||
patchConfigurationForCommitter(jobConf);
|
||||
executeStage("terasort",
|
||||
jobConf,
|
||||
sortOutput,
|
||||
new TeraSort(),
|
||||
new String[]{sortInput.toString(), sortOutput.toString()},
|
||||
1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_130_teravalidate() throws Throwable {
|
||||
describe("TeraValidate from %s to %s", sortOutput, sortValidate);
|
||||
requireStage("terasort");
|
||||
getFileSystem().delete(sortValidate, true);
|
||||
loadSuccessFile(getFileSystem(), sortOutput);
|
||||
JobConf jobConf = newJobConf();
|
||||
patchConfigurationForCommitter(jobConf);
|
||||
executeStage("teravalidate",
|
||||
jobConf,
|
||||
sortValidate,
|
||||
new TeraValidate(),
|
||||
new String[]{sortOutput.toString(), sortValidate.toString()},
|
||||
1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Print the results, and save to the base dir as a CSV file.
|
||||
* Why there? Makes it easy to list and compare.
|
||||
*/
|
||||
@Test
|
||||
public void test_140_teracomplete() throws Throwable {
|
||||
terasortDuration.ifPresent(d -> {
|
||||
d.close();
|
||||
completedStage("overall", d);
|
||||
});
|
||||
|
||||
// IO Statistics
|
||||
IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, JOB_IOSTATS);
|
||||
|
||||
// and the summary
|
||||
final StringBuilder results = new StringBuilder();
|
||||
results.append("\"Operation\"\t\"Duration\"\n");
|
||||
|
||||
// this is how you dynamically create a function in a method
|
||||
// for use afterwards.
|
||||
// Works because there's no IOEs being raised in this sequence.
|
||||
Consumer<String> stage = (s) -> {
|
||||
DurationInfo duration = COMPLETED_STAGES.get(s);
|
||||
results.append(String.format("\"%s\"\t\"%s\"\n",
|
||||
s,
|
||||
duration == null ? "" : duration));
|
||||
};
|
||||
|
||||
stage.accept("teragen");
|
||||
stage.accept("terasort");
|
||||
stage.accept("teravalidate");
|
||||
stage.accept("overall");
|
||||
String text = results.toString();
|
||||
File resultsFile = File.createTempFile("results", ".csv");
|
||||
FileUtils.write(resultsFile, text, StandardCharsets.UTF_8);
|
||||
LOG.info("Results are in {}\n{}", resultsFile, text);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the duration so if two committer tests are run sequentially.
|
||||
* Without this the total execution time is reported as from the start of
|
||||
* the first test suite to the end of the second.
|
||||
*/
|
||||
@Test
|
||||
public void test_150_teracleanup() throws Throwable {
|
||||
terasortDuration = Optional.empty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_200_directory_deletion() throws Throwable {
|
||||
getFileSystem().delete(terasortPath, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Dump the files under a path -but not fail if the path is not present.,
|
||||
* @param path path to dump
|
||||
* @throws Exception any failure.
|
||||
*/
|
||||
protected void dumpOutputTree(Path path) throws Exception {
|
||||
LOG.info("Files under output directory {}", path);
|
||||
try {
|
||||
RemoteIterators.foreach(getFileSystem().listFiles(path, true),
|
||||
(status) -> LOG.info("{}", status));
|
||||
} catch (FileNotFoundException e) {
|
||||
LOG.info("Output directory {} not found", path);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Unit and integration tests for the manifest committer.
|
||||
* JSON job reports will be saved to
|
||||
* {@code target/reports}
|
||||
*/
|
||||
package org.apache.hadoop.fs.azurebfs.commit;
|
@ -34,7 +34,7 @@ public class AbfsFileSystemContract extends AbstractBondedFSContract {
|
||||
public static final String CONTRACT_XML = "abfs.xml";
|
||||
private final boolean isSecure;
|
||||
|
||||
protected AbfsFileSystemContract(final Configuration conf, boolean secure) {
|
||||
public AbfsFileSystemContract(final Configuration conf, boolean secure) {
|
||||
super(conf);
|
||||
//insert the base features
|
||||
addConfResource(CONTRACT_XML);
|
||||
|
25
hadoop-tools/hadoop-azure/src/test/resources/core-site.xml
Normal file
25
hadoop-tools/hadoop-azure/src/test/resources/core-site.xml
Normal file
@ -0,0 +1,25 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. The ASF licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
<include xmlns="http://www.w3.org/2001/XInclude" href="azure-test.xml">
|
||||
<fallback/>
|
||||
</include>
|
||||
</configuration>
|
Loading…
Reference in New Issue
Block a user