HADOOP-14432. S3A copyFromLocalFile to be robust, tested. Contributed by Steve Loughran

This commit is contained in:
Mingliang Liu 2017-05-19 11:51:43 -07:00
parent 19482e0d09
commit 6672810eea
2 changed files with 175 additions and 7 deletions

View File

@ -1736,28 +1736,43 @@ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
* delSrc indicates if the source should be removed * delSrc indicates if the source should be removed
* @param delSrc whether to delete the src * @param delSrc whether to delete the src
* @param overwrite whether to overwrite an existing file * @param overwrite whether to overwrite an existing file
* @param src path * @param src Source path: must be on local filesystem
* @param dst path * @param dst path
* @throws IOException IO problem * @throws IOException IO problem
* @throws FileAlreadyExistsException the destination file exists and * @throws FileAlreadyExistsException the destination file exists and
* overwrite==false * overwrite==false, or if the destination is a directory.
* @throws FileNotFoundException if the source file does not exit
* @throws AmazonClientException failure in the AWS SDK * @throws AmazonClientException failure in the AWS SDK
* @throws IllegalArgumentException if the source path is not on the local FS
*/ */
private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite, private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst) Path src, Path dst)
throws IOException, FileAlreadyExistsException, AmazonClientException { throws IOException, FileAlreadyExistsException, AmazonClientException {
incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE); incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE);
final String key = pathToKey(dst);
if (!overwrite && exists(dst)) {
throw new FileAlreadyExistsException(dst + " already exists");
}
LOG.debug("Copying local file from {} to {}", src, dst); LOG.debug("Copying local file from {} to {}", src, dst);
// Since we have a local file, we don't need to stream into a temporary file // Since we have a local file, we don't need to stream into a temporary file
LocalFileSystem local = getLocal(getConf()); LocalFileSystem local = getLocal(getConf());
File srcfile = local.pathToFile(src); File srcfile = local.pathToFile(src);
if (!srcfile.exists()) {
throw new FileNotFoundException("No file: " + src);
}
if (!srcfile.isFile()) {
throw new FileNotFoundException("Not a file: " + src);
}
try {
FileStatus status = getFileStatus(dst);
if (!status.isFile()) {
throw new FileAlreadyExistsException(dst + " exists and is not a file");
}
if (!overwrite) {
throw new FileAlreadyExistsException(dst + " already exists");
}
} catch (FileNotFoundException e) {
// no destination, all is well
}
final String key = pathToKey(dst);
final ObjectMetadata om = newObjectMetadata(srcfile.length()); final ObjectMetadata om = newObjectMetadata(srcfile.length());
PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile); PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
Upload up = putObject(putObjectRequest); Upload up = putObject(putObjectRequest);

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import org.junit.Test;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test {@link S3AFileSystem#copyFromLocalFile(boolean, boolean, Path, Path)}.
*/
public class ITestS3ACopyFromLocalFile extends AbstractS3ATestBase {
private static final Charset ASCII = Charsets.US_ASCII;
private File file;
@Override
public void teardown() throws Exception {
super.teardown();
if (file != null) {
file.delete();
}
}
@Test
public void testCopyEmptyFile() throws Throwable {
file = File.createTempFile("test", ".txt");
Path dest = upload(file, true);
assertPathExists("uploaded file", dest);
}
@Test
public void testCopyFile() throws Throwable {
String message = "hello";
file = createTempFile(message);
Path dest = upload(file, true);
assertPathExists("uploaded file not found", dest);
S3AFileSystem fs = getFileSystem();
S3AFileStatus status = fs.getFileStatus(dest);
assertEquals("File length of " + status,
message.getBytes(ASCII).length, status.getLen());
assertFileTextEquals(dest, message);
}
public void assertFileTextEquals(Path path, String expected)
throws IOException {
assertEquals("Wrong data in " + path,
expected, IOUtils.toString(getFileSystem().open(path), ASCII));
}
@Test
public void testCopyFileNoOverwrite() throws Throwable {
file = createTempFile("hello");
Path dest = upload(file, true);
intercept(FileAlreadyExistsException.class,
() -> upload(file, false));
}
@Test
public void testCopyFileOverwrite() throws Throwable {
file = createTempFile("hello");
Path dest = upload(file, true);
String updated = "updated";
FileUtils.write(file, updated, ASCII);
upload(file, true);
assertFileTextEquals(dest, updated);
}
@Test
public void testCopyFileNoOverwriteDirectory() throws Throwable {
file = createTempFile("hello");
Path dest = upload(file, true);
S3AFileSystem fs = getFileSystem();
fs.delete(dest, false);
fs.mkdirs(dest);
intercept(FileAlreadyExistsException.class,
() -> upload(file, true));
}
@Test
public void testCopyMissingFile() throws Throwable {
file = File.createTempFile("test", ".txt");
file.delete();
// first upload to create
intercept(FileNotFoundException.class, "No file",
() -> upload(file, true));
}
@Test
public void testCopyDirectoryFile() throws Throwable {
file = File.createTempFile("test", ".txt");
// first upload to create
intercept(FileNotFoundException.class, "Not a file",
() -> upload(file.getParentFile(), true));
}
@Test
public void testLocalFilesOnly() throws Throwable {
Path dst = path("testLocalFilesOnly");
intercept(IllegalArgumentException.class,
() -> {
getFileSystem().copyFromLocalFile(false, true, dst, dst);
return "copy successful";
});
}
public Path upload(File srcFile, boolean overwrite) throws IOException {
Path src = new Path(srcFile.toURI());
Path dst = path(srcFile.getName());
getFileSystem().copyFromLocalFile(false, overwrite, src, dst);
return dst;
}
/**
* Create a temp file with some text.
* @param text text for the file
* @return the file
* @throws IOException on a failure
*/
public File createTempFile(String text) throws IOException {
File f = File.createTempFile("test", ".txt");
FileUtils.write(f, text, ASCII);
return f;
}
}