HADOOP-18075. ABFS: Fix failure caused by listFiles() in ITestAbfsRestOperationException (#4040)
Contributed by Sumangala Patki
This commit is contained in:
parent
d05655d2ad
commit
b56af00114
@ -1193,7 +1193,7 @@ public RemoteIterator<FileStatus> listStatusIterator(Path path)
|
||||
TracingContext tracingContext = new TracingContext(clientCorrelationId,
|
||||
fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener);
|
||||
AbfsListStatusRemoteIterator abfsLsItr =
|
||||
new AbfsListStatusRemoteIterator(getFileStatus(path, tracingContext), abfsStore,
|
||||
new AbfsListStatusRemoteIterator(path, abfsStore,
|
||||
tracingContext);
|
||||
return RemoteIterators.typeCastingRemoteIterator(abfsLsItr);
|
||||
} else {
|
||||
@ -1368,9 +1368,9 @@ private void checkCheckAccessException(final Path path,
|
||||
* @throws IOException if the exception error code is not on the allowed list.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static void checkException(final Path path,
|
||||
final AzureBlobFileSystemException exception,
|
||||
final AzureServiceErrorCode... allowedErrorCodesList) throws IOException {
|
||||
public static void checkException(final Path path,
|
||||
final AzureBlobFileSystemException exception,
|
||||
final AzureServiceErrorCode... allowedErrorCodesList) throws IOException {
|
||||
if (exception instanceof AbfsRestOperationException) {
|
||||
AbfsRestOperationException ere = (AbfsRestOperationException) exception;
|
||||
|
||||
|
@ -32,7 +32,10 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
|
||||
public class AbfsListStatusRemoteIterator
|
||||
@ -45,7 +48,7 @@ public class AbfsListStatusRemoteIterator
|
||||
private static final int MAX_QUEUE_SIZE = 10;
|
||||
private static final long POLL_WAIT_TIME_IN_MS = 250;
|
||||
|
||||
private final FileStatus fileStatus;
|
||||
private final Path path;
|
||||
private final ListingSupport listingSupport;
|
||||
private final ArrayBlockingQueue<AbfsListResult> listResultQueue;
|
||||
private final TracingContext tracingContext;
|
||||
@ -55,13 +58,15 @@ public class AbfsListStatusRemoteIterator
|
||||
private String continuation;
|
||||
private Iterator<FileStatus> currIterator;
|
||||
|
||||
public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
|
||||
final ListingSupport listingSupport, TracingContext tracingContext) {
|
||||
this.fileStatus = fileStatus;
|
||||
public AbfsListStatusRemoteIterator(final Path path,
|
||||
final ListingSupport listingSupport, TracingContext tracingContext)
|
||||
throws IOException {
|
||||
this.path = path;
|
||||
this.listingSupport = listingSupport;
|
||||
this.tracingContext = tracingContext;
|
||||
listResultQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
|
||||
currIterator = Collections.emptyIterator();
|
||||
addNextBatchIteratorToQueue();
|
||||
fetchBatchesAsync();
|
||||
}
|
||||
|
||||
@ -130,9 +135,6 @@ private void asyncOp() {
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.error("Thread got interrupted: {}", interruptedException);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.error("Thread got interrupted: {}", e);
|
||||
} finally {
|
||||
synchronized (this) {
|
||||
isAsyncInProgress = false;
|
||||
@ -141,13 +143,21 @@ private void asyncOp() {
|
||||
}
|
||||
|
||||
private synchronized void addNextBatchIteratorToQueue()
|
||||
throws IOException, InterruptedException {
|
||||
throws IOException {
|
||||
List<FileStatus> fileStatuses = new ArrayList<>();
|
||||
continuation = listingSupport
|
||||
.listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
|
||||
continuation, tracingContext);
|
||||
if (!fileStatuses.isEmpty()) {
|
||||
listResultQueue.put(new AbfsListResult(fileStatuses.iterator()));
|
||||
try {
|
||||
try {
|
||||
continuation = listingSupport.listStatus(path, null, fileStatuses,
|
||||
FETCH_ALL_FALSE, continuation, tracingContext);
|
||||
} catch (AbfsRestOperationException ex) {
|
||||
AzureBlobFileSystem.checkException(path, ex);
|
||||
}
|
||||
if (!fileStatuses.isEmpty()) {
|
||||
listResultQueue.put(new AbfsListResult(fileStatuses.iterator()));
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.error("Thread interrupted", ie);
|
||||
}
|
||||
if (continuation == null || continuation.isEmpty()) {
|
||||
isIterationComplete = true;
|
||||
|
@ -68,10 +68,9 @@ public void testAbfsIteratorWithHasNext() throws Exception {
|
||||
setPageSize(10);
|
||||
final List<String> fileNames = createFilesUnderDirectory(testDir);
|
||||
|
||||
ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
|
||||
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
|
||||
getFileSystem().getFileStatus(testDir), listngSupport,
|
||||
getTestTracingContext(getFileSystem(), true));
|
||||
ListingSupport listingSupport = Mockito.spy(getFileSystem().getAbfsStore());
|
||||
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
|
||||
listingSupport, getTestTracingContext(getFileSystem(), true));
|
||||
Assertions.assertThat(fsItr)
|
||||
.describedAs("RemoteIterator should be instance of "
|
||||
+ "AbfsListStatusRemoteIterator by default")
|
||||
@ -84,7 +83,7 @@ public void testAbfsIteratorWithHasNext() throws Exception {
|
||||
}
|
||||
verifyIteratorResultCount(itrCount, fileNames);
|
||||
int minNumberOfInvocations = TEST_FILES_NUMBER / 10;
|
||||
verify(listngSupport, Mockito.atLeast(minNumberOfInvocations))
|
||||
verify(listingSupport, Mockito.atLeast(minNumberOfInvocations))
|
||||
.listStatus(any(Path.class), nullable(String.class),
|
||||
anyList(), anyBoolean(),
|
||||
nullable(String.class),
|
||||
@ -97,10 +96,9 @@ public void testAbfsIteratorWithoutHasNext() throws Exception {
|
||||
setPageSize(10);
|
||||
final List<String> fileNames = createFilesUnderDirectory(testDir);
|
||||
|
||||
ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
|
||||
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
|
||||
getFileSystem().getFileStatus(testDir), listngSupport,
|
||||
getTestTracingContext(getFileSystem(), true));
|
||||
ListingSupport listingSupport = Mockito.spy(getFileSystem().getAbfsStore());
|
||||
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
|
||||
listingSupport, getTestTracingContext(getFileSystem(), true));
|
||||
Assertions.assertThat(fsItr)
|
||||
.describedAs("RemoteIterator should be instance of "
|
||||
+ "AbfsListStatusRemoteIterator by default")
|
||||
@ -114,7 +112,7 @@ public void testAbfsIteratorWithoutHasNext() throws Exception {
|
||||
LambdaTestUtils.intercept(NoSuchElementException.class, fsItr::next);
|
||||
verifyIteratorResultCount(itrCount, fileNames);
|
||||
int minNumberOfInvocations = TEST_FILES_NUMBER / 10;
|
||||
verify(listngSupport, Mockito.atLeast(minNumberOfInvocations))
|
||||
verify(listingSupport, Mockito.atLeast(minNumberOfInvocations))
|
||||
.listStatus(any(Path.class), nullable(String.class),
|
||||
anyList(), anyBoolean(),
|
||||
nullable(String.class),
|
||||
@ -169,10 +167,9 @@ public void testWithAbfsIteratorDisabledWithoutHasNext() throws Exception {
|
||||
public void testNextWhenNoMoreElementsPresent() throws Exception {
|
||||
Path testDir = createTestDirectory();
|
||||
setPageSize(10);
|
||||
RemoteIterator<FileStatus> fsItr =
|
||||
new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
|
||||
getFileSystem().getAbfsStore(),
|
||||
getTestTracingContext(getFileSystem(), true));
|
||||
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
|
||||
getFileSystem().getAbfsStore(),
|
||||
getTestTracingContext(getFileSystem(), true));
|
||||
fsItr = Mockito.spy(fsItr);
|
||||
Mockito.doReturn(false).when(fsItr).hasNext();
|
||||
|
||||
@ -212,12 +209,11 @@ public void testIOException() throws Exception {
|
||||
getFileSystem().mkdirs(testDir);
|
||||
|
||||
String exceptionMessage = "test exception";
|
||||
ListingSupport lsSupport =getMockListingSupport(exceptionMessage);
|
||||
RemoteIterator<FileStatus> fsItr =
|
||||
new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
|
||||
lsSupport, getTestTracingContext(getFileSystem(), true));
|
||||
ListingSupport lsSupport = getMockListingSupport(exceptionMessage);
|
||||
|
||||
LambdaTestUtils.intercept(IOException.class, fsItr::next);
|
||||
LambdaTestUtils.intercept(IOException.class,
|
||||
() -> new AbfsListStatusRemoteIterator(testDir, lsSupport,
|
||||
getTestTracingContext(getFileSystem(), true)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -74,8 +74,9 @@ public void testAbfsRestOperationExceptionFormat() throws IOException {
|
||||
// verify its format
|
||||
String errorMessage = ex.getLocalizedMessage();
|
||||
String[] errorFields = errorMessage.split(",");
|
||||
|
||||
Assert.assertEquals(6, errorFields.length);
|
||||
Assertions.assertThat(errorFields)
|
||||
.describedAs("fields in exception of %s", ex)
|
||||
.hasSize(6);
|
||||
// Check status message, status code, HTTP Request Type and URL.
|
||||
Assert.assertEquals("Operation failed: \"The specified path does not exist.\"", errorFields[0].trim());
|
||||
Assert.assertEquals("404", errorFields[1].trim());
|
||||
|
Loading…
Reference in New Issue
Block a user