From a218038960e905e6c9eae80e118575a7743cae7a Mon Sep 17 00:00:00 2001 From: Petre Bogdan Stolojan Date: Fri, 30 Jul 2021 19:42:08 +0100 Subject: [PATCH] HADOOP-17139 Re-enable optimized copyFromLocal implementation in S3AFileSystem (#3101) This work * Defines the behavior of FileSystem.copyFromLocal in filesystem.md * Implements a high performance implementation of copyFromLocalOperation for S3 * Adds a contract test for the operation: AbstractContractCopyFromLocalTest * Implements the contract tests for Local and S3A FileSystems Contributed by: Bogdan Stolojan --- .../java/org/apache/hadoop/fs/FileUtil.java | 3 + .../site/markdown/filesystem/filesystem.md | 106 ++++ .../hadoop/fs/TestLocalFSCopyFromLocal.java | 98 ++++ .../AbstractContractCopyFromLocalTest.java | 336 +++++++++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 128 +++-- .../fs/s3a/impl/CopyFromLocalOperation.java | 540 ++++++++++++++++++ .../fs/s3a/ITestS3ACopyFromLocalFile.java | 142 +---- 7 files changed, 1172 insertions(+), 181 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFSCopyFromLocal.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java index 63cbd6212b..6671bf1e50 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java @@ -524,6 +524,9 @@ private static Path checkDest(String srcName, FileSystem dstFS, Path dst, if (null != sdst) { if (sdst.isDirectory()) { if (null == srcName) { + if (overwrite) { + return dst; + 
} throw new PathIsDirectoryException(dst.toString()); } return checkDest(null, dstFS, new Path(dst, srcName), overwrite); diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index a5a35df30c..2eb0bc0719 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -1419,6 +1419,112 @@ operations related to the part of the file being truncated is undefined. +### `boolean copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)` + +The source file or directory at `src` is on the local disk and is copied into the file system at +destination `dst`. If the source must be deleted after the move then `delSrc` flag must be +set to TRUE. If destination already exists, and the destination contents must be overwritten +then `overwrite` flag must be set to TRUE. + +#### Preconditions +Source and destination must be different +```python +if src = dest : raise FileExistsException +``` + +Destination and source must not be descendants of one another +```python +if isDescendant(src, dest) or isDescendant(dest, src) : raise IOException +``` + +The source file or directory must exist locally: +```python +if not exists(LocalFS, src) : raise FileNotFoundException +``` + +Directories cannot be copied into files regardless of what the overwrite flag is set to: + +```python +if isDir(LocalFS, src) and isFile(FS, dst) : raise PathExistsException +``` + +For all cases, except the one for which the above precondition throws, the overwrite flag must be +set to TRUE for the operation to succeed if destination exists. 
This will also overwrite any files + / directories at the destination: + +```python +if exists(FS, dst) and not overwrite : raise PathExistsException +``` + +#### Determining the final name of the copy +Given a base path on the source `base` and a child path `child` where `base` is in +`ancestors(child) + child`: + +```python +def final_name(base, child, dest): + if base = child: + return dest + else: + return dest + childElements(base, child) +``` + +#### Outcome where source is a file `isFile(LocalFS, src)` +For a file, data at destination becomes that of the source. All ancestors are directories. +```python +if isFile(LocalFS, src) and (not exists(FS, dest) or (exists(FS, dest) and overwrite)): + FS' = FS where: + FS'.Files[dest] = LocalFS.Files[src] + FS'.Directories = FS.Directories + ancestors(FS, dest) + LocalFS' = LocalFS where + not delSrc or (delSrc = true and delete(LocalFS, src, false)) +else if isFile(LocalFS, src) and isDir(FS, dest): + FS' = FS where: + let d = final_name(src, dest) + FS'.Files[d] = LocalFS.Files[src] + LocalFS' = LocalFS where: + not delSrc or (delSrc = true and delete(LocalFS, src, false)) +``` +There are no expectations that the file changes are atomic for both local `LocalFS` and remote `FS`. 
+ +#### Outcome where source is a directory `isDir(LocalFS, src)` +```python +if isDir(LocalFS, src) and (isFile(FS, dest) or isFile(FS, dest + childElements(src))): + raise FileAlreadyExistsException +else if isDir(LocalFS, src): + if exists(FS, dest): + dest' = dest + childElements(src) + if exists(FS, dest') and not overwrite: + raise PathExistsException + else: + dest' = dest + + FS' = FS where: + forall c in descendants(LocalFS, src): + not exists(FS', final_name(c)) or overwrite + and forall c in descendants(LocalFS, src) where isDir(LocalFS, c): + FS'.Directories = FS'.Directories + (dest' + childElements(src, c)) + and forall c in descendants(LocalFS, src) where isFile(LocalFS, c): + FS'.Files[final_name(c, dest')] = LocalFS.Files[c] + LocalFS' = LocalFS where + not delSrc or (delSrc = true and delete(LocalFS, src, true)) +``` +There are no expectations of operation isolation / atomicity. +This means files can change in source or destination while the operation is executing. +No guarantees are made for the final state of the file or directory after a copy other than it is +best effort. E.g.: when copying a directory, one file can be moved from source to destination but +there's nothing stopping the new file at destination being updated while the copy operation is still +in progress. + +#### Implementation + +The default HDFS implementation is to recurse through each file and folder found at `src` and +copy them sequentially to their final destination (relative to `dst`). + +Object store based file systems should be mindful of what limitations arise from the above +implementation and could take advantage of parallel uploads and possible re-ordering of files copied +into the store to maximize throughput. 
+ + ## interface `RemoteIterator` The `RemoteIterator` interface is used as a remote-access equivalent diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFSCopyFromLocal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFSCopyFromLocal.java new file mode 100644 index 0000000000..7acb39c6e1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFSCopyFromLocal.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs; + +import java.io.File; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.localfs.LocalFSContract; + +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +public class TestLocalFSCopyFromLocal extends AbstractContractCopyFromLocalTest { + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new LocalFSContract(conf); + } + + @Test + public void testDestinationFileIsToParentDirectory() throws Throwable { + describe("Source is a file and destination is its own parent directory"); + + File file = createTempFile("local"); + Path dest = new Path(file.getParentFile().toURI()); + Path src = new Path(file.toURI()); + + intercept(PathOperationException.class, + () -> getFileSystem().copyFromLocalFile( true, true, src, dest)); + } + + @Test + public void testDestinationDirectoryToSelf() throws Throwable { + describe("Source is a directory and it is copied into itself with " + + "delSrc flag set, destination must not exist"); + + File source = createTempDirectory("srcDir"); + Path dest = new Path(source.toURI()); + getFileSystem().copyFromLocalFile( true, true, dest, dest); + + assertPathDoesNotExist("Source found", dest); + } + + @Test + public void testSourceIntoDestinationSubDirectoryWithDelSrc() throws Throwable { + describe("Copying a parent folder inside a child folder with" + + " delSrc=TRUE"); + File parent = createTempDirectory("parent"); + File child = createTempDirectory(parent, "child"); + + Path src = new Path(parent.toURI()); + Path dest = new Path(child.toURI()); + getFileSystem().copyFromLocalFile(true, true, src, dest); + + assertPathDoesNotExist("Source found", src); + assertPathDoesNotExist("Destination found", dest); + } + + @Test + public void 
testSourceIntoDestinationSubDirectory() throws Throwable { + describe("Copying a parent folder inside a child folder with" + + " delSrc=FALSE"); + File parent = createTempDirectory("parent"); + File child = createTempDirectory(parent, "child"); + + Path src = new Path(parent.toURI()); + Path dest = new Path(child.toURI()); + getFileSystem().copyFromLocalFile(false, true, src, dest); + + Path recursiveParent = new Path(dest, parent.getName()); + Path recursiveChild = new Path(recursiveParent, child.getName()); + + // This definitely counts as interesting behaviour which needs documented + // Depending on the underlying system this can recurse 15+ times + recursiveParent = new Path(recursiveChild, parent.getName()); + recursiveChild = new Path(recursiveParent, child.getName()); + assertPathExists("Recursive parent not found", recursiveParent); + assertPathExists("Recursive child not found", recursiveChild); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java new file mode 100644 index 0000000000..e24eb7181e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; + +import org.junit.Test; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathExistsException; + +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +public abstract class AbstractContractCopyFromLocalTest extends + AbstractFSContractTestBase { + + private static final Charset ASCII = StandardCharsets.US_ASCII; + private File file; + + @Override + public void teardown() throws Exception { + super.teardown(); + if (file != null) { + file.delete(); + } + } + + @Test + public void testCopyEmptyFile() throws Throwable { + file = File.createTempFile("test", ".txt"); + Path dest = copyFromLocal(file, true); + assertPathExists("uploaded file not found", dest); + } + + @Test + public void testCopyFile() throws Throwable { + String message = "hello"; + file = createTempFile(message); + Path dest = copyFromLocal(file, true); + + assertPathExists("uploaded file not found", dest); + assertTrue("source file deleted", Files.exists(file.toPath())); + + FileSystem fs = getFileSystem(); + FileStatus status = fs.getFileStatus(dest); + assertEquals("File length not equal " + status, 
+ message.getBytes(ASCII).length, status.getLen()); + assertFileTextEquals(dest, message); + } + + @Test + public void testCopyFileNoOverwrite() throws Throwable { + file = createTempFile("hello"); + copyFromLocal(file, true); + intercept(PathExistsException.class, + () -> copyFromLocal(file, false)); + } + + @Test + public void testCopyFileOverwrite() throws Throwable { + file = createTempFile("hello"); + Path dest = copyFromLocal(file, true); + String updated = "updated"; + FileUtils.write(file, updated, ASCII); + copyFromLocal(file, true); + assertFileTextEquals(dest, updated); + } + + @Test + public void testCopyMissingFile() throws Throwable { + describe("Copying a file that's not there must fail."); + file = createTempFile("test"); + file.delete(); + // first upload to create + intercept(FileNotFoundException.class, "", + () -> copyFromLocal(file, true)); + } + + @Test + public void testSourceIsFileAndDelSrcTrue() throws Throwable { + describe("Source is a file delSrc flag is set to true"); + + file = createTempFile("test"); + copyFromLocal(file, false, true); + + assertFalse("Source file not deleted", Files.exists(file.toPath())); + } + + @Test + public void testSourceIsFileAndDestinationIsDirectory() throws Throwable { + describe("Source is a file and destination is a directory. 
" + + "File must be copied inside the directory."); + + file = createTempFile("test"); + Path source = new Path(file.toURI()); + FileSystem fs = getFileSystem(); + File dir = createTempDirectory("test"); + Path destination = fileToPath(dir); + + // Make sure there's nothing already existing at destination + fs.delete(destination, false); + mkdirs(destination); + fs.copyFromLocalFile(source, destination); + + Path expectedFile = path(dir.getName() + "/" + source.getName()); + assertPathExists("File not copied into directory", expectedFile); + } + + @Test + public void testSourceIsFileAndDestinationIsNonExistentDirectory() + throws Throwable { + describe("Source is a file and destination directory does not exist. " + + "Copy operation must still work."); + + file = createTempFile("test"); + Path source = new Path(file.toURI()); + FileSystem fs = getFileSystem(); + + File dir = createTempDirectory("test"); + Path destination = fileToPath(dir); + fs.delete(destination, false); + assertPathDoesNotExist("Destination not deleted", destination); + + fs.copyFromLocalFile(source, destination); + assertPathExists("Destination doesn't exist.", destination); + } + + @Test + public void testSrcIsDirWithFilesAndCopySuccessful() throws Throwable { + describe("Source is a directory with files, copy must copy all" + + " dir contents to destination"); + String firstChild = "childOne"; + String secondChild = "childTwo"; + File parent = createTempDirectory("parent"); + File root = parent.getParentFile(); + File childFile = createTempFile(parent, firstChild, firstChild); + File secondChildFile = createTempFile(parent, secondChild, secondChild); + + copyFromLocal(parent, false); + + assertPathExists("Parent directory not copied", fileToPath(parent)); + assertFileTextEquals(fileToPath(childFile, root), firstChild); + assertFileTextEquals(fileToPath(secondChildFile, root), secondChild); + } + + @Test + public void testSrcIsEmptyDirWithCopySuccessful() throws Throwable { + describe("Source 
is an empty directory, copy must succeed"); + File source = createTempDirectory("source"); + Path dest = copyFromLocal(source, false); + + assertPathExists("Empty directory not copied", dest); + } + + @Test + public void testSrcIsDirWithOverwriteOptions() throws Throwable { + describe("Source is a directory, destination exists and " + + "must be overwritten."); + + FileSystem fs = getFileSystem(); + File source = createTempDirectory("source"); + Path sourcePath = new Path(source.toURI()); + String contents = "test file"; + File child = createTempFile(source, "child", contents); + + Path dest = path(source.getName()).getParent(); + fs.copyFromLocalFile(sourcePath, dest); + intercept(PathExistsException.class, + () -> fs.copyFromLocalFile(false, false, + sourcePath, dest)); + + String updated = "updated contents"; + FileUtils.write(child, updated, ASCII); + fs.copyFromLocalFile(sourcePath, dest); + + assertPathExists("Parent directory not copied", fileToPath(source)); + assertFileTextEquals(fileToPath(child, source.getParentFile()), + updated); + } + + @Test + public void testSrcIsDirWithDelSrcOptions() throws Throwable { + describe("Source is a directory containing a file and delSrc flag is set" + + ", this must delete the source after the copy."); + File source = createTempDirectory("source"); + String contents = "child file"; + File child = createTempFile(source, "child", contents); + + copyFromLocal(source, false, true); + Path dest = fileToPath(child, source.getParentFile()); + + assertFalse("Directory not deleted", Files.exists(source.toPath())); + assertFileTextEquals(dest, contents); + } + + /* + * The following path is being created on disk and copied over + * /parent/ (directory) + * /parent/test1.txt + * /parent/child/test.txt + * /parent/secondChild/ (directory) + */ + @Test + public void testCopyTreeDirectoryWithoutDelete() throws Throwable { + File srcDir = createTempDirectory("parent"); + File childDir = createTempDirectory(srcDir, "child"); + File 
secondChild = createTempDirectory(srcDir, "secondChild"); + File parentFile = createTempFile(srcDir, "test1", ".txt"); + File childFile = createTempFile(childDir, "test2", ".txt"); + + copyFromLocal(srcDir, false, false); + File root = srcDir.getParentFile(); + + assertPathExists("Parent directory not found", + fileToPath(srcDir)); + assertPathExists("Child directory not found", + fileToPath(childDir, root)); + assertPathExists("Second child directory not found", + fileToPath(secondChild, root)); + assertPathExists("Parent file not found", + fileToPath(parentFile, root)); + assertPathExists("Child file not found", + fileToPath(childFile, root)); + } + + @Test + public void testCopyDirectoryWithDelete() throws Throwable { + java.nio.file.Path srcDir = Files.createTempDirectory("parent"); + Files.createTempFile(srcDir, "test1", ".txt"); + + Path src = new Path(srcDir.toUri()); + Path dst = path(srcDir.getFileName().toString()); + getFileSystem().copyFromLocalFile(true, true, src, dst); + + assertFalse("Source directory was not deleted", + Files.exists(srcDir)); + } + + @Test + public void testSourceIsDirectoryAndDestinationIsFile() throws Throwable { + describe("Source is a directory and destination is a file must fail"); + + File file = createTempFile("local"); + File source = createTempDirectory("srcDir"); + Path destination = copyFromLocal(file, false); + Path sourcePath = new Path(source.toURI()); + + intercept(FileAlreadyExistsException.class, + () -> getFileSystem().copyFromLocalFile(false, true, + sourcePath, destination)); + } + + protected Path fileToPath(File file) throws IOException { + return path(file.getName()); + } + + protected Path fileToPath(File file, File parent) throws IOException { + return path(parent + .toPath() + .relativize(file.toPath()) + .toString()); + } + + protected File createTempDirectory(String name) throws IOException { + return Files.createTempDirectory(name).toFile(); + } + + protected Path copyFromLocal(File srcFile, boolean 
overwrite) throws + IOException { + return copyFromLocal(srcFile, overwrite, false); + } + + protected Path copyFromLocal(File srcFile, boolean overwrite, boolean delSrc) + throws IOException { + Path src = new Path(srcFile.toURI()); + Path dst = path(srcFile.getName()); + getFileSystem().copyFromLocalFile(delSrc, overwrite, src, dst); + return dst; + } + + /** + * Create a temp file with some text. + * @param text text for the file + * @return the file + * @throws IOException on a failure + */ + protected File createTempFile(String text) throws IOException { + File f = File.createTempFile("test", ".txt"); + FileUtils.write(f, text, ASCII); + return f; + } + + protected File createTempFile(File parent, String name, String text) + throws IOException { + File f = File.createTempFile(name, ".txt", parent); + FileUtils.write(f, text, ASCII); + return f; + } + + protected File createTempDirectory(File parent, String name) + throws IOException { + return Files.createTempDirectory(parent.toPath(), name).toFile(); + } + + private void assertFileTextEquals(Path path, String expected) + throws IOException { + assertEquals("Wrong data in " + path, + expected, IOUtils.toString(getFileSystem().open(path), ASCII)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 255254983a..3c066f151e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -81,6 +81,7 @@ import com.amazonaws.event.ProgressListener; import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.fs.store.audit.ActiveThreadSpanSource; import 
org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -3838,73 +3839,82 @@ private boolean s3Exists(final Path path, final Set probes) @Override @AuditEntryPoint public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, - Path dst) throws IOException { + Path dst) throws IOException { checkNotClosed(); LOG.debug("Copying local file from {} to {}", src, dst); - trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> { - // innerCopyFromLocalFile(delSrc, overwrite, src, dst); - super.copyFromLocalFile(delSrc, overwrite, src, dst); - return null; - }); + trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, + () -> new CopyFromLocalOperation( + createStoreContext(), + src, + dst, + delSrc, + overwrite, + createCopyFromLocalCallbacks()).execute()); } - /** - * The src file is on the local disk. Add it to FS at - * the given dst name. - * - * This version doesn't need to create a temporary file to calculate the md5. - * Sadly this doesn't seem to be used by the shell cp :( - * - * HADOOP-15932: this method has been unwired from - * {@link #copyFromLocalFile(boolean, boolean, Path, Path)} until - * it is extended to list and copy whole directories. - * delSrc indicates if the source should be removed - * @param delSrc whether to delete the src - * @param overwrite whether to overwrite an existing file - * @param src Source path: must be on local filesystem - * @param dst path - * @throws IOException IO problem - * @throws FileAlreadyExistsException the destination file exists and - * overwrite==false, or if the destination is a directory. 
- * @throws FileNotFoundException if the source file does not exit - * @throws AmazonClientException failure in the AWS SDK - * @throws IllegalArgumentException if the source path is not on the local FS - */ - @Retries.RetryTranslated - private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite, - Path src, Path dst) - throws IOException, FileAlreadyExistsException, AmazonClientException { - LOG.debug("Copying local file from {} to {}", src, dst); - - // Since we have a local file, we don't need to stream into a temporary file + protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks + createCopyFromLocalCallbacks() throws IOException { LocalFileSystem local = getLocal(getConf()); - File srcfile = local.pathToFile(src); - if (!srcfile.exists()) { - throw new FileNotFoundException("No file: " + src); - } - if (!srcfile.isFile()) { - throw new FileNotFoundException("Not a file: " + src); + return new CopyFromLocalCallbacksImpl(local); + } + + protected class CopyFromLocalCallbacksImpl implements + CopyFromLocalOperation.CopyFromLocalOperationCallbacks { + private final LocalFileSystem local; + + private CopyFromLocalCallbacksImpl(LocalFileSystem local) { + this.local = local; } - try { - FileStatus status = innerGetFileStatus(dst, false, StatusProbeEnum.ALL); - if (!status.isFile()) { - throw new FileAlreadyExistsException(dst + " exists and is not a file"); - } - if (!overwrite) { - throw new FileAlreadyExistsException(dst + " already exists"); - } - } catch (FileNotFoundException e) { - // no destination, all is well + @Override + public RemoteIterator listLocalStatusIterator( + final Path path) throws IOException { + return local.listLocatedStatus(path); } - final String key = pathToKey(dst); - final ObjectMetadata om = newObjectMetadata(srcfile.length()); - Progressable progress = null; - PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile); - invoker.retry("copyFromLocalFile(" + src + ")", dst.toString(), true, - () -> 
executePut(putObjectRequest, progress)); - if (delSrc) { - local.delete(src, false); + + @Override + public File pathToLocalFile(Path path) { + return local.pathToFile(path); + } + + @Override + public boolean deleteLocal(Path path, boolean recursive) throws IOException { + return local.delete(path, recursive); + } + + @Override + public void copyLocalFileFromTo(File file, Path from, Path to) throws IOException { + trackDurationAndSpan( + OBJECT_PUT_REQUESTS, + to, + () -> { + final String key = pathToKey(to); + final ObjectMetadata om = newObjectMetadata(file.length()); + Progressable progress = null; + PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, file); + S3AFileSystem.this.invoker.retry( + "putObject(" + "" + ")", to.toString(), + true, + () -> executePut(putObjectRequest, progress)); + + return null; + }); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return S3AFileSystem.this.getFileStatus(f); + } + + @Override + public boolean createEmptyDir(Path path, StoreContext storeContext) + throws IOException { + return trackDuration(getDurationTrackerFactory(), + INVOCATION_MKDIRS.getSymbol(), + new MkdirOperation( + storeContext, + path, + createMkdirOperationCallbacks())); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java new file mode 100644 index 0000000000..0a665cd33f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Set; +import java.util.Stack; +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.collections.comparators.ReverseComparator; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; + +import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; +import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; +import static 
org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan; + +/** + * Implementation of CopyFromLocalOperation. + *

+ * <p> + * This operation copies a file or directory (recursively) from a local + * FS to an object store. Initially, this operation has been developed for + * S3 (s3a) interaction, however, there's minimal work needed for it to + * work with other stores. + * </p> + * <p>How the uploading of files works:</p> + * <ul> + * <li>all source files and directories are scanned through;</li> + * <li>the LARGEST_N_FILES start uploading;</li> + * <li>the remaining files are shuffled and uploaded;</li> + * <li> + * any remaining empty directory is uploaded too to preserve local + * tree structure. + * </li> + * </ul>
+ */ +public class CopyFromLocalOperation extends ExecutingStoreOperation { + + /** + * Largest N files to be uploaded first. + */ + private static final int LARGEST_N_FILES = 5; + + private static final Logger LOG = LoggerFactory.getLogger( + CopyFromLocalOperation.class); + + /** + * Callbacks to be used by this operation for external / IO actions. + */ + private final CopyFromLocalOperationCallbacks callbacks; + + /** + * Delete source after operation finishes. + */ + private final boolean deleteSource; + + /** + * Overwrite destination files / folders. + */ + private final boolean overwrite; + + /** + * Source path to file / directory. + */ + private final Path source; + + /** + * Async operations executor. + */ + private final ListeningExecutorService executor; + + /** + * Destination path. + */ + private Path destination; + + /** + * Destination file status. + */ + private FileStatus destStatus; + + public CopyFromLocalOperation( + final StoreContext storeContext, + Path source, + Path destination, + boolean deleteSource, + boolean overwrite, + CopyFromLocalOperationCallbacks callbacks) { + super(storeContext); + this.callbacks = callbacks; + this.deleteSource = deleteSource; + this.overwrite = overwrite; + this.source = source; + this.destination = destination; + + // Capacity of 1 is a safe default for now since transfer manager can also + // spawn threads when uploading bigger files. + this.executor = MoreExecutors.listeningDecorator( + storeContext.createThrottledExecutor(1) + ); + } + + /** + * Executes the {@link CopyFromLocalOperation}. + * + * @throws IOException - if there are any failures with upload or deletion + * of files. Check {@link CopyFromLocalOperationCallbacks} for specifics. 
+ * @throws PathExistsException - if the path exists and no overwrite flag + * is set OR if the source is file and destination is a directory + */ + @Override + @Retries.RetryTranslated + public Void execute() + throws IOException, PathExistsException { + LOG.debug("Copying local file from {} to {}", source, destination); + File sourceFile = callbacks.pathToLocalFile(source); + updateDestStatus(destination); + + // Handles bar/ -> foo/ => foo/bar and bar/ -> foo/bar/ => foo/bar/bar + if (getDestStatus().isPresent() && getDestStatus().get().isDirectory() + && sourceFile.isDirectory()) { + destination = new Path(destination, sourceFile.getName()); + LOG.debug("Destination updated to: {}", destination); + updateDestStatus(destination); + } + + checkSource(sourceFile); + checkDestination(destination, sourceFile, overwrite); + uploadSourceFromFS(); + + if (deleteSource) { + callbacks.deleteLocal(source, true); + } + + return null; + } + + /** + * Does a {@link CopyFromLocalOperationCallbacks#getFileStatus(Path)} + * operation on the provided destination and updates the internal status of + * destStatus field. + * + * @param dest - destination Path + * @throws IOException if getFileStatus fails + */ + private void updateDestStatus(Path dest) throws IOException { + try { + destStatus = callbacks.getFileStatus(dest); + } catch (FileNotFoundException e) { + destStatus = null; + } + } + + /** + * Starts async upload operations for files. Creating an empty directory + * classifies as a "file upload". + * + * Check {@link CopyFromLocalOperation} for details on the order of + * operations. 
+ * + * @throws IOException - if listing or upload fail + */ + private void uploadSourceFromFS() throws IOException { + RemoteIterator localFiles = listFilesAndDirs(source); + List> activeOps = new ArrayList<>(); + + // After all files are traversed, this set will contain only emptyDirs + Set emptyDirs = new HashSet<>(); + List entries = new ArrayList<>(); + while (localFiles.hasNext()) { + LocatedFileStatus sourceFile = localFiles.next(); + Path sourceFilePath = sourceFile.getPath(); + + // Directory containing this file / directory isn't empty + emptyDirs.remove(sourceFilePath.getParent()); + + if (sourceFile.isDirectory()) { + emptyDirs.add(sourceFilePath); + continue; + } + + Path destPath = getFinalPath(sourceFilePath); + // UploadEntries: have a destination path, a file size + entries.add(new UploadEntry( + sourceFilePath, + destPath, + sourceFile.getLen())); + } + + if (localFiles instanceof Closeable) { + IOUtils.closeStream((Closeable) localFiles); + } + + // Sort all upload entries based on size + entries.sort(new ReverseComparator(new UploadEntry.SizeComparator())); + + // Take only top most N entries and upload + final int sortedUploadsCount = Math.min(LARGEST_N_FILES, entries.size()); + List markedForUpload = new ArrayList<>(); + + for (int uploadNo = 0; uploadNo < sortedUploadsCount; uploadNo++) { + UploadEntry uploadEntry = entries.get(uploadNo); + File file = callbacks.pathToLocalFile(uploadEntry.source); + activeOps.add(submitUpload(file, uploadEntry)); + markedForUpload.add(uploadEntry); + } + + // No files found, it's empty source directory + if (entries.isEmpty()) { + emptyDirs.add(source); + } + + // Shuffle all remaining entries and upload them + entries.removeAll(markedForUpload); + Collections.shuffle(entries); + for (UploadEntry uploadEntry : entries) { + File file = callbacks.pathToLocalFile(uploadEntry.source); + activeOps.add(submitUpload(file, uploadEntry)); + } + + for (Path emptyDir : emptyDirs) { + Path emptyDirPath = 
getFinalPath(emptyDir); + activeOps.add(submitCreateEmptyDir(emptyDirPath)); + } + + waitForCompletion(activeOps); + } + + /** + * Async call to create an empty directory. + * + * @param dir directory path + * @return the submitted future + */ + private CompletableFuture submitCreateEmptyDir(Path dir) { + return submit(executor, callableWithinAuditSpan( + getAuditSpan(), () -> { + callbacks.createEmptyDir(dir, getStoreContext()); + return null; + } + )); + } + + /** + * Async call to upload a file. + * + * @param file - File to be uploaded + * @param uploadEntry - Upload entry holding the source and destination + * @return the submitted future + */ + private CompletableFuture submitUpload( + File file, + UploadEntry uploadEntry) { + return submit(executor, callableWithinAuditSpan( + getAuditSpan(), () -> { + callbacks.copyLocalFileFromTo( + file, + uploadEntry.source, + uploadEntry.destination); + return null; + } + )); + } + + /** + * Checks the source before upload starts. + * + * @param src - Source file + * @throws FileNotFoundException - if the file isn't found + */ + private void checkSource(File src) + throws FileNotFoundException { + if (!src.exists()) { + throw new FileNotFoundException("No file: " + src.getPath()); + } + } + + /** + * Check the destination path and make sure it's compatible with the source, + * i.e. source and destination are both files / directories. 
+ * + * @param dest - Destination path + * @param src - Source file + * @param overwrite - Should source overwrite destination + * @throws PathExistsException - If the destination path exists and no + * overwrite flag is set + * @throws FileAlreadyExistsException - If source is file and destination is path + */ + private void checkDestination( + Path dest, + File src, + boolean overwrite) throws PathExistsException, + FileAlreadyExistsException { + if (!getDestStatus().isPresent()) { + return; + } + + if (src.isDirectory() && getDestStatus().get().isFile()) { + throw new FileAlreadyExistsException( + "Source '" + src.getPath() + "' is directory and " + + "destination '" + dest + "' is file"); + } + + if (!overwrite) { + throw new PathExistsException(dest + " already exists"); + } + } + + /** + * Get the final path of a source file with regards to its destination. + * + * @param src - source path + * @return - the final path for the source file to be uploaded to + * @throws PathIOException - if a relative path can't be created + */ + private Path getFinalPath(Path src) throws PathIOException { + URI currentSrcUri = src.toUri(); + URI relativeSrcUri = source.toUri().relativize(currentSrcUri); + if (relativeSrcUri.equals(currentSrcUri)) { + throw new PathIOException("Cannot get relative path for URI:" + + relativeSrcUri); + } + + Optional status = getDestStatus(); + if (!relativeSrcUri.getPath().isEmpty()) { + return new Path(destination, relativeSrcUri.getPath()); + } else if (status.isPresent() && status.get().isDirectory()) { + // file to dir + return new Path(destination, src.getName()); + } else { + // file to file + return destination; + } + } + + private Optional getDestStatus() { + return Optional.ofNullable(destStatus); + } + + /** + * {@link RemoteIterator} which lists all of the files and directories for + * a given path. 
It's strikingly similar to + * {@link org.apache.hadoop.fs.LocalFileSystem#listFiles(Path, boolean)} + * however with the small addition that it includes directories. + * + * @param path - Path to list files and directories from + * @return - an iterator + * @throws IOException - if listing of a path file fails + */ + private RemoteIterator listFilesAndDirs(Path path) + throws IOException { + return new RemoteIterator() { + private final Stack> iterators = + new Stack<>(); + private RemoteIterator current = + callbacks.listLocalStatusIterator(path); + private LocatedFileStatus curFile; + + @Override + public boolean hasNext() throws IOException { + while (curFile == null) { + if (current.hasNext()) { + handleFileStat(current.next()); + } else if (!iterators.empty()) { + current = iterators.pop(); + } else { + return false; + } + } + return true; + } + + /** + * Process the input stat. + * If it is a file or directory return the file stat. + * If it is a directory, traverse the directory; + * @param stat input status + * @throws IOException if any IO error occurs + */ + private void handleFileStat(LocatedFileStatus stat) + throws IOException { + if (stat.isFile()) { // file + curFile = stat; + } else { // directory + curFile = stat; + iterators.push(current); + current = callbacks.listLocalStatusIterator(stat.getPath()); + } + } + + @Override + public LocatedFileStatus next() throws IOException { + if (hasNext()) { + LocatedFileStatus result = curFile; + curFile = null; + return result; + } + throw new NoSuchElementException("No more entry in " + + path); + } + }; + } + + /** + *

Represents an entry for a file to be moved.

+ *

+ * Helpful with sorting files by their size and keeping track of path + * information for the upload. + *

+ */ + private static final class UploadEntry { + private final Path source; + private final Path destination; + private final long size; + + private UploadEntry(Path source, Path destination, long size) { + this.source = source; + this.destination = destination; + this.size = size; + } + + /** + * Compares {@link UploadEntry} objects and produces DESC ordering. + */ + static class SizeComparator implements Comparator, + Serializable { + @Override + public int compare(UploadEntry entry1, UploadEntry entry2) { + return Long.compare(entry1.size, entry2.size); + } + } + } + + /** + * Define the contract for {@link CopyFromLocalOperation} to interact + * with any external resources. + */ + public interface CopyFromLocalOperationCallbacks { + /** + * List all entries (files AND directories) for a path. + * + * @param path - path to list + * @return an iterator for all entries + * @throws IOException - for any failure + */ + RemoteIterator listLocalStatusIterator(Path path) + throws IOException; + + /** + * Get the file status for a path. + * + * @param path - target path + * @return FileStatus + * @throws IOException - for any failure + */ + FileStatus getFileStatus(Path path) throws IOException; + + /** + * Get the file from a path. + * + * @param path - target path + * @return file at path + */ + File pathToLocalFile(Path path); + + /** + * Delete file / directory at path. + * + * @param path - target path + * @param recursive - recursive deletion + * @return boolean result of operation + * @throws IOException for any failure + */ + boolean deleteLocal(Path path, boolean recursive) throws IOException; + + /** + * Copy / Upload a file from a source path to a destination path. + * + * @param file - target file + * @param source - source path + * @param destination - destination path + * @throws IOException for any failure + */ + void copyLocalFileFromTo( + File file, + Path source, + Path destination) throws IOException; + + /** + * Create empty directory at path. 
Most likely an upload operation. + * + * @param path - target path + * @param storeContext - store context + * @return boolean result of operation + * @throws IOException for any failure + */ + boolean createEmptyDir(Path path, StoreContext storeContext) + throws IOException; + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java index 668e129d57..dfac771dd7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java @@ -19,143 +19,41 @@ package org.apache.hadoop.fs.s3a; import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import org.junit.Ignore; -import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.s3a.S3AContract; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathExistsException; +import org.junit.Test; import static org.apache.hadoop.test.LambdaTestUtils.intercept; -/** - * Test {@link S3AFileSystem#copyFromLocalFile(boolean, boolean, Path, Path)}. - * Some of the tests have been disabled pending a fix for HADOOP-15932 and - * recursive directory copying; the test cases themselves may be obsolete. 
- */ -public class ITestS3ACopyFromLocalFile extends AbstractS3ATestBase { - private static final Charset ASCII = StandardCharsets.US_ASCII; - - private File file; +/** + * Runs the {@link AbstractContractCopyFromLocalTest} contract tests against + * the contract supplied by {@link S3AContract}, plus two checks that + * {@code copyFromLocalFile} throws {@link IllegalArgumentException} when + * handed the same path as both source and destination. + */ +public class ITestS3ACopyFromLocalFile extends + AbstractContractCopyFromLocalTest { @Override - public void teardown() throws Exception { - super.teardown(); - if (file != null) { - file.delete(); - } + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); } - @Test - public void testCopyEmptyFile() throws Throwable { - file = File.createTempFile("test", ".txt"); - Path dest = upload(file, true); - assertPathExists("uploaded file", dest); - } - - @Test - public void testCopyFile() throws Throwable { - String message = "hello"; - file = createTempFile(message); - Path dest = upload(file, true); - assertPathExists("uploaded file not found", dest); - S3AFileSystem fs = getFileSystem(); - FileStatus status = fs.getFileStatus(dest); - assertEquals("File length of " + status, - message.getBytes(ASCII).length, status.getLen()); - assertFileTextEquals(dest, message); - } - - public void assertFileTextEquals(Path path, String expected) - throws IOException { - assertEquals("Wrong data in " + path, - expected, IOUtils.toString(getFileSystem().open(path), ASCII)); - } - - @Test - public void testCopyFileNoOverwrite() throws Throwable { - file = createTempFile("hello"); - Path dest = upload(file, true); - // HADOOP-15932: the exception type changes here - intercept(PathExistsException.class, - () -> upload(file, false)); - } - - @Test - public void testCopyFileOverwrite() throws Throwable { - file = createTempFile("hello"); - Path dest = upload(file, true); - String updated = "updated"; - FileUtils.write(file, updated, ASCII); - upload(file, true); - assertFileTextEquals(dest, updated); - } - - @Test - @Ignore("HADOOP-15932") - public void testCopyFileNoOverwriteDirectory() throws Throwable { - file = createTempFile("hello"); - Path dest = upload(file, true); - 
S3AFileSystem fs = getFileSystem(); - fs.delete(dest, false); - fs.mkdirs(dest); - intercept(FileAlreadyExistsException.class, - () -> upload(file, true)); - } - - @Test - public void testCopyMissingFile() throws Throwable { - file = File.createTempFile("test", ".txt"); - file.delete(); - // first upload to create - intercept(FileNotFoundException.class, "", - () -> upload(file, true)); - } - - @Test - @Ignore("HADOOP-15932") - public void testCopyDirectoryFile() throws Throwable { - file = File.createTempFile("test", ".txt"); - // first upload to create - intercept(FileNotFoundException.class, "Not a file", - () -> upload(file.getParentFile(), true)); - } - - @Test public void testLocalFilesOnly() throws Throwable { - Path dst = path("testLocalFilesOnly"); + describe("Copying into other file systems must fail"); + Path dest = fileToPath(createTempDirectory("someDir")); + intercept(IllegalArgumentException.class, - () -> { - getFileSystem().copyFromLocalFile(false, true, dst, dst); - return "copy successful"; - }); + () -> getFileSystem().copyFromLocalFile(false, true, dest, dest)); } - public Path upload(File srcFile, boolean overwrite) throws IOException { - Path src = new Path(srcFile.toURI()); - Path dst = path(srcFile.getName()); - getFileSystem().copyFromLocalFile(false, overwrite, src, dst); - return dst; - } + /** + * Copies a temp file with the inherited {@code copyFromLocal} helper, then + * expects {@link IllegalArgumentException} when the returned path is used + * as both source and destination of a second copy. + */ + @Test + public void testOnlyFromLocal() throws Throwable { + describe("Copying must be from a local file system"); + File source = createTempFile("someFile"); + Path dest = copyFromLocal(source, true); - /** - * Create a temp file with some text. - * @param text text for the file - * @return the file - * @throws IOException on a failure - */ - public File createTempFile(String text) throws IOException { - File f = File.createTempFile("test", ".txt"); - FileUtils.write(f, text, ASCII); - return f; + intercept(IllegalArgumentException.class, + () -> getFileSystem().copyFromLocalFile(true, true, dest, dest)); } }