HADOOP-18075. ABFS: Fix failure caused by listFiles() in ITestAbfsRestOperationException (#4040)
Contributed by Sumangala Patki Change-Id: I245c08dab050d59b90ac6fdcb4c03153db77be0b
This commit is contained in:
parent
0ed0375413
commit
36a50ba3e0
@ -1185,7 +1185,7 @@ public RemoteIterator<FileStatus> listStatusIterator(Path path)
|
|||||||
TracingContext tracingContext = new TracingContext(clientCorrelationId,
|
TracingContext tracingContext = new TracingContext(clientCorrelationId,
|
||||||
fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener);
|
fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener);
|
||||||
AbfsListStatusRemoteIterator abfsLsItr =
|
AbfsListStatusRemoteIterator abfsLsItr =
|
||||||
new AbfsListStatusRemoteIterator(getFileStatus(path, tracingContext), abfsStore,
|
new AbfsListStatusRemoteIterator(path, abfsStore,
|
||||||
tracingContext);
|
tracingContext);
|
||||||
return RemoteIterators.typeCastingRemoteIterator(abfsLsItr);
|
return RemoteIterators.typeCastingRemoteIterator(abfsLsItr);
|
||||||
} else {
|
} else {
|
||||||
@ -1360,9 +1360,9 @@ private void checkCheckAccessException(final Path path,
|
|||||||
* @throws IOException if the exception error code is not on the allowed list.
|
* @throws IOException if the exception error code is not on the allowed list.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static void checkException(final Path path,
|
public static void checkException(final Path path,
|
||||||
final AzureBlobFileSystemException exception,
|
final AzureBlobFileSystemException exception,
|
||||||
final AzureServiceErrorCode... allowedErrorCodesList) throws IOException {
|
final AzureServiceErrorCode... allowedErrorCodesList) throws IOException {
|
||||||
if (exception instanceof AbfsRestOperationException) {
|
if (exception instanceof AbfsRestOperationException) {
|
||||||
AbfsRestOperationException ere = (AbfsRestOperationException) exception;
|
AbfsRestOperationException ere = (AbfsRestOperationException) exception;
|
||||||
|
|
||||||
|
@ -32,7 +32,10 @@
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||||
|
|
||||||
public class AbfsListStatusRemoteIterator
|
public class AbfsListStatusRemoteIterator
|
||||||
@ -45,7 +48,7 @@ public class AbfsListStatusRemoteIterator
|
|||||||
private static final int MAX_QUEUE_SIZE = 10;
|
private static final int MAX_QUEUE_SIZE = 10;
|
||||||
private static final long POLL_WAIT_TIME_IN_MS = 250;
|
private static final long POLL_WAIT_TIME_IN_MS = 250;
|
||||||
|
|
||||||
private final FileStatus fileStatus;
|
private final Path path;
|
||||||
private final ListingSupport listingSupport;
|
private final ListingSupport listingSupport;
|
||||||
private final ArrayBlockingQueue<AbfsListResult> listResultQueue;
|
private final ArrayBlockingQueue<AbfsListResult> listResultQueue;
|
||||||
private final TracingContext tracingContext;
|
private final TracingContext tracingContext;
|
||||||
@ -55,13 +58,15 @@ public class AbfsListStatusRemoteIterator
|
|||||||
private String continuation;
|
private String continuation;
|
||||||
private Iterator<FileStatus> currIterator;
|
private Iterator<FileStatus> currIterator;
|
||||||
|
|
||||||
public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
|
public AbfsListStatusRemoteIterator(final Path path,
|
||||||
final ListingSupport listingSupport, TracingContext tracingContext) {
|
final ListingSupport listingSupport, TracingContext tracingContext)
|
||||||
this.fileStatus = fileStatus;
|
throws IOException {
|
||||||
|
this.path = path;
|
||||||
this.listingSupport = listingSupport;
|
this.listingSupport = listingSupport;
|
||||||
this.tracingContext = tracingContext;
|
this.tracingContext = tracingContext;
|
||||||
listResultQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
|
listResultQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
|
||||||
currIterator = Collections.emptyIterator();
|
currIterator = Collections.emptyIterator();
|
||||||
|
addNextBatchIteratorToQueue();
|
||||||
fetchBatchesAsync();
|
fetchBatchesAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,9 +135,6 @@ private void asyncOp() {
|
|||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
LOG.error("Thread got interrupted: {}", interruptedException);
|
LOG.error("Thread got interrupted: {}", interruptedException);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
LOG.error("Thread got interrupted: {}", e);
|
|
||||||
} finally {
|
} finally {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
isAsyncInProgress = false;
|
isAsyncInProgress = false;
|
||||||
@ -141,13 +143,21 @@ private void asyncOp() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addNextBatchIteratorToQueue()
|
private synchronized void addNextBatchIteratorToQueue()
|
||||||
throws IOException, InterruptedException {
|
throws IOException {
|
||||||
List<FileStatus> fileStatuses = new ArrayList<>();
|
List<FileStatus> fileStatuses = new ArrayList<>();
|
||||||
continuation = listingSupport
|
try {
|
||||||
.listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
|
try {
|
||||||
continuation, tracingContext);
|
continuation = listingSupport.listStatus(path, null, fileStatuses,
|
||||||
if (!fileStatuses.isEmpty()) {
|
FETCH_ALL_FALSE, continuation, tracingContext);
|
||||||
listResultQueue.put(new AbfsListResult(fileStatuses.iterator()));
|
} catch (AbfsRestOperationException ex) {
|
||||||
|
AzureBlobFileSystem.checkException(path, ex);
|
||||||
|
}
|
||||||
|
if (!fileStatuses.isEmpty()) {
|
||||||
|
listResultQueue.put(new AbfsListResult(fileStatuses.iterator()));
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
LOG.error("Thread interrupted", ie);
|
||||||
}
|
}
|
||||||
if (continuation == null || continuation.isEmpty()) {
|
if (continuation == null || continuation.isEmpty()) {
|
||||||
isIterationComplete = true;
|
isIterationComplete = true;
|
||||||
|
@ -68,10 +68,9 @@ public void testAbfsIteratorWithHasNext() throws Exception {
|
|||||||
setPageSize(10);
|
setPageSize(10);
|
||||||
final List<String> fileNames = createFilesUnderDirectory(testDir);
|
final List<String> fileNames = createFilesUnderDirectory(testDir);
|
||||||
|
|
||||||
ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
|
ListingSupport listingSupport = Mockito.spy(getFileSystem().getAbfsStore());
|
||||||
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
|
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
|
||||||
getFileSystem().getFileStatus(testDir), listngSupport,
|
listingSupport, getTestTracingContext(getFileSystem(), true));
|
||||||
getTestTracingContext(getFileSystem(), true));
|
|
||||||
Assertions.assertThat(fsItr)
|
Assertions.assertThat(fsItr)
|
||||||
.describedAs("RemoteIterator should be instance of "
|
.describedAs("RemoteIterator should be instance of "
|
||||||
+ "AbfsListStatusRemoteIterator by default")
|
+ "AbfsListStatusRemoteIterator by default")
|
||||||
@ -84,7 +83,7 @@ public void testAbfsIteratorWithHasNext() throws Exception {
|
|||||||
}
|
}
|
||||||
verifyIteratorResultCount(itrCount, fileNames);
|
verifyIteratorResultCount(itrCount, fileNames);
|
||||||
int minNumberOfInvocations = TEST_FILES_NUMBER / 10;
|
int minNumberOfInvocations = TEST_FILES_NUMBER / 10;
|
||||||
verify(listngSupport, Mockito.atLeast(minNumberOfInvocations))
|
verify(listingSupport, Mockito.atLeast(minNumberOfInvocations))
|
||||||
.listStatus(any(Path.class), nullable(String.class),
|
.listStatus(any(Path.class), nullable(String.class),
|
||||||
anyList(), anyBoolean(),
|
anyList(), anyBoolean(),
|
||||||
nullable(String.class),
|
nullable(String.class),
|
||||||
@ -97,10 +96,9 @@ public void testAbfsIteratorWithoutHasNext() throws Exception {
|
|||||||
setPageSize(10);
|
setPageSize(10);
|
||||||
final List<String> fileNames = createFilesUnderDirectory(testDir);
|
final List<String> fileNames = createFilesUnderDirectory(testDir);
|
||||||
|
|
||||||
ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
|
ListingSupport listingSupport = Mockito.spy(getFileSystem().getAbfsStore());
|
||||||
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(
|
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
|
||||||
getFileSystem().getFileStatus(testDir), listngSupport,
|
listingSupport, getTestTracingContext(getFileSystem(), true));
|
||||||
getTestTracingContext(getFileSystem(), true));
|
|
||||||
Assertions.assertThat(fsItr)
|
Assertions.assertThat(fsItr)
|
||||||
.describedAs("RemoteIterator should be instance of "
|
.describedAs("RemoteIterator should be instance of "
|
||||||
+ "AbfsListStatusRemoteIterator by default")
|
+ "AbfsListStatusRemoteIterator by default")
|
||||||
@ -114,7 +112,7 @@ public void testAbfsIteratorWithoutHasNext() throws Exception {
|
|||||||
LambdaTestUtils.intercept(NoSuchElementException.class, fsItr::next);
|
LambdaTestUtils.intercept(NoSuchElementException.class, fsItr::next);
|
||||||
verifyIteratorResultCount(itrCount, fileNames);
|
verifyIteratorResultCount(itrCount, fileNames);
|
||||||
int minNumberOfInvocations = TEST_FILES_NUMBER / 10;
|
int minNumberOfInvocations = TEST_FILES_NUMBER / 10;
|
||||||
verify(listngSupport, Mockito.atLeast(minNumberOfInvocations))
|
verify(listingSupport, Mockito.atLeast(minNumberOfInvocations))
|
||||||
.listStatus(any(Path.class), nullable(String.class),
|
.listStatus(any(Path.class), nullable(String.class),
|
||||||
anyList(), anyBoolean(),
|
anyList(), anyBoolean(),
|
||||||
nullable(String.class),
|
nullable(String.class),
|
||||||
@ -169,10 +167,9 @@ public void testWithAbfsIteratorDisabledWithoutHasNext() throws Exception {
|
|||||||
public void testNextWhenNoMoreElementsPresent() throws Exception {
|
public void testNextWhenNoMoreElementsPresent() throws Exception {
|
||||||
Path testDir = createTestDirectory();
|
Path testDir = createTestDirectory();
|
||||||
setPageSize(10);
|
setPageSize(10);
|
||||||
RemoteIterator<FileStatus> fsItr =
|
RemoteIterator<FileStatus> fsItr = new AbfsListStatusRemoteIterator(testDir,
|
||||||
new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
|
getFileSystem().getAbfsStore(),
|
||||||
getFileSystem().getAbfsStore(),
|
getTestTracingContext(getFileSystem(), true));
|
||||||
getTestTracingContext(getFileSystem(), true));
|
|
||||||
fsItr = Mockito.spy(fsItr);
|
fsItr = Mockito.spy(fsItr);
|
||||||
Mockito.doReturn(false).when(fsItr).hasNext();
|
Mockito.doReturn(false).when(fsItr).hasNext();
|
||||||
|
|
||||||
@ -212,12 +209,11 @@ public void testIOException() throws Exception {
|
|||||||
getFileSystem().mkdirs(testDir);
|
getFileSystem().mkdirs(testDir);
|
||||||
|
|
||||||
String exceptionMessage = "test exception";
|
String exceptionMessage = "test exception";
|
||||||
ListingSupport lsSupport =getMockListingSupport(exceptionMessage);
|
ListingSupport lsSupport = getMockListingSupport(exceptionMessage);
|
||||||
RemoteIterator<FileStatus> fsItr =
|
|
||||||
new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
|
|
||||||
lsSupport, getTestTracingContext(getFileSystem(), true));
|
|
||||||
|
|
||||||
LambdaTestUtils.intercept(IOException.class, fsItr::next);
|
LambdaTestUtils.intercept(IOException.class,
|
||||||
|
() -> new AbfsListStatusRemoteIterator(testDir, lsSupport,
|
||||||
|
getTestTracingContext(getFileSystem(), true)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -74,8 +74,9 @@ public void testAbfsRestOperationExceptionFormat() throws IOException {
|
|||||||
// verify its format
|
// verify its format
|
||||||
String errorMessage = ex.getLocalizedMessage();
|
String errorMessage = ex.getLocalizedMessage();
|
||||||
String[] errorFields = errorMessage.split(",");
|
String[] errorFields = errorMessage.split(",");
|
||||||
|
Assertions.assertThat(errorFields)
|
||||||
Assert.assertEquals(6, errorFields.length);
|
.describedAs("fields in exception of %s", ex)
|
||||||
|
.hasSize(6);
|
||||||
// Check status message, status code, HTTP Request Type and URL.
|
// Check status message, status code, HTTP Request Type and URL.
|
||||||
Assert.assertEquals("Operation failed: \"The specified path does not exist.\"", errorFields[0].trim());
|
Assert.assertEquals("Operation failed: \"The specified path does not exist.\"", errorFields[0].trim());
|
||||||
Assert.assertEquals("404", errorFields[1].trim());
|
Assert.assertEquals("404", errorFields[1].trim());
|
||||||
|
Loading…
Reference in New Issue
Block a user