diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListResult.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListResult.java new file mode 100644 index 0000000000..0b63a34293 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListResult.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; + +import org.apache.hadoop.fs.FileStatus; + +/** + * Class to store listStatus results for AbfsListStatusRemoteIterator. The + * results can either be of type Iterator or an exception thrown during the + * operation + */ +public class AbfsListResult { + private IOException listException = null; + + private Iterator fileStatusIterator + = Collections.emptyIterator(); + + AbfsListResult(IOException ex) { + this.listException = ex; + } + + AbfsListResult(Iterator fileStatusIterator) { + this.fileStatusIterator = fileStatusIterator; + } + + IOException getListingException() { + return listException; + } + + Iterator getFileStatusIterator() { + return fileStatusIterator; + } + + boolean isFailedListing() { + return (listException != null); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java index 835217f945..ce6207bf5f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java @@ -27,7 +27,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import javax.activation.UnsupportedDataTypeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +47,7 @@ public class AbfsListStatusRemoteIterator private final FileStatus fileStatus; private final ListingSupport listingSupport; - private final ArrayBlockingQueue iteratorsQueue; + private final ArrayBlockingQueue listResultQueue; private final TracingContext tracingContext; private volatile boolean isAsyncInProgress = false; @@ -61,7 +60,7 @@ public AbfsListStatusRemoteIterator(final FileStatus fileStatus, this.fileStatus = fileStatus; this.listingSupport = listingSupport; this.tracingContext = tracingContext; - iteratorsQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE); + listResultQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE); currIterator = Collections.emptyIterator(); fetchBatchesAsync(); } @@ -86,19 +85,17 @@ public FileStatus next() throws IOException { private Iterator getNextIterator() throws IOException { fetchBatchesAsync(); try { - Object obj = null; - while (obj == null - && (!isIterationComplete || !iteratorsQueue.isEmpty())) { - obj = iteratorsQueue.poll(POLL_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS); + AbfsListResult listResult = null; + while (listResult == null + && (!isIterationComplete || !listResultQueue.isEmpty())) { + listResult = listResultQueue.poll(POLL_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS); } - if (obj == null) { + if (listResult == null) { return Collections.emptyIterator(); - } else if (obj instanceof Iterator) { - return (Iterator) obj; - } else if (obj instanceof IOException) { - throw (IOException) obj; + } else if (listResult.isFailedListing()) { + throw listResult.getListingException(); } else { - throw new UnsupportedDataTypeException(); + return listResult.getFileStatusIterator(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -122,13 +119,13 @@ private void fetchBatchesAsync() { private void asyncOp() { try { - while (!isIterationComplete && iteratorsQueue.size() <= MAX_QUEUE_SIZE) { + while (!isIterationComplete && listResultQueue.size() <= MAX_QUEUE_SIZE) { addNextBatchIteratorToQueue(); } } catch (IOException ioe) { LOG.error("Fetching filestatuses failed", ioe); try { - iteratorsQueue.put(ioe); + listResultQueue.put(new AbfsListResult(ioe)); } catch (InterruptedException interruptedException) { Thread.currentThread().interrupt(); LOG.error("Thread got interrupted: {}", interruptedException); @@ -143,19 +140,17 @@ private void asyncOp() { } } - private void addNextBatchIteratorToQueue() + private synchronized void addNextBatchIteratorToQueue() throws IOException, InterruptedException { List fileStatuses = new ArrayList<>(); continuation = listingSupport .listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE, continuation, tracingContext); if (!fileStatuses.isEmpty()) { - iteratorsQueue.put(fileStatuses.iterator()); + listResultQueue.put(new AbfsListResult(fileStatuses.iterator())); } - synchronized (this) { - if (continuation == null || continuation.isEmpty()) { - isIterationComplete = true; - } + if (continuation == null || continuation.isEmpty()) { + isIterationComplete = true; } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java index 9e81a0127b..3f50aec659 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java @@ -21,17 +21,20 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.assertj.core.api.Assertions; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -39,6 +42,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator; import org.apache.hadoop.fs.azurebfs.services.ListingSupport; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.test.LambdaTestUtils; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -52,6 +56,8 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTest { private static final int TEST_FILES_NUMBER = 1000; + private static final Logger LOG = LoggerFactory.getLogger( + ITestAbfsListStatusRemoteIterator.class); public ITestAbfsListStatusRemoteIterator() throws Exception { } @@ -60,8 +66,7 @@ public ITestAbfsListStatusRemoteIterator() throws Exception { public void testAbfsIteratorWithHasNext() throws Exception { Path testDir = createTestDirectory(); setPageSize(10); - final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER, - testDir, "testListPath"); + final List fileNames = createFilesUnderDirectory(testDir); ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore()); RemoteIterator fsItr = new AbfsListStatusRemoteIterator( @@ -74,20 +79,12 @@ public void testAbfsIteratorWithHasNext() throws Exception { int itrCount = 0; while (fsItr.hasNext()) { FileStatus fileStatus = fsItr.next(); - String pathStr = fileStatus.getPath().toString(); - fileNames.remove(pathStr); + verifyIteratorResultContent(fileStatus, fileNames); itrCount++; } - Assertions.assertThat(itrCount) - .describedAs("Number of iterations should be equal to the files " - + "created") - .isEqualTo(TEST_FILES_NUMBER); - Assertions.assertThat(fileNames.size()) - .describedAs("After removing every iterm found from the iterator, " - + "there should be no more elements in the fileNames") - .isEqualTo(0); - int minNumberOfInvokations = TEST_FILES_NUMBER / 10; - verify(listngSupport, Mockito.atLeast(minNumberOfInvokations)) + verifyIteratorResultCount(itrCount, fileNames); + int minNumberOfInvocations = TEST_FILES_NUMBER / 10; + verify(listngSupport, Mockito.atLeast(minNumberOfInvocations)) .listStatus(any(Path.class), nullable(String.class), anyList(), anyBoolean(), nullable(String.class), @@ -98,8 +95,7 @@ public void testAbfsIteratorWithHasNext() throws Exception { public void testAbfsIteratorWithoutHasNext() throws Exception { Path testDir = createTestDirectory(); setPageSize(10); - final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER, - testDir, "testListPath"); + final List fileNames = createFilesUnderDirectory(testDir); ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore()); RemoteIterator fsItr = new AbfsListStatusRemoteIterator( @@ -112,25 +108,13 @@ public void testAbfsIteratorWithoutHasNext() throws Exception { int itrCount = 0; for (int i = 0; i < TEST_FILES_NUMBER; i++) { FileStatus fileStatus = fsItr.next(); - String pathStr = fileStatus.getPath().toString(); - fileNames.remove(pathStr); + verifyIteratorResultContent(fileStatus, fileNames); itrCount++; } - Assertions.assertThatThrownBy(() -> fsItr.next()) - .describedAs( - "next() should throw NoSuchElementException since next has been " - + "called " + TEST_FILES_NUMBER + " times") - .isInstanceOf(NoSuchElementException.class); - Assertions.assertThat(itrCount) - .describedAs("Number of iterations should be equal to the files " - + "created") - .isEqualTo(TEST_FILES_NUMBER); - Assertions.assertThat(fileNames.size()) - .describedAs("After removing every iterm found from the iterator, " - + "there should be no more elements in the fileNames") - .isEqualTo(0); - int minNumberOfInvokations = TEST_FILES_NUMBER / 10; - verify(listngSupport, Mockito.atLeast(minNumberOfInvokations)) + LambdaTestUtils.intercept(NoSuchElementException.class, fsItr::next); + verifyIteratorResultCount(itrCount, fileNames); + int minNumberOfInvocations = TEST_FILES_NUMBER / 10; + verify(listngSupport, Mockito.atLeast(minNumberOfInvocations)) .listStatus(any(Path.class), nullable(String.class), anyList(), anyBoolean(), nullable(String.class), @@ -141,9 +125,8 @@ public void testAbfsIteratorWithoutHasNext() throws Exception { public void testWithAbfsIteratorDisabled() throws Exception { Path testDir = createTestDirectory(); setPageSize(10); - setEnableAbfsIterator(false); - final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER, - testDir, "testListPath"); + disableAbfsIterator(); + final List fileNames = createFilesUnderDirectory(testDir); RemoteIterator fsItr = getFileSystem().listStatusIterator(testDir); @@ -154,73 +137,46 @@ public void testWithAbfsIteratorDisabled() throws Exception { int itrCount = 0; while (fsItr.hasNext()) { FileStatus fileStatus = fsItr.next(); - String pathStr = fileStatus.getPath().toString(); - fileNames.remove(pathStr); + verifyIteratorResultContent(fileStatus, fileNames); itrCount++; } - Assertions.assertThat(itrCount) - .describedAs("Number of iterations should be equal to the files " - + "created") - .isEqualTo(TEST_FILES_NUMBER); - Assertions.assertThat(fileNames.size()) - .describedAs("After removing every iterm found from the iterator, " - + "there should be no more elements in the fileNames") - .isEqualTo(0); + verifyIteratorResultCount(itrCount, fileNames); } @Test public void testWithAbfsIteratorDisabledWithoutHasNext() throws Exception { Path testDir = createTestDirectory(); setPageSize(10); - setEnableAbfsIterator(false); - final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER, - testDir, "testListPath"); + disableAbfsIterator(); + final List fileNames = createFilesUnderDirectory(testDir); - RemoteIterator fsItr = - getFileSystem().listStatusIterator(testDir); - Assertions.assertThat(fsItr) - .describedAs("RemoteIterator should not be instance of " - + "AbfsListStatusRemoteIterator when it is disabled") + RemoteIterator fsItr = getFileSystem().listStatusIterator( + testDir); + Assertions.assertThat(fsItr).describedAs( + "RemoteIterator should not be instance of " + + "AbfsListStatusRemoteIterator when it is disabled") .isNotInstanceOf(AbfsListStatusRemoteIterator.class); - int itrCount = 0; - for (int i = 0; i < TEST_FILES_NUMBER; i++) { + int itrCount; + for (itrCount = 0; itrCount < TEST_FILES_NUMBER; itrCount++) { FileStatus fileStatus = fsItr.next(); - String pathStr = fileStatus.getPath().toString(); - fileNames.remove(pathStr); - itrCount++; + verifyIteratorResultContent(fileStatus, fileNames); } - Assertions.assertThatThrownBy(() -> fsItr.next()) - .describedAs( - "next() should throw NoSuchElementException since next has been " - + "called " + TEST_FILES_NUMBER + " times") - .isInstanceOf(NoSuchElementException.class); - Assertions.assertThat(itrCount) - .describedAs("Number of iterations should be equal to the files " - + "created") - .isEqualTo(TEST_FILES_NUMBER); - Assertions.assertThat(fileNames.size()) - .describedAs("After removing every iterm found from the iterator, " - + "there should be no more elements in the fileNames") - .isEqualTo(0); + LambdaTestUtils.intercept(NoSuchElementException.class, fsItr::next); + verifyIteratorResultCount(itrCount, fileNames); } @Test public void testNextWhenNoMoreElementsPresent() throws Exception { Path testDir = createTestDirectory(); setPageSize(10); - RemoteIterator fsItr = + RemoteIterator fsItr = new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir), getFileSystem().getAbfsStore(), getTestTracingContext(getFileSystem(), true)); fsItr = Mockito.spy(fsItr); Mockito.doReturn(false).when(fsItr).hasNext(); - RemoteIterator finalFsItr = fsItr; - Assertions.assertThatThrownBy(() -> finalFsItr.next()) - .describedAs( - "next() should throw NoSuchElementException if hasNext() return " - + "false") - .isInstanceOf(NoSuchElementException.class); + LambdaTestUtils.intercept(NoSuchElementException.class, fsItr::next); } @Test @@ -257,38 +213,47 @@ public void testIOException() throws Exception { String exceptionMessage = "test exception"; ListingSupport lsSupport =getMockListingSupport(exceptionMessage); - RemoteIterator fsItr = + RemoteIterator fsItr = new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir), lsSupport, getTestTracingContext(getFileSystem(), true)); - Assertions.assertThatThrownBy(() -> fsItr.next()) - .describedAs( - "When ioException is not null and queue is empty exception should be " - + "thrown") - .isInstanceOf(IOException.class) - .hasMessage(exceptionMessage); + LambdaTestUtils.intercept(IOException.class, fsItr::next); } @Test - public void testNonExistingPath() throws Throwable { + public void testNonExistingPath() throws Exception { Path nonExistingDir = new Path("nonExistingPath"); - Assertions.assertThatThrownBy( - () -> getFileSystem().listStatusIterator(nonExistingDir)).describedAs( - "test the listStatusIterator call on a path which is not " - + "present should result in FileNotFoundException") - .isInstanceOf(FileNotFoundException.class); + LambdaTestUtils.intercept(FileNotFoundException.class, + () -> getFileSystem().listStatusIterator(nonExistingDir)); + } + + private void verifyIteratorResultContent(FileStatus fileStatus, + List fileNames) { + String pathStr = fileStatus.getPath().toString(); + Assert.assertTrue( + String.format("Could not remove path %s from filenames %s", pathStr, + fileNames), fileNames.remove(pathStr)); + } + + private void verifyIteratorResultCount(int itrCount, List fileNames) { + Assertions.assertThat(itrCount).describedAs( + "Number of iterations should be equal to the files created") + .isEqualTo(TEST_FILES_NUMBER); + Assertions.assertThat(fileNames) + .describedAs("After removing every item found from the iterator, " + + "there should be no more elements in the fileNames") + .hasSize(0); } private ListingSupport getMockListingSupport(String exceptionMessage) { return new ListingSupport() { @Override - public FileStatus[] listStatus(Path path, TracingContext tracingContext) throws IOException { + public FileStatus[] listStatus(Path path, TracingContext tracingContext) { return null; } @Override - public FileStatus[] listStatus(Path path, String startFrom, TracingContext tracingContext) - throws IOException { + public FileStatus[] listStatus(Path path, String startFrom, TracingContext tracingContext) { return null; } @@ -303,15 +268,14 @@ public String listStatus(Path path, String startFrom, } private Path createTestDirectory() throws IOException { - String testDirectoryName = "testDirectory" + System.currentTimeMillis(); - Path testDirectory = path(testDirectoryName); + Path testDirectory = path("testDirectory"); getFileSystem().mkdirs(testDirectory); return testDirectory; } - private void setEnableAbfsIterator(boolean shouldEnable) throws IOException { + private void disableAbfsIterator() throws IOException { AzureBlobFileSystemStore abfsStore = getAbfsStore(getFileSystem()); - abfsStore.getAbfsConfiguration().setEnableAbfsListIterator(shouldEnable); + abfsStore.getAbfsConfiguration().setEnableAbfsListIterator(false); } private void setPageSize(int pageSize) throws IOException { @@ -319,21 +283,21 @@ private void setPageSize(int pageSize) throws IOException { abfsStore.getAbfsConfiguration().setListMaxResults(pageSize); } - private List createFilesUnderDirectory(int numFiles, Path rootPath, - String filenamePrefix) + private List createFilesUnderDirectory(Path rootPath) throws ExecutionException, InterruptedException, IOException { final List> tasks = new ArrayList<>(); - final List fileNames = new ArrayList<>(); + final List fileNames = Collections.synchronizedList(new ArrayList<>()); ExecutorService es = Executors.newFixedThreadPool(10); try { - for (int i = 0; i < numFiles; i++) { - final Path filePath = new Path(rootPath, filenamePrefix + i); - Callable callable = () -> { - getFileSystem().create(filePath); - fileNames.add(makeQualified(filePath).toString()); + for (int i = 0; i < ITestAbfsListStatusRemoteIterator.TEST_FILES_NUMBER; i++) { + Path filePath = makeQualified(new Path(rootPath, "testListPath" + i)); + tasks.add(es.submit(() -> { + touch(filePath); + synchronized (fileNames) { + Assert.assertTrue(fileNames.add(filePath.toString())); + } return null; - }; - tasks.add(es.submit(callable)); + })); } for (Future task : tasks) { task.get(); @@ -341,6 +305,10 @@ private List createFilesUnderDirectory(int numFiles, Path rootPath, } finally { es.shutdownNow(); } + LOG.debug(fileNames.toString()); + Assertions.assertThat(fileNames) + .describedAs("File creation incorrect or fileNames not added to list") + .hasSize(ITestAbfsListStatusRemoteIterator.TEST_FILES_NUMBER); return fileNames; }