HADOOP-18577. ABFS: Add probes of readahead fix (#5205)
Followup patch to HADOOP-18456 as part of HADOOP-18521, ABFS ReadBufferManager buffer sharing across concurrent HTTP requests Add probes of readahead fix aid in checking safety of hadoop ABFS client across different releases. * ReadBufferManager constructor logs the fact it is safe at TRACE * AbfsInputStream declares it is fixed in toString() by including fs.azure.capability.readahead.safe" in the result. The ABFS FileSystem hasPathCapability("fs.azure.capability.readahead.safe") probe returns true to indicate the client's readahead manager has been fixed to be safe when prefetching. All Hadoop releases for which probe this returns false and for which the probe "fs.capability.etags.available" returns true at risk of returning invalid data when reading ADLS Gen2/Azure storage data. Contributed by Steve Loughran.
This commit is contained in:
parent
65892a7759
commit
daa33aafff
@ -117,6 +117,7 @@
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
|
||||
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
|
||||
@ -235,6 +236,7 @@ public String toString() {
|
||||
sb.append("uri=").append(uri);
|
||||
sb.append(", user='").append(abfsStore.getUser()).append('\'');
|
||||
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
|
||||
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
@ -1636,6 +1638,11 @@ public boolean hasPathCapability(final Path path, final String capability)
|
||||
new TracingContext(clientCorrelationId, fileSystemId,
|
||||
FSOperationType.HAS_PATH_CAPABILITY, tracingHeaderFormat,
|
||||
listener));
|
||||
|
||||
// probe for presence of the HADOOP-18546 readahead fix.
|
||||
case CAPABILITY_SAFE_READAHEAD:
|
||||
return true;
|
||||
|
||||
default:
|
||||
return super.hasPathCapability(p, capability);
|
||||
}
|
||||
|
@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.constants;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Constants which are used internally and which don't fit into the other
|
||||
* classes.
|
||||
* For use within the {@code hadoop-azure} module only.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class InternalConstants {
|
||||
|
||||
private InternalConstants() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Does this version of the store have safe readahead?
|
||||
* Possible combinations of this and the probe
|
||||
* {@code "fs.capability.etags.available"}.
|
||||
* <ol>
|
||||
* <li>{@value}: store is safe</li>
|
||||
* <li>!etags: store is safe</li>
|
||||
* <li>etags && !{@value}: store is <i>UNSAFE</i></li>
|
||||
* </ol>
|
||||
*/
|
||||
public static final String CAPABILITY_SAFE_READAHEAD =
|
||||
"fs.azure.capability.readahead.safe";
|
||||
}
|
@ -50,6 +50,7 @@
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
|
||||
import static org.apache.hadoop.util.StringUtils.toLowerCase;
|
||||
|
||||
/**
|
||||
@ -828,11 +829,12 @@ public IOStatistics getIOStatistics() {
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(super.toString());
|
||||
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
|
||||
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
|
||||
if (streamStatistics != null) {
|
||||
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
|
||||
sb.append(streamStatistics.toString());
|
||||
sb.append("}");
|
||||
sb.append(", ").append(streamStatistics);
|
||||
}
|
||||
sb.append("}");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
@ -101,6 +101,7 @@ private void init() {
|
||||
|
||||
// hide instance constructor
|
||||
private ReadBufferManager() {
|
||||
LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
|
||||
}
|
||||
|
||||
|
||||
|
@ -20,14 +20,22 @@
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE;
|
||||
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
|
||||
import static org.apache.hadoop.fs.CommonPathCapabilities.FS_ACLS;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
/**
|
||||
* Test AzureBlobFileSystem initialization.
|
||||
*/
|
||||
@ -74,4 +82,28 @@ public void ensureSecureAzureBlobFileSystemIsInitialized() throws Exception {
|
||||
assertNotNull("working directory", fs.getWorkingDirectory());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFileSystemCapabilities() throws Throwable {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
|
||||
final Path p = new Path("}");
|
||||
// etags always present
|
||||
Assertions.assertThat(fs.hasPathCapability(p, ETAGS_AVAILABLE))
|
||||
.describedAs("path capability %s in %s", ETAGS_AVAILABLE, fs)
|
||||
.isTrue();
|
||||
// readahead always correct
|
||||
Assertions.assertThat(fs.hasPathCapability(p, CAPABILITY_SAFE_READAHEAD))
|
||||
.describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
|
||||
.isTrue();
|
||||
|
||||
// etags-over-rename and ACLs are either both true or both false.
|
||||
final boolean etagsAcrossRename = fs.hasPathCapability(p, ETAGS_PRESERVED_IN_RENAME);
|
||||
final boolean acls = fs.hasPathCapability(p, FS_ACLS);
|
||||
Assertions.assertThat(etagsAcrossRename)
|
||||
.describedAs("capabilities %s=%s and %s=%s in %s",
|
||||
ETAGS_PRESERVED_IN_RENAME, etagsAcrossRename,
|
||||
FS_ACLS, acls, fs)
|
||||
.isEqualTo(acls);
|
||||
}
|
||||
}
|
||||
|
@ -44,9 +44,24 @@
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
|
||||
|
||||
public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
|
||||
|
||||
/**
|
||||
* Time before the JUnit test times out for eventually() clauses
|
||||
* to fail. This copes with slow network connections and debugging
|
||||
* sessions, yet still allows for tests to fail with meaningful
|
||||
* messages.
|
||||
*/
|
||||
public static final int TIMEOUT_OFFSET = 5 * 60_000;
|
||||
|
||||
/**
|
||||
* Interval between eventually preobes.
|
||||
*/
|
||||
public static final int PROBE_INTERVAL_MILLIS = 1_000;
|
||||
|
||||
public ITestReadBufferManager() throws Exception {
|
||||
}
|
||||
|
||||
@ -61,6 +76,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
|
||||
}
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(4);
|
||||
AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
|
||||
// verify that the fs has the capability to validate the fix
|
||||
Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD))
|
||||
.describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
|
||||
.isTrue();
|
||||
|
||||
try {
|
||||
for (int i = 0; i < 4; i++) {
|
||||
final String fileName = methodName.getMethodName() + i;
|
||||
@ -80,9 +100,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
|
||||
}
|
||||
|
||||
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
|
||||
// verify there is no work in progress or the readahead queue.
|
||||
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
|
||||
// readahead queue is empty
|
||||
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
|
||||
// verify the in progress list eventually empties out.
|
||||
eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () ->
|
||||
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()));
|
||||
}
|
||||
|
||||
private void assertListEmpty(String listName, List<ReadBuffer> list) {
|
||||
|
Loading…
Reference in New Issue
Block a user