diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 21501d28f4..5534b5fb44 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -117,6 +117,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
+import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -235,6 +236,7 @@ public String toString() {
sb.append("uri=").append(uri);
sb.append(", user='").append(abfsStore.getUser()).append('\'');
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
+ sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
sb.append('}');
return sb.toString();
}
@@ -1636,6 +1638,11 @@ public boolean hasPathCapability(final Path path, final String capability)
new TracingContext(clientCorrelationId, fileSystemId,
FSOperationType.HAS_PATH_CAPABILITY, tracingHeaderFormat,
listener));
+
+ // probe for presence of the HADOOP-18546 readahead fix.
+ case CAPABILITY_SAFE_READAHEAD:
+ return true;
+
default:
return super.hasPathCapability(p, capability);
}
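
The new switch case lets any client probe a live filesystem instance for the fix, with no reflection or version parsing. A minimal usage sketch, not part of the patch; the class name is illustrative, and hasPathCapability() does not require the probed path to exist:

```java
// Hedged usage sketch: probing an ABFS client for the HADOOP-18546 fix.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class ProbeForFix {
  public static void main(String[] args) throws Exception {
    // assumption: args[0] is an abfs:// container URI whose credentials
    // are already present in the default configuration
    FileSystem fs = FileSystem.get(new URI(args[0]), new Configuration());
    boolean fixed = fs.hasPathCapability(new Path("/"),
        "fs.azure.capability.readahead.safe");
    System.out.println("safe readahead fix present: " + fixed);
  }
}
```
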
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java
new file mode 100644
index 0000000000..12d4f14d92
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.constants;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Constants which are used internally and which don't fit into the other
+ * classes.
+ * For use within the {@code hadoop-azure} module only.
+ */
+@InterfaceAudience.Private
+public final class InternalConstants {
+
+ private InternalConstants() {
+ }
+
+ /**
+ * Does this version of the store have safe readahead?
+ * Possible combinations of this probe and
+ * {@code "fs.capability.etags.available"}:
+ * <ul>
+ *   <li>{@value} set: the store is safe.</li>
+ *   <li>etags unsupported: the store is safe.</li>
+ *   <li>etags supported but {@value} absent: the store is UNSAFE.</li>
+ * </ul>
+ */
+ public static final String CAPABILITY_SAFE_READAHEAD =
+ "fs.azure.capability.readahead.safe";
+}
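
The decision table in the javadoc reduces to a single predicate over the two probes. A hedged sketch of that logic; the helper name isReadaheadSafe() is illustrative, not part of any Hadoop API:

```java
// A store is unsafe only when it supports etags (so it carries the
// vulnerable prefetch code path) yet does not declare the fix.
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE;

public final class ReadaheadSafety {
  private ReadaheadSafety() {
  }

  public static boolean isReadaheadSafe(FileSystem fs, Path path)
      throws IOException {
    boolean etags = fs.hasPathCapability(path, ETAGS_AVAILABLE);
    boolean safe = fs.hasPathCapability(path,
        "fs.azure.capability.readahead.safe");
    return !etags || safe;
  }
}
```

Per the javadoc table, only the etags-without-capability combination marks a vulnerable release; stores without etag support never had the unsafe prefetch path.
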
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index e7ddffe99f..14188535b8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -50,6 +50,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
+import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.util.StringUtils.toLowerCase;
/**
@@ -828,11 +829,12 @@ public IOStatistics getIOStatistics() {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
+ sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
+ sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
if (streamStatistics != null) {
- sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
- sb.append(streamStatistics.toString());
- sb.append("}");
+ sb.append(", ").append(streamStatistics);
}
+ sb.append("}");
return sb.toString();
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 62d050d0fc..0f91afe098 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -101,6 +101,7 @@ private void init() {
// hide instance constructor
private ReadBufferManager() {
+ LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
index 8b60dd801c..f7d4a5b7a8 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
@@ -20,14 +20,22 @@
import java.net.URI;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
+import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE;
+import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
+import static org.apache.hadoop.fs.CommonPathCapabilities.FS_ACLS;
+import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
+import static org.junit.Assume.assumeTrue;
+
/**
* Test AzureBlobFileSystem initialization.
*/
@@ -74,4 +82,28 @@ public void ensureSecureAzureBlobFileSystemIsInitialized() throws Exception {
assertNotNull("working directory", fs.getWorkingDirectory());
}
}
+
+ @Test
+ public void testFileSystemCapabilities() throws Throwable {
+ final AzureBlobFileSystem fs = getFileSystem();
+
+ final Path p = new Path("/");
+ // etags always present
+ Assertions.assertThat(fs.hasPathCapability(p, ETAGS_AVAILABLE))
+ .describedAs("path capability %s in %s", ETAGS_AVAILABLE, fs)
+ .isTrue();
+ // readahead always correct
+ Assertions.assertThat(fs.hasPathCapability(p, CAPABILITY_SAFE_READAHEAD))
+ .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
+ .isTrue();
+
+ // etags-over-rename and ACLs are either both true or both false.
+ final boolean etagsAcrossRename = fs.hasPathCapability(p, ETAGS_PRESERVED_IN_RENAME);
+ final boolean acls = fs.hasPathCapability(p, FS_ACLS);
+ Assertions.assertThat(etagsAcrossRename)
+ .describedAs("capabilities %s=%s and %s=%s in %s",
+ ETAGS_PRESERVED_IN_RENAME, etagsAcrossRename,
+ FS_ACLS, acls, fs)
+ .isEqualTo(acls);
+ }
}
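
Both ETAGS_PRESERVED_IN_RENAME and FS_ACLS hold exactly when the account has a hierarchical namespace, which is why the test asserts that they agree rather than pinning either value. The assumeTrue import added above supports the same probe-then-skip pattern; a hedged, illustrative sketch of such a test, not part of the patch:

```java
@Test
public void testAclDependentBehaviour() throws Throwable {
  final AzureBlobFileSystem fs = getFileSystem();
  // skip on flat-namespace accounts, which reject ACL operations
  assumeTrue("ACLs not supported by this account",
      fs.hasPathCapability(new Path("/"), FS_ACLS));
  // ... assertions that need a hierarchical namespace go here ...
}
```
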
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index eca670fba9..a57430fa80 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -44,9 +44,24 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
+import static org.apache.hadoop.test.LambdaTestUtils.eventually;
public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
+ /**
+ * Margin subtracted from the JUnit test timeout when setting the
+ * deadline for eventually() clauses. This copes with slow network
+ * connections and debugging sessions, yet still lets the probes fail
+ * with meaningful messages before the test itself times out.
+ */
+ public static final int TIMEOUT_OFFSET = 5 * 60_000;
+
+ /**
+ * Interval between eventually() probes.
+ */
+ public static final int PROBE_INTERVAL_MILLIS = 1_000;
+
public ITestReadBufferManager() throws Exception {
}
@@ -61,6 +76,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
}
ExecutorService executorService = Executors.newFixedThreadPool(4);
AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+ // verify that the fs declares the capability, i.e. that the fix is present
+ Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD))
+ .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
+ .isTrue();
+
try {
for (int i = 0; i < 4; i++) {
final String fileName = methodName.getMethodName() + i;
@@ -80,9 +100,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
- // verify there is no work in progress or the readahead queue.
- assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
+ // readahead queue is empty
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
+ // verify the in progress list eventually empties out.
+ eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () ->
+ assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()));
}
private void assertListEmpty(String listName, List list) {
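
eventually() comes from Hadoop's LambdaTestUtils and re-runs the probe until it passes or the deadline expires. A minimal sketch of the polling contract the test relies on, an assumption about behaviour rather than the actual LambdaTestUtils source:

```java
// Sketch of the assumed eventually() contract: re-run the probe every
// intervalMillis until it stops throwing, or rethrow the last failure
// once timeoutMillis has elapsed.
public final class EventuallySketch {
  private EventuallySketch() {
  }

  public static void eventually(int timeoutMillis, int intervalMillis,
      Runnable probe) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (true) {
      try {
        probe.run();
        return;                         // probe passed
      } catch (AssertionError e) {
        if (System.currentTimeMillis() > deadline) {
          throw e;                      // timed out: surface the failure
        }
        Thread.sleep(intervalMillis);   // wait before the next attempt
      }
    }
  }
}
```

Subtracting TIMEOUT_OFFSET from getTestTimeoutMillis() makes the retry loop give up, and report the real assertion failure, before JUnit's own timeout kills the test with a generic message.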