From 98fe0d0fc31e74d1bcf6770e009e7772e980144e Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 24 Nov 2021 17:33:12 +0000 Subject: [PATCH] HADOOP-17979. Add Interface EtagSource to allow FileStatus subclasses to provide etags (#3633) Contributed by Steve Loughran --- .../hadoop/fs/CommonPathCapabilities.java | 18 ++ .../java/org/apache/hadoop/fs/EtagSource.java | 38 ++++ .../site/markdown/filesystem/filesystem.md | 91 +++++++- .../fs/contract/AbstractContractEtagTest.java | 194 ++++++++++++++++++ .../apache/hadoop/fs/s3a/S3AFileStatus.java | 11 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 12 +- .../hadoop/fs/s3a/S3ALocatedFileStatus.java | 18 +- .../hadoop/fs/s3a/impl/RenameOperation.java | 2 +- .../fs/contract/s3a/ITestS3AContractEtag.java | 34 +++ .../fs/azurebfs/AzureBlobFileSystem.java | 37 ++++ .../fs/azurebfs/AzureBlobFileSystemStore.java | 57 ++++- .../services/AbfsLocatedFileStatus.java | 73 +++++++ .../ITestAbfsFileSystemContractEtag.java | 57 +++++ 13 files changed, 629 insertions(+), 13 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EtagSource.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractEtagTest.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractEtag.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLocatedFileStatus.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractEtag.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java index df932df43a..aa231554eb 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java @@ -146,4 +146,22 @@ private CommonPathCapabilities() { */ public static final String ABORTABLE_STREAM = "fs.capability.outputstream.abortable"; + + /** + * Does this FS support etags? + * That is: will FileStatus entries from listing/getFileStatus + * probes support EtagSource and return real values. + */ + public static final String ETAGS_AVAILABLE = + "fs.capability.etags.available"; + + /** + * Are etags guaranteed to be preserved across rename() operations? + * FileSystems MUST NOT declare support for this feature + * unless this holds. + */ + public static final String ETAGS_PRESERVED_IN_RENAME = + "fs.capability.etags.preserved.in.rename"; + + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EtagSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EtagSource.java new file mode 100644 index 0000000000..d7efdc705d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EtagSource.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +/** + * An optional interface for {@link FileStatus} subclasses to implement + * to provide access to etags. + * If available FS SHOULD also implement the matching PathCapabilities + * -- etag supported: {@link CommonPathCapabilities#ETAGS_AVAILABLE}. + * -- etag consistent over rename: + * {@link CommonPathCapabilities#ETAGS_PRESERVED_IN_RENAME}. + */ +public interface EtagSource { + + /** + * Return an etag of this file status. + * A return value of null or "" means "no etag" + * @return a possibly null or empty etag. + */ + String getEtag(); + +} diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index 2eb0bc0719..0e01aa1dc8 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -1240,7 +1240,7 @@ Renaming a file where the destination is a directory moves the file as a child FS' where: not exists(FS', src) and exists(FS', dest) - and data(FS', dest) == data (FS, dest) + and data(FS', dest) == data (FS, src) result = True @@ -1698,3 +1698,92 @@ in:readahead | READAHEAD | CanSetReadahead | Set the readahead on the input st dropbehind | DROPBEHIND | CanSetDropBehind | Drop the cache. in:unbuffer | UNBUFFER | CanUnbuffer | Reduce the buffering on the input stream. +## Etag probes through the interface `EtagSource` + +FileSystem implementations MAY support querying HTTP etags from `FileStatus` +entries. If so, the requirements are as follows: + +### Etag support MUST BE available across all list/`getFileStatus()` calls. + +That is: when adding etag support, all operations which return `FileStatus` or `LocatedFileStatus` +entries MUST return subclasses which are instances of `EtagSource`. 
+ +### FileStatus instances MUST have etags whenever the remote store provides them. + +To support etags, they MUST BE provided in both `getFileStatus()` +and list calls. + +Implementors note: the core APIs which MUST BE overridden to achieve this are as follows: + +```java +FileStatus getFileStatus(Path) +FileStatus[] listStatus(Path) +RemoteIterator<FileStatus> listStatusIterator(Path) +RemoteIterator<LocatedFileStatus> listFiles(Path, boolean) +``` + + +### Etags of files MUST BE consistent across all list/getFileStatus operations. + +The value of `EtagSource.getEtag()` MUST be the same in list* queries which return etags as in calls of `getFileStatus()` for the specific object. + +```java +((EtagSource)getFileStatus(path)).getEtag() == ((EtagSource)listStatus(path)[0]).getEtag() +``` + +Similarly, the same value MUST BE returned for `listFiles()`, `listStatusIterator()` of the path and +when listing the parent path, of all files in the listing. + +### Etags MUST BE different for different file contents. + +Two different arrays of data written to the same path MUST have different etag values when probed. +This is a requirement of the HTTP specification. + +### Etags of files SHOULD BE preserved across rename operations + +After a file is renamed, the value of `((EtagSource)getFileStatus(dest)).getEtag()` +SHOULD be the same as the value of `((EtagSource)getFileStatus(source)).getEtag()` +was before the rename took place. + +This is an implementation detail of the store; it does not hold for AWS S3. + +If and only if the store consistently meets this requirement, the filesystem SHOULD +declare in `hasPathCapability()` that it supports +`fs.capability.etags.preserved.in.rename` + +### Directories MAY have etags + +Directory entries MAY return etags in listing/probe operations; these etags MAY be preserved across renames. + +Equally, directory entries MAY NOT provide such etags, MAY NOT preserve them across renames, +and MAY NOT guarantee consistency over time. 
+ +Note: special mention of the root path "/". +As that isn't a real "directory", nobody should expect it to have an etag. + +### All etag-aware `FileStatus` subclasses MUST BE `Serializable`; MAY BE `Writable` + +The base `FileStatus` class implements `Serializable` and `Writable` and marshalls its fields appropriately. + +Subclasses MUST support java serialization (Some Apache Spark applications use it), preserving the etag. +This is a matter of making the etag field neither static nor transient and adding a `serialVersionUID`. + +The `Writable` support was used for marshalling status data over Hadoop IPC calls; +in Hadoop 3 that is implemented through `org/apache/hadoop/fs/protocolPB/PBHelper.java` and the `Writable` methods are deprecated. +Subclasses MAY override the deprecated methods to add etag marshalling. +However, there is no expectation of this and such marshalling is unlikely to ever take place. + +### Appropriate etag Path Capabilities SHOULD BE declared + +1. `hasPathCapability(path, "fs.capability.etags.available")` MUST return true iff + the filesystem returns valid (non-empty) etags on file status/listing operations. +2. `hasPathCapability(path, "fs.capability.etags.preserved.in.rename")` MUST return + true if and only if etags are preserved across renames. + +### Non-requirements of etag support + +* There is no requirement/expectation that `FileSystem.getFileChecksum(Path)` returns + a checksum value related to the etag of an object, if any value is returned. +* If the same data is uploaded twice to the same or a different path, + the etag of the second upload MAY NOT match that of the first upload. 
+ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractEtagTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractEtagTest.java new file mode 100644 index 0000000000..e7a121b704 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractEtagTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract; + +import java.nio.charset.StandardCharsets; + +import org.assertj.core.api.Assertions; +import org.junit.Assume; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.EtagSource; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE; +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME; + +/** + * For filesystems which support etags, validate correctness + * of their implementation. 
+ */ +public abstract class AbstractContractEtagTest extends + AbstractFSContractTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractContractEtagTest.class); + + /** + * basic consistency across operations, as well as being non-empty. + */ + @Test + public void testEtagConsistencyAcrossListAndHead() throws Throwable { + describe("Etag values must be non-empty and consistent across LIST and HEAD Calls."); + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + + Assertions.assertThat(fs.hasPathCapability(path, ETAGS_AVAILABLE)) + .describedAs("path capability %s of %s", + ETAGS_AVAILABLE, path) + .isTrue(); + + ContractTestUtils.touch(fs, path); + + final FileStatus st = fs.getFileStatus(path); + final String etag = etagFromStatus(st); + LOG.info("etag of empty file is \"{}\"", etag); + + final FileStatus[] statuses = fs.listStatus(path); + Assertions.assertThat(statuses) + .describedAs("List(%s)", path) + .hasSize(1); + final FileStatus lsStatus = statuses[0]; + Assertions.assertThat(etagFromStatus(lsStatus)) + .describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st) + .isEqualTo(etag); + } + + /** + * Get an etag from a FileStatus which MUST BE + * an implementation of EtagSource and + * whose etag MUST NOT BE null/empty. + * @param st the status + * @return the etag + */ + String etagFromStatus(FileStatus st) { + Assertions.assertThat(st) + .describedAs("FileStatus %s", st) + .isInstanceOf(EtagSource.class); + final String etag = ((EtagSource) st).getEtag(); + Assertions.assertThat(etag) + .describedAs("Etag of %s", st) + .isNotBlank(); + return etag; + } + + /** + * Overwritten data has different etags. 
+ */ + @Test + public void testEtagsOfDifferentDataDifferent() throws Throwable { + describe("Verify that two different blocks of data written have different tags"); + + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + Path src = new Path(path, "src"); + + ContractTestUtils.createFile(fs, src, true, + "data1234".getBytes(StandardCharsets.UTF_8)); + final FileStatus srcStatus = fs.getFileStatus(src); + final String srcTag = etagFromStatus(srcStatus); + LOG.info("etag of file 1 is \"{}\"", srcTag); + + // now overwrite with data of same length + // (ensure that path or length aren't used exclusively as tag) + ContractTestUtils.createFile(fs, src, true, + "1234data".getBytes(StandardCharsets.UTF_8)); + + // validate + final String tag2 = etagFromStatus(fs.getFileStatus(src)); + LOG.info("etag of file 2 is \"{}\"", tag2); + + Assertions.assertThat(tag2) + .describedAs("etag of updated file") + .isNotEqualTo(srcTag); + } + + /** + * If supported, rename preserves etags. 
+ */ + @Test + public void testEtagConsistencyAcrossRename() throws Throwable { + describe("Verify that when a file is renamed, the etag remains unchanged"); + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + Assume.assumeTrue( + "Filesystem does not declare that etags are preserved across renames", + fs.hasPathCapability(path, ETAGS_PRESERVED_IN_RENAME)); + Path src = new Path(path, "src"); + Path dest = new Path(path, "dest"); + + ContractTestUtils.createFile(fs, src, true, + "sample data".getBytes(StandardCharsets.UTF_8)); + final FileStatus srcStatus = fs.getFileStatus(src); + LOG.info("source file status string value {}", srcStatus); + + final String srcTag = etagFromStatus(srcStatus); + LOG.info("etag of short file is \"{}\"", srcTag); + + Assertions.assertThat(srcTag) + .describedAs("Etag of %s", srcStatus) + .isNotBlank(); + + // rename; a false return is a failure, not something to discover via a later FileNotFoundException + Assertions.assertThat(fs.rename(src, dest)).describedAs("rename(%s, %s)", src, dest).isTrue(); + + // validate + FileStatus destStatus = fs.getFileStatus(dest); + final String destTag = etagFromStatus(destStatus); + Assertions.assertThat(destTag) + .describedAs("etag of renamed file (%s) compared to source value of %s", + destStatus, srcStatus) + .isEqualTo(srcTag); + } + + /** + * For effective use of etags, listLocatedStatus SHOULD return status entries + * with consistent values. + * This ensures that listing during query planning can collect and use the etags. 
+ */ + @Test + public void testLocatedStatusAlsoHasEtag() throws Throwable { + describe("verify that listLocatedStatus() and listFiles() are etag sources"); + final Path path = methodPath(); + final FileSystem fs = getFileSystem(); + Path src = new Path(path, "src"); + ContractTestUtils.createFile(fs, src, true, + "sample data".getBytes(StandardCharsets.UTF_8)); + final FileStatus srcStatus = fs.getFileStatus(src); + final String srcTag = etagFromStatus(srcStatus); + final LocatedFileStatus entry = fs.listLocatedStatus(path).next(); + LOG.info("located file status string value " + entry); + final String listTag = etagFromStatus(entry); + Assertions.assertThat(listTag) + .describedAs("etag of listLocatedStatus (%s) compared to HEAD value of %s", + entry, srcStatus) + .isEqualTo(srcTag); + + final LocatedFileStatus entry2 = fs.listFiles(path, false).next(); + Assertions.assertThat(etagFromStatus(entry2)) + .describedAs("etag of listFiles (%s) compared to HEAD value of %s", + entry2, srcStatus) + .isEqualTo(srcTag); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java index bc6df7aed8..eda9177cf6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java @@ -19,6 +19,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.EtagSource; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -30,7 +31,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class S3AFileStatus extends FileStatus { +public class S3AFileStatus extends FileStatus implements EtagSource { private static final long serialVersionUID = -5955674081978903922L; @@ -166,8 +167,16 @@ public void 
setIsEmptyDirectory(Tristate isEmptyDirectory) { /** * @return the S3 object eTag when available, else null. + * @deprecated use {@link EtagSource#getEtag()} for + * public access. */ + @Deprecated public String getETag() { + return getEtag(); + } + + @Override + public String getEtag() { return eTag; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 0914d6b144..115abe302f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1617,7 +1617,7 @@ private S3ObjectAttributes createObjectAttributes( final S3AFileStatus fileStatus) { return createObjectAttributes( fileStatus.getPath(), - fileStatus.getETag(), + fileStatus.getEtag(), fileStatus.getVersionId(), fileStatus.getLen()); } @@ -5242,7 +5242,13 @@ public boolean hasPathCapability(final Path path, final String capability) case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE: return true; - /* + // etags are available in listings, but they + // are not consistent across renames. + // therefore, only availability is declared + case CommonPathCapabilities.ETAGS_AVAILABLE: + return true; + + /* * Marker policy capabilities are handed off. 
*/ case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP: @@ -5329,7 +5335,7 @@ private FSDataInputStream select(final Path source, changeDetectionPolicy, ra, auditSpan); if (changeDetectionPolicy.getSource() != ChangeDetectionPolicy.Source.None - && fileStatus.getETag() != null) { + && fileStatus.getEtag() != null) { // if there is change detection, and the status includes at least an // etag, // check that the object metadata lines up with what is expected diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java index a5a56f530d..d29afb2e34 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.EtagSource; import org.apache.hadoop.fs.LocatedFileStatus; import static org.apache.hadoop.util.Preconditions.checkNotNull; @@ -26,7 +27,7 @@ /** * {@link LocatedFileStatus} extended to also carry ETag and object version ID. */ -public class S3ALocatedFileStatus extends LocatedFileStatus { +public class S3ALocatedFileStatus extends LocatedFileStatus implements EtagSource { private static final long serialVersionUID = 3597192103662929338L; @@ -37,12 +38,23 @@ public class S3ALocatedFileStatus extends LocatedFileStatus { public S3ALocatedFileStatus(S3AFileStatus status, BlockLocation[] locations) { super(checkNotNull(status), locations); - this.eTag = status.getETag(); + this.eTag = status.getEtag(); this.versionId = status.getVersionId(); isEmptyDirectory = status.isEmptyDirectory(); } + /** + * @return the S3 object eTag when available, else null. + * @deprecated use {@link EtagSource#getEtag()} for + * public access. 
+ */ + @Deprecated public String getETag() { + return getEtag(); + } + + @Override + public String getEtag() { return eTag; } @@ -77,7 +89,7 @@ public S3AFileStatus toS3AFileStatus() { getModificationTime(), getBlockSize(), getOwner(), - getETag(), + getEtag(), getVersionId()); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java index 540d092142..74f7533aa5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java @@ -593,7 +593,7 @@ protected CompletableFuture initiateCopy( S3ObjectAttributes sourceAttributes = callbacks.createObjectAttributes( source.getPath(), - source.getETag(), + source.getEtag(), source.getVersionId(), source.getLen()); // queue the copy operation for execution in the thread pool diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractEtag.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractEtag.java new file mode 100644 index 0000000000..824b2dab1f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractEtag.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractEtagTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Test S3A etag support. + */ +public class ITestS3AContractEtag extends AbstractContractEtagTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 9cb77230fe..4786614014 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -63,8 +63,10 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations; @@ -79,6 +81,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; import 
org.apache.hadoop.fs.azurebfs.services.AbfsCounters; +import org.apache.hadoop.fs.azurebfs.services.AbfsLocatedFileStatus; import org.apache.hadoop.fs.azurebfs.utils.Listener; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; @@ -109,6 +112,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel; +import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator; +import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator; /** * A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on listStatusIterator(Path path) } } + /** + * Incremental listing of located status entries, + * preserving etags. + * @param path path to list + * @param filter a path filter + * @return iterator of results. + * @throws FileNotFoundException source path not found. + * @throws IOException other failures. + */ + @Override + protected RemoteIterator<LocatedFileStatus> listLocatedStatus( + final Path path, + final PathFilter filter) + throws FileNotFoundException, IOException { + + LOG.debug("AzureBlobFileSystem.listLocatedStatus path : {}", path); + // get a paged iterator over the source data, filtering out non-matching + // entries. + final RemoteIterator<FileStatus> sourceEntries = filteringRemoteIterator( + listStatusIterator(path), + (st) -> filter.accept(st.getPath())); + // and then map that to a remote iterator of located file status + // entries, propagating any etags. + return mappingRemoteIterator(sourceEntries, + st -> new AbfsLocatedFileStatus(st, + st.isFile() + ? 
getFileBlockLocations(st, 0, st.getLen()) + : null)); + } + private FileStatus tryGetFileStatus(final Path f, TracingContext tracingContext) { try { return getFileStatus(f, tracingContext); @@ -1498,6 +1533,8 @@ public boolean hasPathCapability(final Path path, final String capability) switch (validatePathCapabilityArgs(p, capability)) { case CommonPathCapabilities.FS_PERMISSIONS: case CommonPathCapabilities.FS_APPEND: + case CommonPathCapabilities.ETAGS_AVAILABLE: + case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME: return true; case CommonPathCapabilities.FS_ACLS: return getIsNamespaceEnabled( diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 62c675a60c..f4f8959964 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -65,6 +65,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.EtagSource; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -975,7 +976,7 @@ public FileStatus getFileStatus(final Path path, final long blockSize = abfsConfiguration.getAzureBlockSize(); final AbfsHttpOperation result = op.getResult(); - final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG); + String eTag = extractEtagHeader(result); final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS)); final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions); @@ -1733,10 +1734,27 @@ private 
AbfsPerfInfo startTracking(String callerName, String calleeName) { return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName); } - private static class VersionedFileStatus extends FileStatus { - private final String version; + /** + * A File status with version info extracted from the etag value returned + * in a LIST or HEAD request. + * The etag is included in the java serialization. + */ + private static final class VersionedFileStatus extends FileStatus + implements EtagSource { - VersionedFileStatus( + /** + * The superclass is declared serializable; this subclass can also + * be serialized. + */ + private static final long serialVersionUID = -2009013240419749458L; + + /** + * The etag of an object. + * Not-final so that serialization via reflection will preserve the value. + */ + private String version; + + private VersionedFileStatus( final String owner, final String group, final FsPermission fsPermission, final boolean hasAcl, final long length, final boolean isdir, final int blockReplication, final long blocksize, final long modificationTime, final Path path, @@ -1797,6 +1815,11 @@ public String getVersion() { return this.version; } + @Override + public String getEtag() { + return getVersion(); + } + @Override public String toString() { final StringBuilder sb = new StringBuilder( @@ -1902,4 +1925,30 @@ boolean areLeasesFreed() { } return true; } + + /** + * Get the etag header from a response, stripping any quotations. + * see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag + * @param result response to process. + * @return the quote-unwrapped etag. 
+ */ + private static String extractEtagHeader(AbfsHttpOperation result) { + String etag = result.getResponseHeader(HttpHeaderConfigurations.ETAG); + if (etag != null) { + // strip out any wrapper "" quotes which come back, for consistency with + // list calls + if (etag.startsWith("W/\"")) { + // Weak etag + etag = etag.substring(3); + } else if (etag.startsWith("\"")) { + // strong etag + etag = etag.substring(1); + } + if (etag.endsWith("\"")) { + // trailing quote + etag = etag.substring(0, etag.length() - 1); + } + } + return etag; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLocatedFileStatus.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLocatedFileStatus.java new file mode 100644 index 0000000000..29da2c5043 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLocatedFileStatus.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.EtagSource; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; + +import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link LocatedFileStatus} extended to also carry an ETag. + */ +public class AbfsLocatedFileStatus extends LocatedFileStatus implements EtagSource { + + private static final long serialVersionUID = -8185960773314341594L; + + /** + * etag; may be null. + */ + private final String etag; + + public AbfsLocatedFileStatus(FileStatus status, BlockLocation[] locations) { + super(checkNotNull(status), locations); + if (status instanceof EtagSource) { + this.etag = ((EtagSource) status).getEtag(); + } else { + this.etag = null; + } + } + + @Override + public String getEtag() { + return etag; + } + + @Override + public String toString() { + return "AbfsLocatedFileStatus{" + + "etag='" + etag + '\'' + "} " + + super.toString(); + } + // equals() and hashCode() overridden to avoid FindBugs warning. + // Base implementation is equality on Path only, which is still appropriate. 
+ + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractEtag.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractEtag.java new file mode 100644 index 0000000000..d498ae71a4 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractEtag.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractEtagTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Contract test for etag support. 
+ */ +public class ITestAbfsFileSystemContractEtag extends AbstractContractEtagTest { + private final boolean isSecure; + private final ABFSContractTestBinding binding; + + public ITestAbfsFileSystemContractEtag() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); + } + + @Override + public void setup() throws Exception { + binding.setup(); + super.setup(); + // Base rename contract test class re-uses the test folder + // This leads to failures when the test is re-run as same ABFS test + // containers are re-used for test run and creation of source and + // destination test paths fail, as they are already present. + binding.getFileSystem().delete(binding.getTestPath(), true); + } + + @Override + protected Configuration createConfiguration() { + return binding.getRawConfiguration(); + } + + @Override + protected AbstractFSContract createContract(final Configuration conf) { + return new AbfsFileSystemContract(conf, isSecure); + } +}