HADOOP-18340. deleteOnExit does not work with S3AFileSystem (#4608)
Contributed by Huaxiang Sun
This commit is contained in:
parent
c0bbdca97e
commit
e9509ac467
@ -32,12 +32,14 @@
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
@ -386,6 +388,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||||||
*/
|
*/
|
||||||
private ArnResource accessPoint;
|
private ArnResource accessPoint;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A cache of files that should be deleted when the FileSystem is closed
|
||||||
|
* or the JVM is exited.
|
||||||
|
*/
|
||||||
|
private final Set<Path> deleteOnExit = new TreeSet<>();
|
||||||
|
|
||||||
/** Add any deprecated keys. */
|
/** Add any deprecated keys. */
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private static void addDeprecatedKeys() {
|
private static void addDeprecatedKeys() {
|
||||||
@ -3064,6 +3072,24 @@ public void removeKeys(
|
|||||||
@AuditEntryPoint
|
@AuditEntryPoint
|
||||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||||
checkNotClosed();
|
checkNotClosed();
|
||||||
|
return deleteWithoutCloseCheck(f, recursive);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Same as delete(), except that it does not check if fs is closed.
|
||||||
|
*
|
||||||
|
* @param f the path to delete.
|
||||||
|
* @param recursive if path is a directory and set to
|
||||||
|
* true, the directory is deleted else throws an exception. In
|
||||||
|
* case of a file the recursive can be set to either true or false.
|
||||||
|
* @return true if the path existed and then was deleted; false if there
|
||||||
|
* was no path in the first place, or the corner cases of root path deletion
|
||||||
|
* have surfaced.
|
||||||
|
* @throws IOException due to inability to delete a directory or file.
|
||||||
|
*/
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
|
||||||
final Path path = qualify(f);
|
final Path path = qualify(f);
|
||||||
// span covers delete, getFileStatus, fake directory operations.
|
// span covers delete, getFileStatus, fake directory operations.
|
||||||
try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(),
|
try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(),
|
||||||
@ -3805,6 +3831,61 @@ UploadResult waitForUploadCompletion(String key, UploadInfo uploadInfo)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This override bypasses checking for existence.
|
||||||
|
*
|
||||||
|
* @param f the path to delete; this may be unqualified.
|
||||||
|
* @return true, always. * @param f the path to delete.
|
||||||
|
* @return true if deleteOnExit is successful, otherwise false.
|
||||||
|
* @throws IOException IO failure
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean deleteOnExit(Path f) throws IOException {
|
||||||
|
Path qualifedPath = makeQualified(f);
|
||||||
|
synchronized (deleteOnExit) {
|
||||||
|
deleteOnExit.add(qualifedPath);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancel the scheduled deletion of the path when the FileSystem is closed.
|
||||||
|
* @param f the path to cancel deletion
|
||||||
|
* @return true if the path was found in the delete-on-exit list.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean cancelDeleteOnExit(Path f) {
|
||||||
|
Path qualifedPath = makeQualified(f);
|
||||||
|
synchronized (deleteOnExit) {
|
||||||
|
return deleteOnExit.remove(qualifedPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete all paths that were marked as delete-on-exit. This recursively
|
||||||
|
* deletes all files and directories in the specified paths. It does not
|
||||||
|
* check if file exists and filesystem is closed.
|
||||||
|
*
|
||||||
|
* The time to process this operation is {@code O(paths)}, with the actual
|
||||||
|
* time dependent on the time for existence and deletion operations to
|
||||||
|
* complete, successfully or not.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void processDeleteOnExit() {
|
||||||
|
synchronized (deleteOnExit) {
|
||||||
|
for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
|
||||||
|
Path path = iter.next();
|
||||||
|
try {
|
||||||
|
deleteWithoutCloseCheck(path, true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.info("Ignoring failure to deleteOnExit for path {}", path);
|
||||||
|
LOG.debug("The exception for deleteOnExit is {}", e);
|
||||||
|
}
|
||||||
|
iter.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close the filesystem. This shuts down all transfers.
|
* Close the filesystem. This shuts down all transfers.
|
||||||
* @throws IOException IO problem
|
* @throws IOException IO problem
|
||||||
|
@ -0,0 +1,105 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test deleteOnExit for S3A.
|
||||||
|
* The following cases for deleteOnExit are tested:
|
||||||
|
* 1. A nonexist file, which is added to deleteOnExit set.
|
||||||
|
* 2. An existing file
|
||||||
|
* 3. A file is added to deleteOnExist set first, then created.
|
||||||
|
* 4. A directory with some files under it.
|
||||||
|
*/
|
||||||
|
public class ITestS3ADeleteOnExit extends AbstractS3ATestBase {
|
||||||
|
|
||||||
|
private static final String PARENT_DIR_PATH_STR = "testDeleteOnExitDir";
|
||||||
|
private static final String NON_EXIST_FILE_PATH_STR =
|
||||||
|
PARENT_DIR_PATH_STR + "/nonExistFile";
|
||||||
|
private static final String INORDER_FILE_PATH_STR =
|
||||||
|
PARENT_DIR_PATH_STR + "/inOrderFile";
|
||||||
|
private static final String OUT_OF_ORDER_FILE_PATH_STR =
|
||||||
|
PARENT_DIR_PATH_STR + "/outOfOrderFile";
|
||||||
|
private static final String SUBDIR_PATH_STR =
|
||||||
|
PARENT_DIR_PATH_STR + "/subDir";
|
||||||
|
private static final String FILE_UNDER_SUBDIR_PATH_STR =
|
||||||
|
SUBDIR_PATH_STR + "/subDirFile";
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteOnExit() throws Exception {
|
||||||
|
FileSystem fs = getFileSystem();
|
||||||
|
|
||||||
|
// Get a new filesystem object which is same as fs.
|
||||||
|
FileSystem s3aFs = new S3AFileSystem();
|
||||||
|
s3aFs.initialize(fs.getUri(), fs.getConf());
|
||||||
|
Path nonExistFilePath = path(NON_EXIST_FILE_PATH_STR);
|
||||||
|
Path inOrderFilePath = path(INORDER_FILE_PATH_STR);
|
||||||
|
Path outOfOrderFilePath = path(OUT_OF_ORDER_FILE_PATH_STR);
|
||||||
|
Path subDirPath = path(SUBDIR_PATH_STR);
|
||||||
|
Path fileUnderSubDirPath = path(FILE_UNDER_SUBDIR_PATH_STR);
|
||||||
|
|
||||||
|
// 1. set up the test directory.
|
||||||
|
Path dir = path("testDeleteOnExitDir");
|
||||||
|
s3aFs.mkdirs(dir);
|
||||||
|
|
||||||
|
// 2. Add a nonexisting file to DeleteOnExit set.
|
||||||
|
s3aFs.deleteOnExit(nonExistFilePath);
|
||||||
|
assertPathDoesNotExist("File " + NON_EXIST_FILE_PATH_STR + " should not exist",
|
||||||
|
nonExistFilePath);
|
||||||
|
|
||||||
|
// 3. create a file and then add it to DeleteOnExit set.
|
||||||
|
byte[] data = dataset(16, 'a', 26);
|
||||||
|
createFile(s3aFs, inOrderFilePath, true, data);
|
||||||
|
assertPathExists("File " + INORDER_FILE_PATH_STR + " should exist", inOrderFilePath);
|
||||||
|
s3aFs.deleteOnExit(inOrderFilePath);
|
||||||
|
|
||||||
|
// 4. add a path to DeleteOnExit set first, then create it.
|
||||||
|
s3aFs.deleteOnExit(outOfOrderFilePath);
|
||||||
|
createFile(s3aFs, outOfOrderFilePath, true, data);
|
||||||
|
assertPathExists("File " + OUT_OF_ORDER_FILE_PATH_STR + " should exist",
|
||||||
|
outOfOrderFilePath);
|
||||||
|
|
||||||
|
// 5. create a subdirectory, a file under it, and add subdirectory DeleteOnExit set.
|
||||||
|
s3aFs.mkdirs(subDirPath);
|
||||||
|
s3aFs.deleteOnExit(subDirPath);
|
||||||
|
createFile(s3aFs, fileUnderSubDirPath, true, data);
|
||||||
|
assertPathExists("Directory " + SUBDIR_PATH_STR + " should exist", subDirPath);
|
||||||
|
assertPathExists("File " + FILE_UNDER_SUBDIR_PATH_STR + " should exist",
|
||||||
|
fileUnderSubDirPath);
|
||||||
|
|
||||||
|
s3aFs.close();
|
||||||
|
|
||||||
|
// After s3aFs is closed, make sure that all files/directories in deleteOnExit
|
||||||
|
// set are deleted.
|
||||||
|
assertPathDoesNotExist("File " + NON_EXIST_FILE_PATH_STR + " should not exist",
|
||||||
|
nonExistFilePath);
|
||||||
|
assertPathDoesNotExist("File " + INORDER_FILE_PATH_STR + " should not exist",
|
||||||
|
inOrderFilePath);
|
||||||
|
assertPathDoesNotExist("File " + OUT_OF_ORDER_FILE_PATH_STR + " should not exist",
|
||||||
|
outOfOrderFilePath);
|
||||||
|
assertPathDoesNotExist("Directory " + SUBDIR_PATH_STR + " should not exist",
|
||||||
|
subDirPath);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,98 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.ArgumentMatchers.argThat;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.AmazonS3;
|
||||||
|
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
||||||
|
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.ArgumentMatcher;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* deleteOnExit test for S3A.
|
||||||
|
*/
|
||||||
|
public class TestS3ADeleteOnExit extends AbstractS3AMockTest {
|
||||||
|
|
||||||
|
static class TestS3AFileSystem extends S3AFileSystem {
|
||||||
|
private int deleteOnDnExitCount;
|
||||||
|
|
||||||
|
public int getDeleteOnDnExitCount() {
|
||||||
|
return deleteOnDnExitCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean deleteOnExit(Path f) throws IOException {
|
||||||
|
deleteOnDnExitCount++;
|
||||||
|
return super.deleteOnExit(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is specifically designed for deleteOnExit processing.
|
||||||
|
// In this specific case, deleteWithoutCloseCheck() will only be called in the path of
|
||||||
|
// processDeleteOnExit.
|
||||||
|
@Override
|
||||||
|
protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
|
||||||
|
boolean result = super.deleteWithoutCloseCheck(f, recursive);
|
||||||
|
deleteOnDnExitCount--;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteOnExit() throws Exception {
|
||||||
|
Configuration conf = createConfiguration();
|
||||||
|
TestS3AFileSystem testFs = new TestS3AFileSystem();
|
||||||
|
URI uri = URI.create(FS_S3A + "://" + BUCKET);
|
||||||
|
// unset S3CSE property from config to avoid pathIOE.
|
||||||
|
conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
|
||||||
|
testFs.initialize(uri, conf);
|
||||||
|
AmazonS3 testS3 = testFs.getAmazonS3ClientForTesting("mocking");
|
||||||
|
|
||||||
|
Path path = new Path("/file");
|
||||||
|
String key = path.toUri().getPath().substring(1);
|
||||||
|
ObjectMetadata meta = new ObjectMetadata();
|
||||||
|
meta.setContentLength(1L);
|
||||||
|
meta.setLastModified(new Date(2L));
|
||||||
|
when(testS3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
|
||||||
|
.thenReturn(meta);
|
||||||
|
|
||||||
|
testFs.deleteOnExit(path);
|
||||||
|
testFs.close();
|
||||||
|
assertEquals(0, testFs.getDeleteOnDnExitCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
private ArgumentMatcher<GetObjectMetadataRequest> correctGetMetadataRequest(
|
||||||
|
String bucket, String key) {
|
||||||
|
return request -> request != null
|
||||||
|
&& request.getBucketName().equals(bucket)
|
||||||
|
&& request.getKey().equals(key);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user