HADOOP-13675. Bug in return value for delete() calls in WASB. Contributed by Dushyanth
This commit is contained in:
parent
8c4680852b
commit
15dd1f3381
@ -2045,10 +2045,10 @@ public PartialListing listAll(String prefix, final int maxListingCount,
|
||||
* The key to search for.
|
||||
* @return The wanted directory, or null if not found.
|
||||
*/
|
||||
private static FileMetadata getDirectoryInList(
|
||||
private static FileMetadata getFileMetadataInList(
|
||||
final Iterable<FileMetadata> list, String key) {
|
||||
for (FileMetadata current : list) {
|
||||
if (current.isDir() && current.getKey().equals(key)) {
|
||||
if (current.getKey().equals(key)) {
|
||||
return current;
|
||||
}
|
||||
}
|
||||
@ -2114,7 +2114,7 @@ private PartialListing list(String prefix, String delimiter,
|
||||
|
||||
// Add the metadata to the list, but remove any existing duplicate
|
||||
// entries first that we may have added by finding nested files.
|
||||
FileMetadata existing = getDirectoryInList(fileMetadata, blobKey);
|
||||
FileMetadata existing = getFileMetadataInList(fileMetadata, blobKey);
|
||||
if (existing != null) {
|
||||
fileMetadata.remove(existing);
|
||||
}
|
||||
@ -2141,7 +2141,7 @@ private PartialListing list(String prefix, String delimiter,
|
||||
|
||||
// Add the directory metadata to the list only if it's not already
|
||||
// there.
|
||||
if (getDirectoryInList(fileMetadata, dirKey) == null) {
|
||||
if (getFileMetadataInList(fileMetadata, dirKey) == null) {
|
||||
fileMetadata.add(directoryMetadata);
|
||||
}
|
||||
|
||||
@ -2249,7 +2249,7 @@ private void buildUpList(CloudBlobDirectoryWrapper aCloudBlobDirectory,
|
||||
|
||||
// Add the directory metadata to the list only if it's not already
|
||||
// there.
|
||||
FileMetadata existing = getDirectoryInList(aFileMetadataList, blobKey);
|
||||
FileMetadata existing = getFileMetadataInList(aFileMetadataList, blobKey);
|
||||
if (existing != null) {
|
||||
aFileMetadataList.remove(existing);
|
||||
}
|
||||
@ -2278,7 +2278,7 @@ private void buildUpList(CloudBlobDirectoryWrapper aCloudBlobDirectory,
|
||||
// absolute path is being used or not.
|
||||
String dirKey = normalizeKey(directory);
|
||||
|
||||
if (getDirectoryInList(aFileMetadataList, dirKey) == null) {
|
||||
if (getFileMetadataInList(aFileMetadataList, dirKey) == null) {
|
||||
// Reached the targeted listing depth. Return metadata for the
|
||||
// directory using default permissions.
|
||||
//
|
||||
@ -2376,18 +2376,24 @@ private void safeDelete(CloudBlobWrapper blob, SelfRenewingLease lease) throws S
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* API implementation to delete a blob in the back end azure storage.
|
||||
*/
|
||||
@Override
|
||||
public void delete(String key, SelfRenewingLease lease) throws IOException {
|
||||
public boolean delete(String key, SelfRenewingLease lease) throws IOException {
|
||||
try {
|
||||
if (checkContainer(ContainerAccessType.ReadThenWrite) == ContainerState.DoesntExist) {
|
||||
// Container doesn't exist, no need to do anything
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
|
||||
// Get the blob reference and delete it.
|
||||
CloudBlobWrapper blob = getBlobReference(key);
|
||||
if (blob.exists(getInstrumentedContext())) {
|
||||
safeDelete(blob, lease);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// Re-throw as an Azure storage exception.
|
||||
@ -2395,10 +2401,13 @@ public void delete(String key, SelfRenewingLease lease) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* API implementation to delete a blob in the back end azure storage.
|
||||
*/
|
||||
@Override
|
||||
public void delete(String key) throws IOException {
|
||||
public boolean delete(String key) throws IOException {
|
||||
try {
|
||||
delete(key, null);
|
||||
return delete(key, null);
|
||||
} catch (IOException e) {
|
||||
Throwable t = e.getCause();
|
||||
if(t != null && t instanceof StorageException) {
|
||||
@ -2407,7 +2416,7 @@ public void delete(String key) throws IOException {
|
||||
SelfRenewingLease lease = null;
|
||||
try {
|
||||
lease = acquireLease(key);
|
||||
delete(key, lease);
|
||||
return delete(key, lease);
|
||||
} catch (AzureException e3) {
|
||||
LOG.warn("Got unexpected exception trying to acquire lease on "
|
||||
+ key + "." + e3.getMessage());
|
||||
|
@ -1765,8 +1765,11 @@ public boolean delete(Path f, boolean recursive,
|
||||
}
|
||||
|
||||
try {
|
||||
store.delete(key);
|
||||
if (store.delete(key)) {
|
||||
instrumentation.fileDeleted();
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} catch(IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
@ -1885,7 +1888,7 @@ public boolean execute(FileMetadata file) throws IOException{
|
||||
}
|
||||
|
||||
// Delete the current directory
|
||||
if (!deleteFile(metaFile.getKey(), metaFile.isDir())) {
|
||||
if (store.retrieveMetadata(metaFile.getKey()) != null && !deleteFile(metaFile.getKey(), metaFile.isDir())) {
|
||||
LOG.error("Failed delete directory {}", f.toString());
|
||||
return false;
|
||||
}
|
||||
@ -1913,12 +1916,16 @@ public AzureFileSystemThreadPoolExecutor getThreadPoolExecutor(int threadCount,
|
||||
@VisibleForTesting
|
||||
boolean deleteFile(String key, boolean isDir) throws IOException {
|
||||
try {
|
||||
store.delete(key);
|
||||
if (store.delete(key)) {
|
||||
if (isDir) {
|
||||
instrumentation.directoryDeleted();
|
||||
} else {
|
||||
instrumentation.fileDeleted();
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} catch(IOException e) {
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
||||
@ -1929,8 +1936,6 @@ boolean deleteFile(String key, boolean isDir) throws IOException {
|
||||
|
||||
throw e;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -2790,6 +2795,8 @@ void handleFile(FileMetadata file, FileMetadata tempFile)
|
||||
throws IOException {
|
||||
|
||||
LOG.debug("Deleting dangling file {}", file.getKey());
|
||||
// Not handling delete return type as false return essentially
|
||||
// means its a no-op for the caller
|
||||
store.delete(file.getKey());
|
||||
store.delete(tempFile.getKey());
|
||||
}
|
||||
|
@ -74,7 +74,16 @@ PartialListing listAll(String prefix, final int maxListingCount,
|
||||
void changePermissionStatus(String key, PermissionStatus newPermission)
|
||||
throws AzureException;
|
||||
|
||||
void delete(String key) throws IOException;
|
||||
/**
|
||||
* API to delete a blob in the back end azure storage.
|
||||
* @param key - key to the blob being deleted.
|
||||
* @return return true when delete is successful, false if
|
||||
* blob cannot be found or delete is not possible without
|
||||
* exception.
|
||||
* @throws IOException Exception encountered while deleting in
|
||||
* azure storage.
|
||||
*/
|
||||
boolean delete(String key) throws IOException;
|
||||
|
||||
void rename(String srcKey, String dstKey) throws IOException;
|
||||
|
||||
@ -104,7 +113,17 @@ void updateFolderLastModifiedTime(String key, SelfRenewingLease folderLease)
|
||||
void updateFolderLastModifiedTime(String key, Date lastModified,
|
||||
SelfRenewingLease folderLease) throws AzureException;
|
||||
|
||||
void delete(String key, SelfRenewingLease lease) throws IOException;
|
||||
/**
|
||||
* API to delete a blob in the back end azure storage.
|
||||
* @param key - key to the blob being deleted.
|
||||
* @param lease - Active lease on the blob.
|
||||
* @return return true when delete is successful, false if
|
||||
* blob cannot be found or delete is not possible without
|
||||
* exception.
|
||||
* @throws IOException Exception encountered while deleting in
|
||||
* azure storage.
|
||||
*/
|
||||
boolean delete(String key, SelfRenewingLease lease) throws IOException;
|
||||
|
||||
SelfRenewingLease acquireLease(String key) throws AzureException;
|
||||
|
||||
|
@ -0,0 +1,119 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/***
|
||||
* Test class to hold all Live Azure storage concurrency tests.
|
||||
*/
|
||||
public class TestNativeAzureFileSystemConcurrencyLive
|
||||
extends AbstractWasbTestBase {
|
||||
|
||||
private static final int TEST_COUNT = 102;
|
||||
@Override
|
||||
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||
return AzureBlobStorageTestAccount.create();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test multi-threaded deletes in WASB. Expected behavior is one of the thread
|
||||
* should be to successfully delete the file and return true and all other
|
||||
* threads need to return false.
|
||||
*/
|
||||
@Test
|
||||
public void testMultiThreadedDeletes() throws Exception {
|
||||
Path testFile = new Path("test.dat");
|
||||
fs.create(testFile).close();
|
||||
|
||||
int threadCount = TEST_COUNT;
|
||||
DeleteHelperThread[] helperThreads = new DeleteHelperThread[threadCount];
|
||||
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
helperThreads[i] = new DeleteHelperThread(fs, testFile);
|
||||
}
|
||||
|
||||
Thread[] threads = new Thread[threadCount];
|
||||
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
threads[i] = new Thread(helperThreads[i]);
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
boolean deleteSuccess = false;
|
||||
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
|
||||
Assert.assertFalse("child thread has exception : " + helperThreads[i].getException(),
|
||||
helperThreads[i].getExceptionEncounteredFlag());
|
||||
|
||||
if (deleteSuccess) {
|
||||
Assert.assertFalse("More than one thread delete() retuhelperThreads[i].getDeleteSuccess()",
|
||||
helperThreads[i].getExceptionEncounteredFlag());
|
||||
} else {
|
||||
deleteSuccess = helperThreads[i].getDeleteSuccess();
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertTrue("No successfull delete found", deleteSuccess);
|
||||
}
|
||||
}
|
||||
|
||||
class DeleteHelperThread implements Runnable {
|
||||
|
||||
private FileSystem fs;
|
||||
private Path p;
|
||||
private boolean deleteSuccess;
|
||||
private boolean exceptionEncountered;
|
||||
private Exception ex;
|
||||
|
||||
public DeleteHelperThread(FileSystem fs, Path p) {
|
||||
this.fs = fs;
|
||||
this.p = p;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
deleteSuccess = fs.delete(p, false);
|
||||
} catch (Exception ioEx) {
|
||||
exceptionEncountered = true;
|
||||
this.ex = ioEx;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean getDeleteSuccess() {
|
||||
return deleteSuccess;
|
||||
}
|
||||
|
||||
public boolean getExceptionEncounteredFlag() {
|
||||
return exceptionEncountered;
|
||||
}
|
||||
|
||||
public Exception getException() {
|
||||
return ex;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user