diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml index 7087d786a3..b750b8b91c 100644 --- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml @@ -74,4 +74,13 @@ + + + + + + + + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 5a70323395..193be48029 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -275,6 +275,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS) private long sasTokenRenewPeriodForStreamsInSeconds; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR) + private boolean enableAbfsListIterator; + public AbfsConfiguration(final Configuration rawConfig, String accountName) throws IllegalAccessException, InvalidConfigurationValueException, IOException { this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders( @@ -896,6 +900,10 @@ public int getMaxWriteRequestsToQueue() { return this.maxWriteRequestsToQueue; } + public boolean enableAbfsListIterator() { + return this.enableAbfsListIterator; + } + @VisibleForTesting void setReadBufferSize(int bufferSize) { this.readBufferSize = bufferSize; @@ -961,4 +969,9 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) { this.optimizeFooterRead = optimizeFooterRead; } + @VisibleForTesting + public void setEnableAbfsListIterator(boolean enableAbfsListIterator) { + this.enableAbfsListIterator = enableAbfsListIterator; + } + } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 4d285534b0..ead8566b4c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -46,6 +46,8 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept; +import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; @@ -79,6 +81,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.util.Progressable; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; @@ -983,6 +986,19 @@ public boolean exists(Path f) throws IOException { return super.exists(f); } + @Override + public RemoteIterator listStatusIterator(Path path) + throws IOException { + LOG.debug("AzureBlobFileSystem.listStatusIterator path : {}", path); + if (abfsStore.getAbfsConfiguration().enableAbfsListIterator()) { + AbfsListStatusRemoteIterator abfsLsItr = + new AbfsListStatusRemoteIterator(getFileStatus(path), abfsStore); + return RemoteIterators.typeCastingRemoteIterator(abfsLsItr); + } else { + return super.listStatusIterator(path); + } + } + private FileStatus tryGetFileStatus(final Path f) { try { return getFileStatus(f); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index c8dd518b4f..f4be159bf9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -102,6 +102,7 @@ import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials; import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker; import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo; +import org.apache.hadoop.fs.azurebfs.services.ListingSupport; import org.apache.hadoop.fs.azurebfs.utils.Base64; import org.apache.hadoop.fs.azurebfs.utils.CRC64; import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; @@ -131,7 +132,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Evolving -public class AzureBlobFileSystemStore implements Closeable { +public class AzureBlobFileSystemStore implements Closeable, ListingSupport { private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class); private AbfsClient client; @@ -838,6 +839,7 @@ public FileStatus getFileStatus(final Path path) throws IOException { * @param path The list path. * @return the entries in the path. * */ + @Override public FileStatus[] listStatus(final Path path) throws IOException { return listStatus(path, null); } @@ -854,7 +856,17 @@ public FileStatus[] listStatus(final Path path) throws IOException { * @return the entries in the path start from "startFrom" in lexical order. * */ @InterfaceStability.Unstable + @Override public FileStatus[] listStatus(final Path path, final String startFrom) throws IOException { + List fileStatuses = new ArrayList<>(); + listStatus(path, startFrom, fileStatuses, true, null); + return fileStatuses.toArray(new FileStatus[fileStatuses.size()]); + } + + @Override + public String listStatus(final Path path, final String startFrom, + List fileStatuses, final boolean fetchAll, + String continuation) throws IOException { final Instant startAggregate = abfsPerfTracker.getLatencyInstant(); long countAggregate = 0; boolean shouldContinue = true; @@ -865,16 +877,16 @@ public FileStatus[] listStatus(final Path path, final String startFrom) throws I startFrom); final String relativePath = getRelativePath(path); - String continuation = null; - // generate continuation token if a valid startFrom is provided. - if (startFrom != null && !startFrom.isEmpty()) { - continuation = getIsNamespaceEnabled() - ? generateContinuationTokenForXns(startFrom) - : generateContinuationTokenForNonXns(relativePath, startFrom); + if (continuation == null || continuation.isEmpty()) { + // generate continuation token if a valid startFrom is provided. + if (startFrom != null && !startFrom.isEmpty()) { + continuation = getIsNamespaceEnabled() + ? generateContinuationTokenForXns(startFrom) + : generateContinuationTokenForNonXns(relativePath, startFrom); + } } - ArrayList fileStatuses = new ArrayList<>(); do { try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) { AbfsRestOperation op = client.listPath(relativePath, false, @@ -928,7 +940,8 @@ public FileStatus[] listStatus(final Path path, final String startFrom) throws I perfInfo.registerSuccess(true); countAggregate++; - shouldContinue = continuation != null && !continuation.isEmpty(); + shouldContinue = + fetchAll && continuation != null && !continuation.isEmpty(); if (!shouldContinue) { perfInfo.registerAggregates(startAggregate, countAggregate); @@ -936,7 +949,7 @@ public FileStatus[] listStatus(final Path path, final String startFrom) throws I } } while (shouldContinue); - return fileStatuses.toArray(new FileStatus[fileStatuses.size()]); + return continuation; } // generate continuation token for xns account diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index cdef9c9b7a..8a9c63ddbe 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -130,6 +130,8 @@ public final class ConfigurationKeys { public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement"; public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider"; public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script"; + /** Setting this true will make the driver use it's own RemoteIterator implementation */ + public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator"; /** End point of ABFS account: {@value}. */ public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index a23dfd5292..9b760c472a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -101,5 +101,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_DELETE_CONSIDERED_IDEMPOTENT = true; public static final int DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS = 5 * 60 * 1000; // 5 mins + public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true; + private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java new file mode 100644 index 0000000000..0c664fc2fb --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import javax.activation.UnsupportedDataTypeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.RemoteIterator; + +public class AbfsListStatusRemoteIterator + implements RemoteIterator { + + private static final Logger LOG = LoggerFactory + .getLogger(AbfsListStatusRemoteIterator.class); + + private static final boolean FETCH_ALL_FALSE = false; + private static final int MAX_QUEUE_SIZE = 10; + private static final long POLL_WAIT_TIME_IN_MS = 250; + + private final FileStatus fileStatus; + private final ListingSupport listingSupport; + private final ArrayBlockingQueue iteratorsQueue; + + private volatile boolean isAsyncInProgress = false; + private boolean isIterationComplete = false; + private String continuation; + private Iterator currIterator; + + public AbfsListStatusRemoteIterator(final FileStatus fileStatus, + final ListingSupport listingSupport) { + this.fileStatus = fileStatus; + this.listingSupport = listingSupport; + iteratorsQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE); + currIterator = Collections.emptyIterator(); + fetchBatchesAsync(); + } + + @Override + public boolean hasNext() throws IOException { + if (currIterator.hasNext()) { + return true; + } + currIterator = getNextIterator(); + return currIterator.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + if (!this.hasNext()) { + throw new NoSuchElementException(); + } + return currIterator.next(); + } + + private Iterator getNextIterator() throws IOException { + fetchBatchesAsync(); + try { + Object obj = null; + while (obj == null + && (!isIterationComplete || !iteratorsQueue.isEmpty())) { + obj = iteratorsQueue.poll(POLL_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS); + } + if (obj == null) { + return Collections.emptyIterator(); + } else if (obj instanceof Iterator) { + return (Iterator) obj; + } else if (obj instanceof IOException) { + throw (IOException) obj; + } else { + throw new UnsupportedDataTypeException(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Thread got interrupted: {}", e); + throw new IOException(e); + } + } + + private void fetchBatchesAsync() { + if (isAsyncInProgress || isIterationComplete) { + return; + } + synchronized (this) { + if (isAsyncInProgress || isIterationComplete) { + return; + } + isAsyncInProgress = true; + } + CompletableFuture.runAsync(() -> asyncOp()); + } + + private void asyncOp() { + try { + while (!isIterationComplete && iteratorsQueue.size() <= MAX_QUEUE_SIZE) { + addNextBatchIteratorToQueue(); + } + } catch (IOException ioe) { + LOG.error("Fetching filestatuses failed", ioe); + try { + iteratorsQueue.put(ioe); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + LOG.error("Thread got interrupted: {}", interruptedException); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Thread got interrupted: {}", e); + } finally { + synchronized (this) { + isAsyncInProgress = false; + } + } + } + + private void addNextBatchIteratorToQueue() + throws IOException, InterruptedException { + List fileStatuses = new ArrayList<>(); + continuation = listingSupport + .listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE, + continuation); + if (!fileStatuses.isEmpty()) { + iteratorsQueue.put(fileStatuses.iterator()); + } + synchronized (this) { + if (continuation == null || continuation.isEmpty()) { + isIterationComplete = true; + } + } + } + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java new file mode 100644 index 0000000000..4c449409aa --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface ListingSupport { + + /** + * @param path The list path. + * @return the entries in the path. + * @throws IOException in case of error + */ + FileStatus[] listStatus(Path path) throws IOException; + + /** + * @param path Path the list path. + * @param startFrom The entry name that list results should start with. + * For example, if folder "/folder" contains four + * files: "afile", "bfile", "hfile", "ifile". Then + * listStatus(Path("/folder"), "hfile") will return + * "/folder/hfile" and "folder/ifile" Notice that if + * startFrom is a non-existent entry name, then the + * list response contains all entries after this + * non-existent entry in lexical order: listStatus + * (Path("/folder"), "cfile") will return + * "/folder/hfile" and "/folder/ifile". + * @return the entries in the path start from "startFrom" in lexical order. + * @throws IOException in case of error + */ + FileStatus[] listStatus(Path path, String startFrom) throws IOException; + + /** + * @param path The list path + * @param startFrom The entry name that list results should start with. + * For example, if folder "/folder" contains four + * files: "afile", "bfile", "hfile", "ifile". Then + * listStatus(Path("/folder"), "hfile") will return + * "/folder/hfile" and "folder/ifile" Notice that if + * startFrom is a non-existent entry name, then the + * list response contains all entries after this + * non-existent entry in lexical order: listStatus + * (Path("/folder"), "cfile") will return + * "/folder/hfile" and "/folder/ifile". + * @param fileStatuses This list has to be filled with the FileStatus objects + * @param fetchAll flag to indicate if the above list needs to be + * filled with just one page os results or the entire + * result. + * @param continuation Contiuation token. null means start rom the begining. + * @return Continuation tokem + * @throws IOException in case of error + */ + String listStatus(Path path, String startFrom, List fileStatuses, + boolean fetchAll, String continuation) throws IOException; +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java new file mode 100644 index 0000000000..6d5e4cf3bc --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator; +import org.apache.hadoop.fs.azurebfs.services.ListingSupport; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.verify; + +/** + * Test ListStatusRemoteIterator operation. + */ +public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTest { + + private static final int TEST_FILES_NUMBER = 1000; + + public ITestAbfsListStatusRemoteIterator() throws Exception { + } + + @Test + public void testAbfsIteratorWithHasNext() throws Exception { + Path testDir = createTestDirectory(); + setPageSize(10); + final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER, + testDir, "testListPath"); + + ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore()); + RemoteIterator fsItr = new AbfsListStatusRemoteIterator( + getFileSystem().getFileStatus(testDir), listngSupport); + Assertions.assertThat(fsItr) + .describedAs("RemoteIterator should be instance of " + + "AbfsListStatusRemoteIterator by default") + .isInstanceOf(AbfsListStatusRemoteIterator.class); + int itrCount = 0; + while (fsItr.hasNext()) { + FileStatus fileStatus = fsItr.next(); + String pathStr = fileStatus.getPath().toString(); + fileNames.remove(pathStr); + itrCount++; + } + Assertions.assertThat(itrCount) + .describedAs("Number of iterations should be equal to the files " + + "created") + .isEqualTo(TEST_FILES_NUMBER); + Assertions.assertThat(fileNames.size()) + .describedAs("After removing every iterm found from the iterator, " + + "there should be no more elements in the fileNames") + .isEqualTo(0); + int minNumberOfInvokations = TEST_FILES_NUMBER / 10; + verify(listngSupport, Mockito.atLeast(minNumberOfInvokations)) + .listStatus(any(Path.class), nullable(String.class), + anyList(), anyBoolean(), + nullable(String.class)); + } + + @Test + public void testAbfsIteratorWithoutHasNext() throws Exception { + Path testDir = createTestDirectory(); + setPageSize(10); + final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER, + testDir, "testListPath"); + + ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore()); + RemoteIterator fsItr = new AbfsListStatusRemoteIterator( + getFileSystem().getFileStatus(testDir), listngSupport); + Assertions.assertThat(fsItr) + .describedAs("RemoteIterator should be instance of " + + "AbfsListStatusRemoteIterator by default") + .isInstanceOf(AbfsListStatusRemoteIterator.class); + int itrCount = 0; + for (int i = 0; i < TEST_FILES_NUMBER; i++) { + FileStatus fileStatus = fsItr.next(); + String pathStr = fileStatus.getPath().toString(); + fileNames.remove(pathStr); + itrCount++; + } + Assertions.assertThatThrownBy(() -> fsItr.next()) + .describedAs( + "next() should throw NoSuchElementException since next has been " + + "called " + TEST_FILES_NUMBER + " times") + .isInstanceOf(NoSuchElementException.class); + Assertions.assertThat(itrCount) + .describedAs("Number of iterations should be equal to the files " + + "created") + .isEqualTo(TEST_FILES_NUMBER); + Assertions.assertThat(fileNames.size()) + .describedAs("After removing every iterm found from the iterator, " + + "there should be no more elements in the fileNames") + .isEqualTo(0); + int minNumberOfInvokations = TEST_FILES_NUMBER / 10; + verify(listngSupport, Mockito.atLeast(minNumberOfInvokations)) + .listStatus(any(Path.class), nullable(String.class), + anyList(), anyBoolean(), + nullable(String.class)); + } + + @Test + public void testWithAbfsIteratorDisabled() throws Exception { + Path testDir = createTestDirectory(); + setPageSize(10); + setEnableAbfsIterator(false); + final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER, + testDir, "testListPath"); + + RemoteIterator fsItr = + getFileSystem().listStatusIterator(testDir); + Assertions.assertThat(fsItr) + .describedAs("RemoteIterator should not be instance of " + + "AbfsListStatusRemoteIterator when it is disabled") + .isNotInstanceOf(AbfsListStatusRemoteIterator.class); + int itrCount = 0; + while (fsItr.hasNext()) { + FileStatus fileStatus = fsItr.next(); + String pathStr = fileStatus.getPath().toString(); + fileNames.remove(pathStr); + itrCount++; + } + Assertions.assertThat(itrCount) + .describedAs("Number of iterations should be equal to the files " + + "created") + .isEqualTo(TEST_FILES_NUMBER); + Assertions.assertThat(fileNames.size()) + .describedAs("After removing every iterm found from the iterator, " + + "there should be no more elements in the fileNames") + .isEqualTo(0); + } + + @Test + public void testWithAbfsIteratorDisabledWithoutHasNext() throws Exception { + Path testDir = createTestDirectory(); + setPageSize(10); + setEnableAbfsIterator(false); + final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER, + testDir, "testListPath"); + + RemoteIterator fsItr = + getFileSystem().listStatusIterator(testDir); + Assertions.assertThat(fsItr) + .describedAs("RemoteIterator should not be instance of " + + "AbfsListStatusRemoteIterator when it is disabled") + .isNotInstanceOf(AbfsListStatusRemoteIterator.class); + int itrCount = 0; + for (int i = 0; i < TEST_FILES_NUMBER; i++) { + FileStatus fileStatus = fsItr.next(); + String pathStr = fileStatus.getPath().toString(); + fileNames.remove(pathStr); + itrCount++; + } + Assertions.assertThatThrownBy(() -> fsItr.next()) + .describedAs( + "next() should throw NoSuchElementException since next has been " + + "called " + TEST_FILES_NUMBER + " times") + .isInstanceOf(NoSuchElementException.class); + Assertions.assertThat(itrCount) + .describedAs("Number of iterations should be equal to the files " + + "created") + .isEqualTo(TEST_FILES_NUMBER); + Assertions.assertThat(fileNames.size()) + .describedAs("After removing every iterm found from the iterator, " + + "there should be no more elements in the fileNames") + .isEqualTo(0); + } + + @Test + public void testNextWhenNoMoreElementsPresent() throws Exception { + Path testDir = createTestDirectory(); + setPageSize(10); + RemoteIterator fsItr = + new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir), + getFileSystem().getAbfsStore()); + fsItr = Mockito.spy(fsItr); + Mockito.doReturn(false).when(fsItr).hasNext(); + + RemoteIterator finalFsItr = fsItr; + Assertions.assertThatThrownBy(() -> finalFsItr.next()) + .describedAs( + "next() should throw NoSuchElementException if hasNext() return " + + "false") + .isInstanceOf(NoSuchElementException.class); + } + + @Test + public void testHasNextForEmptyDir() throws Exception { + Path testDir = createTestDirectory(); + setPageSize(10); + RemoteIterator fsItr = getFileSystem() + .listStatusIterator(testDir); + Assertions.assertThat(fsItr.hasNext()) + .describedAs("hasNext returns false for empty directory") + .isFalse(); + } + + @Test + public void testHasNextForFile() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + String testFileName = "testFile"; + Path testFile = new Path(testFileName); + getFileSystem().create(testFile); + setPageSize(10); + RemoteIterator fsItr = fs.listStatusIterator(testFile); + Assertions.assertThat(fsItr.hasNext()) + .describedAs("hasNext returns true for file").isTrue(); + Assertions.assertThat(fsItr.next().getPath().toString()) + .describedAs("next returns the file itself") + .endsWith(testFileName); + } + + @Test + public void testIOException() throws Exception { + Path testDir = createTestDirectory(); + setPageSize(10); + getFileSystem().mkdirs(testDir); + + String exceptionMessage = "test exception"; + ListingSupport lsSupport =getMockListingSupport(exceptionMessage); + RemoteIterator fsItr = + new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir), + lsSupport); + + Assertions.assertThatThrownBy(() -> fsItr.next()) + .describedAs( + "When ioException is not null and queue is empty exception should be " + + "thrown") + .isInstanceOf(IOException.class) + .hasMessage(exceptionMessage); + } + + @Test + public void testNonExistingPath() throws Throwable { + Path nonExistingDir = new Path("nonExistingPath"); + Assertions.assertThatThrownBy( + () -> getFileSystem().listStatusIterator(nonExistingDir)).describedAs( + "test the listStatusIterator call on a path which is not " + + "present should result in FileNotFoundException") + .isInstanceOf(FileNotFoundException.class); + } + + private ListingSupport getMockListingSupport(String exceptionMessage) { + return new ListingSupport() { + @Override + public FileStatus[] listStatus(Path path) throws IOException { + return null; + } + + @Override + public FileStatus[] listStatus(Path path, String startFrom) + throws IOException { + return null; + } + + @Override + public String listStatus(Path path, String startFrom, + List fileStatuses, boolean fetchAll, String continuation) + throws IOException { + throw new IOException(exceptionMessage); + } + }; + } + + private Path createTestDirectory() throws IOException { + String testDirectoryName = "testDirectory" + System.currentTimeMillis(); + Path testDirectory = new Path(testDirectoryName); + getFileSystem().mkdirs(testDirectory); + return testDirectory; + } + + private void setEnableAbfsIterator(boolean shouldEnable) throws IOException { + AzureBlobFileSystemStore abfsStore = getAbfsStore(getFileSystem()); + abfsStore.getAbfsConfiguration().setEnableAbfsListIterator(shouldEnable); + } + + private void setPageSize(int pageSize) throws IOException { + AzureBlobFileSystemStore abfsStore = getAbfsStore(getFileSystem()); + abfsStore.getAbfsConfiguration().setListMaxResults(pageSize); + } + + private List createFilesUnderDirectory(int numFiles, Path rootPath, + String filenamePrefix) + throws ExecutionException, InterruptedException, IOException { + final List> tasks = new ArrayList<>(); + final List fileNames = new ArrayList<>(); + ExecutorService es = Executors.newFixedThreadPool(10); + try { + for (int i = 0; i < numFiles; i++) { + final Path filePath = new Path(rootPath, filenamePrefix + i); + Callable callable = () -> { + getFileSystem().create(filePath); + fileNames.add(makeQualified(filePath).toString()); + return null; + }; + tasks.add(es.submit(callable)); + } + for (Future task : tasks) { + task.get(); + } + } finally { + es.shutdownNow(); + } + return fileNames; + } + +}