From 6ce5f8734f1864a2d628b23479cf3f6621b2fcb4 Mon Sep 17 00:00:00 2001 From: bilaharith <52483117+bilaharith@users.noreply.github.com> Date: Wed, 18 Mar 2020 19:44:18 +0530 Subject: [PATCH] HADOOP-16920 ABFS: Make list page size configurable. Contributed by Bilahari T H. The page limit is set in "fs.azure.list.max.results"; default value is 500. There's currently a limit of 5000 in the store -there are no range checks in the client code so that limit can be changed on the server without any need to update the abfs connector. --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 14 +++ .../fs/azurebfs/AzureBlobFileSystemStore.java | 4 +- .../azurebfs/constants/ConfigurationKeys.java | 1 + .../constants/FileSystemConfigurations.java | 1 + .../hadoop/fs/azurebfs/ITestAbfsClient.java | 90 +++++++++++++++++++ 5 files changed, 108 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 779f524325..61fe3d8d6d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -125,6 +125,11 @@ public class AbfsConfiguration{ DefaultValue = MAX_CONCURRENT_WRITE_THREADS) private int maxConcurrentWriteThreads; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_LIST_MAX_RESULTS, + MinValue = 1, + DefaultValue = DEFAULT_AZURE_LIST_MAX_RESULTS) + private int listMaxResults; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_IN, MinValue = 1, DefaultValue = MAX_CONCURRENT_READ_THREADS) @@ -432,6 +437,10 @@ public int getMaxConcurrentReadThreads() { return this.maxConcurrentReadThreads; } + public int getListMaxResults() { + return this.listMaxResults; + } + public boolean getTolerateOobAppends() { return 
this.tolerateOobAppends; } @@ -702,6 +711,11 @@ void setDisableOutputStreamFlush(boolean disableOutputStreamFlush) { this.disableOutputStreamFlush = disableOutputStreamFlush; } + @VisibleForTesting + void setListMaxResults(int listMaxResults) { + this.listMaxResults = listMaxResults; + } + private String getTrimmedPasswordString(String key, String defaultValue) throws IOException { String value = getPasswordString(key); if (StringUtils.isBlank(value)) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index bbf3608374..bff0e455cf 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -126,7 +126,6 @@ public class AzureBlobFileSystemStore implements Closeable { private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss z"; private static final String TOKEN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'"; private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1"; - private static final int LIST_MAX_RESULTS = 500; private static final int GET_SET_AGGREGATE_COUNT = 2; private final AbfsConfiguration abfsConfiguration; @@ -682,7 +681,8 @@ public FileStatus[] listStatus(final Path path, final String startFrom) throws I ArrayList<FileStatus> fileStatuses = new ArrayList<>(); do { try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) { - AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation); + AbfsRestOperation op = client.listPath(relativePath, false, + abfsConfiguration.getListMaxResults(), continuation); perfInfo.registerResult(op.getResult()); continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); ListResultSchema retrievedSchema =
op.getResult().getListResultSchema(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 3b0111e960..a63e953534 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -45,6 +45,7 @@ public final class ConfigurationKeys { public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out"; public static final String AZURE_CONCURRENT_CONNECTION_VALUE_IN = "fs.azure.concurrentRequestCount.in"; public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append"; + public static final String AZURE_LIST_MAX_RESULTS = "fs.azure.list.max.results"; public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization"; public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization"; public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index c29ee908b3..c6b308ed5f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -46,6 +46,7 @@ public final class FileSystemConfigurations { public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB public static final long 
MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost"; + public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 500; public static final int MAX_CONCURRENT_READ_THREADS = 12; public static final int MAX_CONCURRENT_WRITE_THREADS = 8; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java index 3d6869d948..a4d6458990 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java @@ -18,13 +18,23 @@ package org.apache.hadoop.fs.azurebfs; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; @@ -38,6 +48,7 @@ */ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest { private static final int LIST_MAX_RESULTS = 500; + private static final int LIST_MAX_RESULTS_SERVER = 5000; public ITestAbfsClient() throws Exception { super(); @@ -75,4 +86,83 @@ public void testUnknownHost() throws Exception { "UnknownHostException: " + fakeAccountName, () -> 
FileSystem.get(conf.getRawConfiguration())); } + + @Test + public void testListPathWithValidListMaxResultsValues() + throws IOException, ExecutionException, InterruptedException { + final int fileCount = 10; + final String directory = "testWithValidListMaxResultsValues"; + createDirectoryWithNFiles(directory, fileCount); + final int[] testData = {fileCount + 100, fileCount + 1, fileCount, + fileCount - 1, 1}; + for (int i = 0; i < testData.length; i++) { + int listMaxResults = testData[i]; + setListMaxResults(listMaxResults); + int expectedListResultsSize = + listMaxResults > fileCount ? fileCount : listMaxResults; + Assertions.assertThat(listPath(directory)).describedAs( + "AbfsClient.listPath result should contain %d items when " + + "listMaxResults is %d and directory contains %d items", + expectedListResultsSize, listMaxResults, fileCount) + .hasSize(expectedListResultsSize); + } + } + + @Test + public void testListPathWithValueGreaterThanServerMaximum() + throws IOException, ExecutionException, InterruptedException { + setListMaxResults(LIST_MAX_RESULTS_SERVER + 100); + final String directory = "testWithValueGreaterThanServerMaximum"; + createDirectoryWithNFiles(directory, LIST_MAX_RESULTS_SERVER + 200); + Assertions.assertThat(listPath(directory)).describedAs( + "AbfsClient.listPath result will contain a maximum of %d items " + + "even if listMaxResults >= %d or directory " + + "contains more than %d items", LIST_MAX_RESULTS_SERVER, + LIST_MAX_RESULTS_SERVER, LIST_MAX_RESULTS_SERVER) + .hasSize(LIST_MAX_RESULTS_SERVER); + } + + @Test + public void testListPathWithInvalidListMaxResultsValues() throws Exception { + for (int i = -1; i < 1; i++) { + setListMaxResults(i); + intercept(AbfsRestOperationException.class, "Operation failed: \"One of " + + "the query parameters specified in the request URI is outside" + " " + + "the permissible range.", () -> listPath("directory")); + } + } + + private List<ListResultEntrySchema> listPath(String directory) + throws IOException { + return
getFileSystem().getAbfsClient() + .listPath(directory, false, getListMaxResults(), null).getResult() + .getListResultSchema().paths(); + } + + private int getListMaxResults() throws IOException { + return getFileSystem().getAbfsStore().getAbfsConfiguration() + .getListMaxResults(); + } + + private void setListMaxResults(int listMaxResults) throws IOException { + getFileSystem().getAbfsStore().getAbfsConfiguration() + .setListMaxResults(listMaxResults); + } + + private void createDirectoryWithNFiles(String directory, int n) + throws ExecutionException, InterruptedException { + final List<Future<Void>> tasks = new ArrayList<>(); + ExecutorService es = Executors.newFixedThreadPool(10); + for (int i = 0; i < n; i++) { + final Path fileName = new Path("/" + directory + "/test" + i); + tasks.add(es.submit(() -> { + touch(fileName); + return null; + })); + } + for (Future<Void> task : tasks) { + task.get(); + } + es.shutdownNow(); + } }