HADOOP-10714. AmazonS3Client.deleteObjects() need to be limited to 1000 entries per call. Contributed by Juan Yu.
This commit is contained in:
parent 395275af86
commit 6ba52d88ec
.gitignore (vendored)
@@ -21,3 +21,4 @@ hadoop-common-project/hadoop-common/src/test/resources/contract-test-options.xml
hadoop-tools/hadoop-openstack/src/test/resources/contract-test-options.xml
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.toolbox
yarnregistry.pdf
hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml
@@ -406,6 +406,9 @@ Release 2.7.0 - UNRELEASED

    HADOOP-11267. TestSecurityUtil fails when run with JDK8 because of empty
    principal names. (Stephen Chu via wheat9)

    HADOOP-10714. AmazonS3Client.deleteObjects() need to be limited to 1000
    entries per call. (Juan Yu via atm)

Release 2.6.0 - UNRELEASED

  INCOMPATIBLE CHANGES
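The substance of the patch is that Amazon S3's multi-object delete API rejects requests carrying more than 1,000 keys, so the S3A client now accumulates keys and flushes a `DeleteObjectsRequest` whenever the batch reaches that limit. Below is a minimal, self-contained sketch of the batching idea against the AWS SDK for Java; the class and method names (`BatchedDeleter`, `deleteAll`, `flush`) are illustrative and not part of the patch itself.

    import java.util.ArrayList;
    import java.util.List;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.DeleteObjectsRequest;
    import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;

    /** Sketch: delete an arbitrary list of keys in batches of at most 1000. */
    public class BatchedDeleter {
      // S3 rejects multi-object delete requests with more than 1000 keys.
      private static final int MAX_ENTRIES_TO_DELETE = 1000;

      private final AmazonS3 s3;
      private final String bucket;

      public BatchedDeleter(AmazonS3 s3, String bucket) {
        this.s3 = s3;
        this.bucket = bucket;
      }

      public void deleteAll(List<String> keys) {
        List<KeyVersion> batch = new ArrayList<KeyVersion>();
        for (String key : keys) {
          batch.add(new KeyVersion(key));
          if (batch.size() == MAX_ENTRIES_TO_DELETE) {
            flush(batch);
          }
        }
        if (!batch.isEmpty()) {
          flush(batch);  // delete the final, partially filled batch
        }
      }

      private void flush(List<KeyVersion> batch) {
        DeleteObjectsRequest request =
            new DeleteObjectsRequest(bucket).withKeys(batch);
        // The call is synchronous; the batch can be reused once it returns.
        s3.deleteObjects(request);
        batch.clear();
      }
    }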
@@ -28,53 +28,6 @@ These filesystem bindings must be defined in an XML configuration file, usually
`hadoop-common-project/hadoop-common/src/test/resources/contract-test-options.xml`.
This file is excluded and should not be checked in.

### s3://

In `contract-test-options.xml`, the filesystem name must be defined in the property `fs.contract.test.fs.s3`. The standard configuration options to define the S3 authentication details must also be provided.

Example:

    <configuration>
      <property>
        <name>fs.contract.test.fs.s3</name>
        <value>s3://tests3hdfs/</value>
      </property>

      <property>
        <name>fs.s3.awsAccessKeyId</name>
        <value>DONOTPCOMMITTHISKEYTOSCM</value>
      </property>

      <property>
        <name>fs.s3.awsSecretAccessKey</name>
        <value>DONOTEVERSHARETHISSECRETKEY!</value>
      </property>
    </configuration>

### s3n://

In `contract-test-options.xml`, the filesystem name must be defined in the property `fs.contract.test.fs.s3n`. The standard configuration options to define the S3N authentication details must also be provided.

Example:

    <configuration>
      <property>
        <name>fs.contract.test.fs.s3n</name>
        <value>s3n://tests3contract</value>
      </property>

      <property>
        <name>fs.s3n.awsAccessKeyId</name>
        <value>DONOTPCOMMITTHISKEYTOSCM</value>
      </property>

      <property>
        <name>fs.s3n.awsSecretAccessKey</name>
        <value>DONOTEVERSHARETHISSECRETKEY!</value>
      </property>

### ftp://

@@ -484,10 +484,10 @@ protected void createFile(Path path) throws IOException {
    out.close();
  }

  private void rename(Path src, Path dst, boolean renameSucceeded,
  protected void rename(Path src, Path dst, boolean renameSucceeded,
      boolean srcExists, boolean dstExists) throws IOException {
    assertEquals("mv " + src + " " + dst,renameSucceeded, fs.rename(src, dst));
    assertEquals("Source exists: " + src, srcExists, fs.exists(src));
    assertEquals("Rename result", renameSucceeded, fs.rename(src, dst));
    assertEquals("Source exists", srcExists, fs.exists(src));
    assertEquals("Destination exists" + dst, dstExists, fs.exists(dst));
  }

@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.fs.contract;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -94,4 +95,30 @@ public void testDeleteNonEmptyDirRecursive() throws Throwable {
|
||||
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "not deleted", file);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteDeepEmptyDir() throws Throwable {
|
||||
mkdirs(path("testDeleteDeepEmptyDir/d1/d2/d3/d4"));
|
||||
assertDeleted(path("testDeleteDeepEmptyDir/d1/d2/d3"), true);
|
||||
|
||||
FileSystem fs = getFileSystem();
|
||||
ContractTestUtils.assertPathDoesNotExist(fs,
|
||||
"not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3/d4"));
|
||||
ContractTestUtils.assertPathDoesNotExist(fs,
|
||||
"not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3"));
|
||||
ContractTestUtils.assertPathExists(fs, "parent dir is deleted",
|
||||
path("testDeleteDeepEmptyDir/d1/d2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteSingleFile() throws Throwable {
|
||||
// Test delete of just a file
|
||||
Path path = path("testDeleteSingleFile/d1/d2");
|
||||
mkdirs(path);
|
||||
Path file = new Path(path, "childfile");
|
||||
ContractTestUtils.writeTextFile(getFileSystem(), file,
|
||||
"single file to be deleted.", true);
|
||||
ContractTestUtils.assertPathExists(getFileSystem(),
|
||||
"single file not created", file);
|
||||
assertDeleted(file, false);
|
||||
}
|
||||
}
|
||||
|
@ -112,4 +112,23 @@ public void testMkdirOverParentFile() throws Throwable {
|
||||
assertPathExists("mkdir failed", path);
|
||||
assertDeleted(path, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMkdirSlashHandling() throws Throwable {
|
||||
describe("verify mkdir slash handling");
|
||||
FileSystem fs = getFileSystem();
|
||||
|
||||
// No trailing slash
|
||||
assertTrue(fs.mkdirs(path("testmkdir/a")));
|
||||
assertPathExists("mkdir without trailing slash failed",
|
||||
path("testmkdir/a"));
|
||||
|
||||
// With trailing slash
|
||||
assertTrue(fs.mkdirs(path("testmkdir/b/")));
|
||||
assertPathExists("mkdir with trailing slash failed", path("testmkdir/b/"));
|
||||
|
||||
// Mismatched slashes
|
||||
assertPathExists("check path existence without trailing slash failed",
|
||||
path("testmkdir/b"));
|
||||
}
|
||||
}
|
||||
|
@ -182,4 +182,45 @@ public void testRenameFileNonexistentDir() throws Throwable {
|
||||
assertFalse(renameCreatesDestDirs);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRenameWithNonEmptySubDir() throws Throwable {
|
||||
final Path renameTestDir = path("testRenameWithNonEmptySubDir");
|
||||
final Path srcDir = new Path(renameTestDir, "src1");
|
||||
final Path srcSubDir = new Path(srcDir, "sub");
|
||||
final Path finalDir = new Path(renameTestDir, "dest");
|
||||
FileSystem fs = getFileSystem();
|
||||
boolean renameRemoveEmptyDest = isSupported(RENAME_REMOVE_DEST_IF_EMPTY_DIR);
|
||||
ContractTestUtils.rm(fs, renameTestDir, true, false);
|
||||
|
||||
fs.mkdirs(srcDir);
|
||||
fs.mkdirs(finalDir);
|
||||
ContractTestUtils.writeTextFile(fs, new Path(srcDir, "source.txt"),
|
||||
"this is the file in src dir", false);
|
||||
ContractTestUtils.writeTextFile(fs, new Path(srcSubDir, "subfile.txt"),
|
||||
"this is the file in src/sub dir", false);
|
||||
|
||||
ContractTestUtils.assertPathExists(fs, "not created in src dir",
|
||||
new Path(srcDir, "source.txt"));
|
||||
ContractTestUtils.assertPathExists(fs, "not created in src/sub dir",
|
||||
new Path(srcSubDir, "subfile.txt"));
|
||||
|
||||
fs.rename(srcDir, finalDir);
|
||||
// Accept both POSIX rename behavior and CLI rename behavior
|
||||
if (renameRemoveEmptyDest) {
|
||||
// POSIX rename behavior
|
||||
ContractTestUtils.assertPathExists(fs, "not renamed into dest dir",
|
||||
new Path(finalDir, "source.txt"));
|
||||
ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir",
|
||||
new Path(finalDir, "sub/subfile.txt"));
|
||||
} else {
|
||||
// CLI rename behavior
|
||||
ContractTestUtils.assertPathExists(fs, "not renamed into dest dir",
|
||||
new Path(finalDir, "src1/source.txt"));
|
||||
ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir",
|
||||
new Path(finalDir, "src1/sub/subfile.txt"));
|
||||
}
|
||||
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted",
|
||||
new Path(srcDir, "source.txt"));
|
||||
}
|
||||
}
|
||||
|
@@ -79,6 +79,13 @@ public interface ContractOptions {
  String RENAME_RETURNS_FALSE_IF_SOURCE_MISSING =
      "rename-returns-false-if-source-missing";

  /**
   * Flag to indicate that the FS removes the destination first if it is an
   * empty directory, meaning the FS honors POSIX rename behavior.
   * {@value}
   */
  String RENAME_REMOVE_DEST_IF_EMPTY_DIR = "rename-remove-dest-if-empty-dir";

  /**
   * Flag to indicate that append is supported
   * {@value}
@ -31,8 +31,11 @@
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Utilities used across test cases
|
||||
@ -44,6 +47,13 @@ public class ContractTestUtils extends Assert {
|
||||
|
||||
public static final String IO_FILE_BUFFER_SIZE = "io.file.buffer.size";
|
||||
|
||||
// For scale testing, we can repeatedly write small chunk data to generate
|
||||
// a large file.
|
||||
public static final String IO_CHUNK_BUFFER_SIZE = "io.chunk.buffer.size";
|
||||
public static final int DEFAULT_IO_CHUNK_BUFFER_SIZE = 128;
|
||||
public static final String IO_CHUNK_MODULUS_SIZE = "io.chunk.modulus.size";
|
||||
public static final int DEFAULT_IO_CHUNK_MODULUS_SIZE = 128;
|
||||
|
||||
/**
|
||||
* Assert that a property in the property set matches the expected value
|
||||
* @param props property set
|
||||
@ -755,5 +765,134 @@ public static void validateFileContent(byte[] concat, byte[][] bytes) {
|
||||
mismatch);
|
||||
}
|
||||
|
||||
/**
|
||||
* Receives test data from the given input file and checks the size of the
|
||||
* data as well as the pattern inside the received data.
|
||||
*
|
||||
* @param fs FileSystem
|
||||
* @param path Input file to be checked
|
||||
* @param expectedSize the expected size of the data to be read from the
|
||||
* input file in bytes
|
||||
* @param bufferLen Pattern length
|
||||
* @param modulus Pattern modulus
|
||||
* @throws IOException
|
||||
* thrown if an error occurs while reading the data
|
||||
*/
|
||||
public static void verifyReceivedData(FileSystem fs, Path path,
|
||||
final long expectedSize,
|
||||
final int bufferLen,
|
||||
final int modulus) throws IOException {
|
||||
final byte[] testBuffer = new byte[bufferLen];
|
||||
|
||||
long totalBytesRead = 0;
|
||||
int nextExpectedNumber = 0;
|
||||
final InputStream inputStream = fs.open(path);
|
||||
try {
|
||||
while (true) {
|
||||
final int bytesRead = inputStream.read(testBuffer);
|
||||
if (bytesRead < 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
totalBytesRead += bytesRead;
|
||||
|
||||
for (int i = 0; i < bytesRead; ++i) {
|
||||
if (testBuffer[i] != nextExpectedNumber) {
|
||||
throw new IOException("Read number " + testBuffer[i]
|
||||
+ " but expected " + nextExpectedNumber);
|
||||
}
|
||||
|
||||
++nextExpectedNumber;
|
||||
|
||||
if (nextExpectedNumber == modulus) {
|
||||
nextExpectedNumber = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (totalBytesRead != expectedSize) {
|
||||
throw new IOException("Expected to read " + expectedSize +
|
||||
" bytes but only received " + totalBytesRead);
|
||||
}
|
||||
} finally {
|
||||
inputStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates test data of the given size according to some specific pattern
|
||||
* and writes it to the provided output file.
|
||||
*
|
||||
* @param fs FileSystem
|
||||
* @param path Test file to be generated
|
||||
* @param size The size of the test data to be generated in bytes
|
||||
* @param bufferLen Pattern length
|
||||
* @param modulus Pattern modulus
|
||||
* @throws IOException
|
||||
* thrown if an error occurs while writing the data
|
||||
*/
|
||||
public static long generateTestFile(FileSystem fs, Path path,
|
||||
final long size,
|
||||
final int bufferLen,
|
||||
final int modulus) throws IOException {
|
||||
final byte[] testBuffer = new byte[bufferLen];
|
||||
for (int i = 0; i < testBuffer.length; ++i) {
|
||||
testBuffer[i] = (byte) (i % modulus);
|
||||
}
|
||||
|
||||
final OutputStream outputStream = fs.create(path, false);
|
||||
long bytesWritten = 0;
|
||||
try {
|
||||
while (bytesWritten < size) {
|
||||
final long diff = size - bytesWritten;
|
||||
if (diff < testBuffer.length) {
|
||||
outputStream.write(testBuffer, 0, (int) diff);
|
||||
bytesWritten += diff;
|
||||
} else {
|
||||
outputStream.write(testBuffer);
|
||||
bytesWritten += testBuffer.length;
|
||||
}
|
||||
}
|
||||
|
||||
return bytesWritten;
|
||||
} finally {
|
||||
outputStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates and reads a file with the given size. The test file is generated
|
||||
* according to a specific pattern so it can be easily verified even if it's
|
||||
* a multi-GB one.
|
||||
* During the read phase the incoming data stream is also checked against
|
||||
* this pattern.
|
||||
*
|
||||
* @param fs FileSystem
|
||||
* @param parent Test file parent dir path
|
||||
* @throws IOException
|
||||
* thrown if an I/O error occurs while writing or reading the test file
|
||||
*/
|
||||
public static void createAndVerifyFile(FileSystem fs, Path parent, final long fileSize)
|
||||
throws IOException {
|
||||
int testBufferSize = fs.getConf()
|
||||
.getInt(IO_CHUNK_BUFFER_SIZE, DEFAULT_IO_CHUNK_BUFFER_SIZE);
|
||||
int modulus = fs.getConf()
|
||||
.getInt(IO_CHUNK_MODULUS_SIZE, DEFAULT_IO_CHUNK_MODULUS_SIZE);
|
||||
|
||||
final String objectName = UUID.randomUUID().toString();
|
||||
final Path objectPath = new Path(parent, objectName);
|
||||
|
||||
// Write test file in a specific pattern
|
||||
assertEquals(fileSize,
|
||||
generateTestFile(fs, objectPath, fileSize, testBufferSize, modulus));
|
||||
assertPathExists(fs, "not created successful", objectPath);
|
||||
|
||||
// Now read the same file back and verify its content
|
||||
try {
|
||||
verifyReceivedData(fs, objectPath, fileSize, testBufferSize, modulus);
|
||||
} finally {
|
||||
// Delete test file
|
||||
fs.delete(objectPath, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -57,6 +57,10 @@ case sensitivity and permission options are determined at run time from OS type
    <value>true</value>
  </property>

  <property>
    <name>fs.contract.rename-remove-dest-if-empty-dir</name>
    <value>true</value>
  </property>

  <!--
  checksummed filesystems do not support append; see HADOOP-4292
@@ -83,6 +83,13 @@
          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
          <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
        </configuration>
      </plugin>
    </plugins>
  </build>

@@ -61,10 +61,10 @@ public void initialize(URI uri, Configuration conf) {
    String secretAccessKeyProperty =
        String.format("fs.%s.awsSecretAccessKey", scheme);
    if (accessKey == null) {
      accessKey = conf.get(accessKeyProperty);
      accessKey = conf.getTrimmed(accessKeyProperty);
    }
    if (secretAccessKey == null) {
      secretAccessKey = conf.get(secretAccessKeyProperty);
      secretAccessKey = conf.getTrimmed(secretAccessKeyProperty);
    }
    if (accessKey == null && secretAccessKey == null) {
      throw new IllegalArgumentException("AWS " +
@@ -22,10 +22,11 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.AWSCredentials;
import org.apache.commons.lang.StringUtils;

public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
  private String accessKey;
  private String secretKey;
  private final String accessKey;
  private final String secretKey;

  public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
    this.accessKey = accessKey;
@@ -33,10 +34,9 @@ public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
  }

  public AWSCredentials getCredentials() {
    if (accessKey != null && secretKey != null) {
    if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
      return new BasicAWSCredentials(accessKey, secretKey);
    }

    throw new AmazonClientException(
        "Access key or secret key is null");
  }
@@ -20,11 +20,6 @@

public class Constants {
  // s3 access key
  public static final String ACCESS_KEY = "fs.s3a.access.key";

  // s3 secret key
  public static final String SECRET_KEY = "fs.s3a.secret.key";

  // number of simultaneous connections to s3
  public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
@@ -75,4 +70,6 @@ public class Constants {
      "fs.s3a.server-side-encryption-algorithm";

  public static final String S3N_FOLDER_SUFFIX = "_$folder$";
  public static final String FS_S3A_BLOCK_SIZE = "fs.s3a.block.size";
  public static final String FS_S3A = "s3a";
}
@ -27,6 +27,8 @@
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.s3.S3Credentials;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
@@ -80,6 +82,8 @@ public class S3AFileSystem extends FileSystem {
  private CannedAccessControlList cannedACL;
  private String serverSideEncryptionAlgorithm;

  // The maximum number of entries that can be deleted in any call to s3
  private static final int MAX_ENTRIES_TO_DELETE = 1000;

  /** Called after a new FileSystem instance is constructed.
   * @param name a uri whose authority section names the host, port, etc.
@ -95,22 +99,12 @@ public void initialize(URI name, Configuration conf) throws IOException {
|
||||
this.getWorkingDirectory());
|
||||
|
||||
// Try to get our credentials or just connect anonymously
|
||||
String accessKey = conf.get(ACCESS_KEY, null);
|
||||
String secretKey = conf.get(SECRET_KEY, null);
|
||||
|
||||
String userInfo = name.getUserInfo();
|
||||
if (userInfo != null) {
|
||||
int index = userInfo.indexOf(':');
|
||||
if (index != -1) {
|
||||
accessKey = userInfo.substring(0, index);
|
||||
secretKey = userInfo.substring(index + 1);
|
||||
} else {
|
||||
accessKey = userInfo;
|
||||
}
|
||||
}
|
||||
S3Credentials s3Credentials = new S3Credentials();
|
||||
s3Credentials.initialize(name, conf);
|
||||
|
||||
AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
|
||||
new BasicAWSCredentialsProvider(accessKey, secretKey),
|
||||
new BasicAWSCredentialsProvider(s3Credentials.getAccessKey(),
|
||||
s3Credentials.getSecretAccessKey()),
|
||||
new InstanceProfileCredentialsProvider(),
|
||||
new AnonymousAWSCredentialsProvider()
|
||||
);
|
||||
@ -295,15 +289,12 @@ public boolean rename(Path src, Path dst) throws IOException {
|
||||
String dstKey = pathToKey(dst);
|
||||
|
||||
if (srcKey.length() == 0 || dstKey.length() == 0) {
|
||||
LOG.info("rename: src or dst are empty");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("rename: src or dst are empty");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
if (srcKey.equals(dstKey)) {
|
||||
LOG.info("rename: src and dst refer to the same file");
|
||||
return true;
|
||||
}
|
||||
|
||||
S3AFileStatus srcStatus;
|
||||
try {
|
||||
srcStatus = getFileStatus(src);
|
||||
@ -312,20 +303,27 @@ public boolean rename(Path src, Path dst) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (srcKey.equals(dstKey)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("rename: src and dst refer to the same file or directory");
|
||||
}
|
||||
return srcStatus.isFile();
|
||||
}
|
||||
|
||||
S3AFileStatus dstStatus = null;
|
||||
try {
|
||||
dstStatus = getFileStatus(dst);
|
||||
|
||||
if (srcStatus.isFile() && dstStatus.isDirectory()) {
|
||||
LOG.info("rename: src is a file and dst is a directory");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (srcStatus.isDirectory() && dstStatus.isFile()) {
|
||||
LOG.info("rename: src is a directory and dst is a file");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("rename: src is a directory and dst is a file");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
if (dstStatus.isDirectory() && !dstStatus.isEmptyDirectory()) {
|
||||
return false;
|
||||
}
|
||||
} catch (FileNotFoundException e) {
|
||||
// Parent must exist
|
||||
Path parent = dst.getParent();
|
||||
@ -346,7 +344,18 @@ public boolean rename(Path src, Path dst) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("rename: renaming file " + src + " to " + dst);
|
||||
}
|
||||
copyFile(srcKey, dstKey);
|
||||
if (dstStatus != null && dstStatus.isDirectory()) {
|
||||
String newDstKey = dstKey;
|
||||
if (!newDstKey.endsWith("/")) {
|
||||
newDstKey = newDstKey + "/";
|
||||
}
|
||||
String filename =
|
||||
srcKey.substring(pathToKey(src.getParent()).length()+1);
|
||||
newDstKey = newDstKey + filename;
|
||||
copyFile(srcKey, newDstKey);
|
||||
} else {
|
||||
copyFile(srcKey, dstKey);
|
||||
}
|
||||
delete(src, false);
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -362,12 +371,19 @@ public boolean rename(Path src, Path dst) throws IOException {
|
||||
srcKey = srcKey + "/";
|
||||
}
|
||||
|
||||
//Verify dest is not a child of the source directory
|
||||
if (dstKey.startsWith(srcKey)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("cannot rename a directory to a subdirectory of self");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
List<DeleteObjectsRequest.KeyVersion> keysToDelete =
|
||||
new ArrayList<DeleteObjectsRequest.KeyVersion>();
|
||||
if (dstStatus != null && dstStatus.isEmptyDirectory()) {
|
||||
copyFile(srcKey, dstKey);
|
||||
statistics.incrementWriteOps(1);
|
||||
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(srcKey));
|
||||
// delete unnecessary fake directory.
|
||||
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
|
||||
}
|
||||
|
||||
ListObjectsRequest request = new ListObjectsRequest();
|
||||
@ -383,23 +399,29 @@ public boolean rename(Path src, Path dst) throws IOException {
|
||||
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
|
||||
String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
|
||||
copyFile(summary.getKey(), newDstKey);
|
||||
|
||||
if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
|
||||
DeleteObjectsRequest deleteRequest =
|
||||
new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
|
||||
s3.deleteObjects(deleteRequest);
|
||||
statistics.incrementWriteOps(1);
|
||||
keysToDelete.clear();
|
||||
}
|
||||
}
|
||||
|
||||
if (objects.isTruncated()) {
|
||||
objects = s3.listNextBatchOfObjects(objects);
|
||||
statistics.incrementReadOps(1);
|
||||
} else {
|
||||
if (keysToDelete.size() > 0) {
|
||||
DeleteObjectsRequest deleteRequest =
|
||||
new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
|
||||
s3.deleteObjects(deleteRequest);
|
||||
statistics.incrementWriteOps(1);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!keysToDelete.isEmpty()) {
|
||||
DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket);
|
||||
deleteRequest.setKeys(keysToDelete);
|
||||
s3.deleteObjects(deleteRequest);
|
||||
statistics.incrementWriteOps(1);
|
||||
}
|
||||
}
|
||||
|
||||
if (src.getParent() != dst.getParent()) {
|
||||
@ -419,7 +441,9 @@ public boolean rename(Path src, Path dst) throws IOException {
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
LOG.info("Delete path " + f + " - recursive " + recursive);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Delete path " + f + " - recursive " + recursive);
|
||||
}
|
||||
S3AFileStatus status;
|
||||
try {
|
||||
status = getFileStatus(f);
|
||||
@@ -479,18 +503,26 @@ public boolean delete(Path f, boolean recursive) throws IOException {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Got object to delete " + summary.getKey());
          }
        }

        DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket);
        deleteRequest.setKeys(keys);
        s3.deleteObjects(deleteRequest);
        statistics.incrementWriteOps(1);
        keys.clear();
        if (keys.size() == MAX_ENTRIES_TO_DELETE) {
          DeleteObjectsRequest deleteRequest =
              new DeleteObjectsRequest(bucket).withKeys(keys);
          s3.deleteObjects(deleteRequest);
          statistics.incrementWriteOps(1);
          keys.clear();
        }
      }

      if (objects.isTruncated()) {
        objects = s3.listNextBatchOfObjects(objects);
        statistics.incrementReadOps(1);
      } else {
        if (keys.size() > 0) {
          DeleteObjectsRequest deleteRequest =
              new DeleteObjectsRequest(bucket).withKeys(keys);
          s3.deleteObjects(deleteRequest);
          statistics.incrementWriteOps(1);
        }
        break;
      }
    }
@ -530,7 +562,9 @@ private void createFakeDirectoryIfNecessary(Path f) throws IOException {
|
||||
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
|
||||
IOException {
|
||||
String key = pathToKey(f);
|
||||
LOG.info("List status for path: " + f);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("List status for path: " + f);
|
||||
}
|
||||
|
||||
final List<FileStatus> result = new ArrayList<FileStatus>();
|
||||
final FileStatus fileStatus = getFileStatus(f);
|
||||
@ -640,7 +674,10 @@ public Path getWorkingDirectory() {
|
||||
// TODO: If we have created an empty file at /foo/bar and we then call
|
||||
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
|
||||
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||
LOG.info("Making directory: " + f);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Making directory: " + f);
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
FileStatus fileStatus = getFileStatus(f);
|
||||
@ -680,8 +717,10 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||
*/
|
||||
public S3AFileStatus getFileStatus(Path f) throws IOException {
|
||||
String key = pathToKey(f);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Getting path status for " + f + " (" + key + ")");
|
||||
}
|
||||
|
||||
LOG.info("Getting path status for " + f + " (" + key + ")");
|
||||
|
||||
if (!key.isEmpty()) {
|
||||
try {
|
||||
@ -723,7 +762,7 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
|
||||
}
|
||||
return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
|
||||
} else {
|
||||
LOG.warn("Found file (with /): real file? should not happen: " + key);
|
||||
LOG.warn("Found file (with /): real file? should not happen: {}", key);
|
||||
|
||||
return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
|
||||
f.makeQualified(uri, workingDir));
|
||||
@ -753,7 +792,8 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
|
||||
ObjectListing objects = s3.listObjects(request);
|
||||
statistics.incrementReadOps(1);
|
||||
|
||||
if (objects.getCommonPrefixes().size() > 0 || objects.getObjectSummaries().size() > 0) {
|
||||
if (!objects.getCommonPrefixes().isEmpty()
|
||||
|| objects.getObjectSummaries().size() > 0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found path as directory (with /): " +
|
||||
objects.getCommonPrefixes().size() + "/" +
|
||||
@ -806,8 +846,9 @@ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
|
||||
if (!overwrite && exists(dst)) {
|
||||
throw new IOException(dst + " already exists");
|
||||
}
|
||||
|
||||
LOG.info("Copying local file from " + src + " to " + dst);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Copying local file from " + src + " to " + dst);
|
||||
}
|
||||
|
||||
// Since we have a local file, we don't need to stream into a temporary file
|
||||
LocalFileSystem local = getLocal(getConf());
|
||||
@ -992,7 +1033,7 @@ public int read() throws IOException {
|
||||
@Deprecated
|
||||
public long getDefaultBlockSize() {
|
||||
// default to 32MB: large enough to minimize the impact of seeks
|
||||
return getConf().getLong("fs.s3a.block.size", 32 * 1024 * 1024);
|
||||
return getConf().getLong(FS_S3A_BLOCK_SIZE, 32 * 1024 * 1024);
|
||||
}
|
||||
|
||||
private void printAmazonServiceException(AmazonServiceException ase) {
|
||||
@ -1010,6 +1051,6 @@ private void printAmazonClientException(AmazonClientException ace) {
|
||||
LOG.info("Caught an AmazonClientException, which means the client encountered " +
|
||||
"a serious internal problem while trying to communicate with S3, " +
|
||||
"such as not being able to access the network.");
|
||||
LOG.info("Error Message: " + ace.getMessage());
|
||||
LOG.info("Error Message: {}" + ace, ace);
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.S3Object;
|
||||
import com.amazonaws.services.s3.model.S3ObjectInputStream;
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
@ -65,6 +66,7 @@ private void openIfNeeded() throws IOException {
|
||||
}
|
||||
|
||||
private synchronized void reopen(long pos) throws IOException {
|
||||
|
||||
if (wrappedStream != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Aborting old stream to open at pos " + pos);
|
||||
@ -73,15 +75,17 @@ private synchronized void reopen(long pos) throws IOException {
|
||||
}
|
||||
|
||||
if (pos < 0) {
|
||||
throw new EOFException("Trying to seek to a negative offset " + pos);
|
||||
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
|
||||
+" " + pos);
|
||||
}
|
||||
|
||||
if (contentLength > 0 && pos > contentLength-1) {
|
||||
throw new EOFException("Trying to seek to an offset " + pos +
|
||||
" past the end of the file");
|
||||
throw new EOFException(
|
||||
FSExceptionMessages.CANNOT_SEEK_PAST_EOF
|
||||
+ " " + pos);
|
||||
}
|
||||
|
||||
LOG.info("Actually opening file " + key + " at pos " + pos);
|
||||
LOG.debug("Actually opening file " + key + " at pos " + pos);
|
||||
|
||||
GetObjectRequest request = new GetObjectRequest(bucket, key);
|
||||
request.setRange(pos, contentLength-1);
|
||||
@ -103,11 +107,14 @@ public synchronized long getPos() throws IOException {
|
||||
|
||||
@Override
|
||||
public synchronized void seek(long pos) throws IOException {
|
||||
checkNotClosed();
|
||||
|
||||
if (this.pos == pos) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Reopening " + this.key + " to seek to new offset " + (pos - this.pos));
|
||||
LOG.debug(
|
||||
"Reopening " + this.key + " to seek to new offset " + (pos - this.pos));
|
||||
reopen(pos);
|
||||
}
|
||||
|
||||
@ -118,9 +125,7 @@ public boolean seekToNewSource(long targetPos) throws IOException {
|
||||
|
||||
@Override
|
||||
public synchronized int read() throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException("Stream closed");
|
||||
}
|
||||
checkNotClosed();
|
||||
|
||||
openIfNeeded();
|
||||
|
||||
@ -148,10 +153,8 @@ public synchronized int read() throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int read(byte buf[], int off, int len) throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException("Stream closed");
|
||||
}
|
||||
public synchronized int read(byte[] buf, int off, int len) throws IOException {
|
||||
checkNotClosed();
|
||||
|
||||
openIfNeeded();
|
||||
|
||||
@ -179,6 +182,12 @@ public synchronized int read(byte buf[], int off, int len) throws IOException {
|
||||
return byteRead;
|
||||
}
|
||||
|
||||
private void checkNotClosed() throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
super.close();
|
||||
@ -190,9 +199,8 @@ public synchronized void close() throws IOException {
|
||||
|
||||
@Override
|
||||
public synchronized int available() throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException("Stream closed");
|
||||
}
|
||||
checkNotClosed();
|
||||
|
||||
long remaining = this.contentLength - this.pos;
|
||||
if (remaining > Integer.MAX_VALUE) {
|
||||
return Integer.MAX_VALUE;
|
||||
|
@ -87,7 +87,10 @@ public S3AOutputStream(Configuration conf, AmazonS3Client client,
|
||||
backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
|
||||
closed = false;
|
||||
|
||||
LOG.info("OutputStream for key '" + key + "' writing to tempfile: " + this.backupFile);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("OutputStream for key '" + key + "' writing to tempfile: " +
|
||||
this.backupFile);
|
||||
}
|
||||
|
||||
this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
|
||||
}
|
||||
@ -104,8 +107,10 @@ public synchronized void close() throws IOException {
|
||||
}
|
||||
|
||||
backupStream.close();
|
||||
LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
|
||||
LOG.info("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("OutputStream for key '" + key + "' closed. Now beginning upload");
|
||||
LOG.debug("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold);
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
@ -146,13 +151,14 @@ public synchronized void close() throws IOException {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (!backupFile.delete()) {
|
||||
LOG.warn("Could not delete temporary s3a file: " + backupFile);
|
||||
LOG.warn("Could not delete temporary s3a file: {}", backupFile);
|
||||
}
|
||||
super.close();
|
||||
closed = true;
|
||||
}
|
||||
|
||||
LOG.info("OutputStream for key '" + key + "' upload complete");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("OutputStream for key '" + key + "' upload complete");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@@ -0,0 +1,417 @@
<!---
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

# Hadoop-AWS module: Integration with Amazon Web Services

The `hadoop-aws` module provides support for AWS integration. The generated
JAR file, `hadoop-aws.jar`, also declares a transitive dependency on all
external artifacts which are needed for this support, enabling downstream
applications to easily use this support.

Features

1. The "classic" `s3:` filesystem for storing objects in Amazon S3 Storage
1. The second-generation, `s3n:` filesystem, making it easy to share
data between Hadoop and other applications via the S3 object store
1. The third-generation, `s3a:` filesystem. Designed to be a switch-in
replacement for `s3n:`, this filesystem binding supports larger files and promises
higher performance.

The specifics of using these filesystems are documented below.
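
As a quick, illustrative sketch (not part of the module itself), a downstream application can address an S3A bucket through the standard Hadoop `FileSystem` API once the `fs.s3a.impl` class and the credential properties described below are configured; the bucket name `my-bucket` is a placeholder.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class S3AListingExample {
      public static void main(String[] args) throws Exception {
        // Credentials and fs.s3a.impl are read from the configuration
        // (core-site.xml or equivalent), as described in this document.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("s3a://my-bucket/"), conf);
        try {
          for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
          }
        } finally {
          fs.close();
        }
      }
    }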

## Warning: Object Stores are not filesystems.

Amazon S3 is an example of "an object store". In order to achieve scalability
and especially high availability, S3 has —as many other cloud object stores have
done— relaxed some of the constraints which classic "POSIX" filesystems promise.

Specifically:

1. Files that are newly created from the Hadoop Filesystem APIs may not be
immediately visible.
2. File delete and update operations may not immediately propagate. Old
copies of the file may exist for an indeterminate time period.
3. Directory operations: `delete()` and `rename()` are implemented by
recursive file-by-file operations. They take time at least proportional to
the number of files, during which time partial updates may be visible. If
the operations are interrupted, the filesystem is left in an intermediate state.

For further discussion on these topics, please consult
[The Hadoop FileSystem API Definition](/filesystem).

## Warning #2: your AWS credentials are valuable

Your AWS credentials not only pay for services, they offer read and write
access to the data. Anyone with the credentials can not only read your datasets
—they can delete them.

Do not inadvertently share these credentials through means such as

1. Checking in Hadoop configuration files containing the credentials.
1. Logging them to a console, as they invariably end up being seen.

If you do any of these: change your credentials immediately!

## S3
|
||||
|
||||
### Authentication properties
|
||||
|
||||
<property>
|
||||
<name>fs.s3.awsAccessKeyId</name>
|
||||
<description>AWS access key ID</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3.awsSecretAccessKey</name>
|
||||
<description>AWS secret key</description>
|
||||
</property>
|
||||
|
||||
|
||||
## S3N
|
||||
|
||||
### Authentication properties
|
||||
|
||||
<property>
|
||||
<name>fs.s3n.awsAccessKeyId</name>
|
||||
<description>AWS access key ID</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3n.awsSecretAccessKey</name>
|
||||
<description>AWS secret key</description>
|
||||
</property>
|
||||
|
||||
### Other properties
|
||||
|
||||
|
||||
<property>
|
||||
<name>fs.s3n.block.size</name>
|
||||
<value>67108864</value>
|
||||
<description>Block size to use when reading files using the native S3
|
||||
filesystem (s3n: URIs).</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3n.multipart.uploads.enabled</name>
|
||||
<value>false</value>
|
||||
<description>Setting this property to true enables multiple uploads to
|
||||
native S3 filesystem. When uploading a file, it is split into blocks
|
||||
if the size is larger than fs.s3n.multipart.uploads.block.size.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3n.multipart.uploads.block.size</name>
|
||||
<value>67108864</value>
|
||||
<description>The block size for multipart uploads to native S3 filesystem.
|
||||
Default size is 64MB.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3n.multipart.copy.block.size</name>
|
||||
<value>5368709120</value>
|
||||
<description>The block size for multipart copy in native S3 filesystem.
|
||||
Default size is 5GB.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3n.server-side-encryption-algorithm</name>
|
||||
<value></value>
|
||||
<description>Specify a server-side encryption algorithm for S3.
|
||||
The default is NULL, and the only other currently allowable value is AES256.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
## S3A
|
||||
|
||||
|
||||
### Authentication properties
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.awsAccessKeyId</name>
|
||||
<description>AWS access key ID. Omit for Role-based authentication.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.awsSecretAccessKey</name>
|
||||
<description>AWS secret key. Omit for Role-based authentication.</description>
|
||||
</property>
|
||||
|
||||
### Other properties
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.connection.maximum</name>
|
||||
<value>15</value>
|
||||
<description>Controls the maximum number of simultaneous connections to S3.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.connection.ssl.enabled</name>
|
||||
<value>true</value>
|
||||
<description>Enables or disables SSL connections to S3.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.attempts.maximum</name>
|
||||
<value>10</value>
|
||||
<description>How many times we should retry commands on transient errors.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.connection.timeout</name>
|
||||
<value>5000</value>
|
||||
<description>Socket connection timeout in seconds.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.paging.maximum</name>
|
||||
<value>5000</value>
|
||||
<description>How many keys to request from S3 when doing
|
||||
directory listings at a time.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.multipart.size</name>
|
||||
<value>104857600</value>
|
||||
<description>How big (in bytes) to split upload or copy operations up into.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.multipart.threshold</name>
|
||||
<value>2147483647</value>
|
||||
<description>Threshold before uploads or copies use parallel multipart operations.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.acl.default</name>
|
||||
<description>Set a canned ACL for newly created and copied objects. Value may be private,
|
||||
public-read, public-read-write, authenticated-read, log-delivery-write,
|
||||
bucket-owner-read, or bucket-owner-full-control.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.multipart.purge</name>
|
||||
<value>false</value>
|
||||
<description>True if you want to purge existing multipart uploads that may not have been
|
||||
completed/aborted correctly</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.multipart.purge.age</name>
|
||||
<value>86400</value>
|
||||
<description>Minimum age in seconds of multipart uploads to purge</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.buffer.dir</name>
|
||||
<value>${hadoop.tmp.dir}/s3a</value>
|
||||
<description>Comma separated list of directories that will be used to buffer file
|
||||
uploads to.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.impl</name>
|
||||
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
|
||||
<description>The implementation class of the S3A Filesystem</description>
|
||||
</property>
|
||||
|
||||
|
||||
## Testing the S3 filesystem clients

To test the S3* filesystem clients, you need to provide two files
which pass in authentication details to the test runner:

1. `auth-keys.xml`
1. `core-site.xml`

These are both Hadoop XML configuration files, which must be placed into
`hadoop-tools/hadoop-aws/src/test/resources`.

### `auth-keys.xml`

The presence of this file triggers the testing of the S3 classes.

Without this file, *none of the tests in this module will be executed*.

The XML file must contain all the ID/key information needed to connect
each of the filesystem clients to the object stores, and a URL for
each filesystem for its testing.

1. `test.fs.s3n.name` : the URL of the bucket for S3n tests
1. `test.fs.s3a.name` : the URL of the bucket for S3a tests
1. `test.fs.s3.name` : the URL of the bucket for "S3" tests

The contents of each bucket will be destroyed during the test process:
do not use the bucket for any purpose other than testing.

Example:

<configuration>
|
||||
|
||||
<property>
|
||||
<name>test.fs.s3n.name</name>
|
||||
<value>s3n://test-aws-s3n/</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>test.fs.s3a.name</name>
|
||||
<value>s3a://test-aws-s3a/</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>test.fs.s3.name</name>
|
||||
<value>s3a://test-aws-s3/</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3.awsAccessKeyId</name>
|
||||
<value>DONOTPCOMMITTHISKEYTOSCM</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3.awsSecretAccessKey</name>
|
||||
<value>DONOTEVERSHARETHISSECRETKEY!</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3n.awsAccessKeyId</name>
|
||||
<value>DONOTPCOMMITTHISKEYTOSCM</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3n.awsSecretAccessKey</name>
|
||||
<value>DONOTEVERSHARETHISSECRETKEY!</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.awsAccessKeyId</name>
|
||||
<description>AWS access key ID. Omit for Role-based authentication.</description>
|
||||
<value>DONOTPCOMMITTHISKEYTOSCM</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.awsSecretAccessKey</name>
|
||||
<description>AWS secret key. Omit for Role-based authentication.</description>
|
||||
<value>DONOTEVERSHARETHISSECRETKEY!</value>
|
||||
</property>
|
||||
</configuration>
|
||||
|
||||
## File `contract-test-options.xml`

The file `hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml`
must be created and configured for the test filesystems.

If a specific `fs.contract.test.fs.*` test path is not defined for
any of the filesystems, those tests will be skipped.

The standard S3 authentication details must also be provided. This can be
through copy-and-paste of the `auth-keys.xml` credentials, or it can be
through direct XInclude inclusion.

### s3://

The filesystem name must be defined in the property `fs.contract.test.fs.s3`.

Example:

    <property>
      <name>fs.contract.test.fs.s3</name>
      <value>s3://test-aws-s3/</value>
    </property>

### s3n://

In the file `src/test/resources/contract-test-options.xml`, the filesystem
name must be defined in the property `fs.contract.test.fs.s3n`.
The standard configuration options to define the S3N authentication details
must also be provided.

Example:

    <property>
      <name>fs.contract.test.fs.s3n</name>
      <value>s3n://test-aws-s3n/</value>
    </property>

### s3a://

In the file `src/test/resources/contract-test-options.xml`, the filesystem
name must be defined in the property `fs.contract.test.fs.s3a`.
The standard configuration options to define the S3A authentication details
must also be provided.

Example:

    <property>
      <name>fs.contract.test.fs.s3a</name>
      <value>s3a://test-aws-s3a/</value>
    </property>

### Complete example of `contract-test-options.xml`
|
||||
|
||||
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. The ASF licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
|
||||
<include xmlns="http://www.w3.org/2001/XInclude"
|
||||
href="auth-keys.xml"/>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.test.fs.s3</name>
|
||||
<value>s3://test-aws-s3/</value>
|
||||
</property>
|
||||
|
||||
|
||||
<property>
|
||||
<name>fs.contract.test.fs.s3a</name>
|
||||
<value>s3a://test-aws-s3a/</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.test.fs.s3n</name>
|
||||
<value>s3n://test-aws-s3n/</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
||||
This example pulls in the `auth-keys.xml` file for the credentials.
This provides a single place to keep the keys up to date, and means
that the file `contract-test-options.xml` does not contain any
secret credentials itself.
@@ -21,10 +21,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Test;

import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
@@ -51,14 +51,11 @@ public void testRenameDirIntoExistingDir() throws Throwable {

    Path destFilePath = new Path(destDir, "dest-512.txt");
    byte[] destDateset = dataset(512, 'A', 'Z');
    writeDataset(fs, destFilePath, destDateset, destDateset.length, 1024, false);
    writeDataset(fs, destFilePath, destDateset, destDateset.length, 1024,
        false);
    assertIsFile(destFilePath);

    boolean rename = fs.rename(srcDir, destDir);
    Path renamedSrcFilePath = new Path(destDir, "source-256.txt");
    assertIsFile(destFilePath);
    assertIsFile(renamedSrcFilePath);
    ContractTestUtils.verifyFileContents(fs, destFilePath, destDateset);
    assertTrue("rename returned false though the contents were copied", rename);
    assertFalse("s3a doesn't support rename to non-empty directory", rename);
  }
}

@@ -21,13 +21,15 @@
import java.io.IOException;
import java.net.URI;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
import org.junit.internal.AssumptionViolatedException;

public abstract class S3FileSystemContractBaseTest
    extends FileSystemContractBaseTest {

  public static final String KEY_TEST_FS = "test.fs.s3.name";
  private FileSystemStore store;

  abstract FileSystemStore getFileSystemStore() throws IOException;
@@ -37,7 +39,12 @@ protected void setUp() throws Exception {
    Configuration conf = new Configuration();
    store = getFileSystemStore();
    fs = new S3FileSystem(store);
    fs.initialize(URI.create(conf.get("test.fs.s3.name")), conf);
    String fsname = conf.get(KEY_TEST_FS);
    if (StringUtils.isEmpty(fsname)) {
      throw new AssumptionViolatedException(
          "No test FS defined in :" + KEY_TEST_FS);
    }
    fs.initialize(URI.create(fsname), conf);
  }

  @Override
@ -1,327 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import static org.junit.Assume.*;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Tests a live S3 system. If you keys and bucket aren't specified, all tests
|
||||
* are marked as passed
|
||||
*
|
||||
* This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest from
|
||||
* TestCase which uses the old Junit3 runner that doesn't ignore assumptions
|
||||
* properly making it impossible to skip the tests if we don't have a valid
|
||||
* bucket.
|
||||
**/
|
||||
public class S3AFileSystemContractBaseTest extends FileSystemContractBaseTest {
|
||||
private static final int TEST_BUFFER_SIZE = 128;
|
||||
private static final int MODULUS = 128;
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(S3AFileSystemContractBaseTest.class);
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
URI testURI = URI.create(conf.get("test.fs.s3a.name"));
|
||||
|
||||
boolean liveTest = testURI != null && !testURI.equals("s3a:///");
|
||||
|
||||
// This doesn't work with our JUnit 3 style test cases, so instead we'll
|
||||
// make this whole class not run by default
|
||||
assumeTrue(liveTest);
|
||||
|
||||
fs = new S3AFileSystem();
|
||||
fs.initialize(testURI, conf);
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
if (fs != null) {
|
||||
fs.delete(path("/tests3a"), true);
|
||||
}
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testMkdirs() throws IOException {
|
||||
// No trailing slash
|
||||
assertTrue(fs.mkdirs(path("/tests3a/a")));
|
||||
assertTrue(fs.exists(path("/tests3a/a")));
|
||||
|
||||
// With trailing slash
|
||||
assertTrue(fs.mkdirs(path("/tests3a/b/")));
|
||||
assertTrue(fs.exists(path("/tests3a/b/")));
|
||||
|
||||
// Two levels deep
|
||||
assertTrue(fs.mkdirs(path("/tests3a/c/a/")));
|
||||
assertTrue(fs.exists(path("/tests3a/c/a/")));
|
||||
|
||||
// Mismatched slashes
|
||||
assertTrue(fs.exists(path("/tests3a/c/a")));
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=20000)
|
||||
public void testDelete() throws IOException {
|
||||
// Test deleting an empty directory
|
||||
assertTrue(fs.mkdirs(path("/tests3a/d")));
|
||||
assertTrue(fs.delete(path("/tests3a/d"), true));
|
||||
assertFalse(fs.exists(path("/tests3a/d")));
|
||||
|
||||
// Test deleting a deep empty directory
|
||||
assertTrue(fs.mkdirs(path("/tests3a/e/f/g/h")));
|
||||
assertTrue(fs.delete(path("/tests3a/e/f/g"), true));
|
||||
assertFalse(fs.exists(path("/tests3a/e/f/g/h")));
|
||||
assertFalse(fs.exists(path("/tests3a/e/f/g")));
|
||||
assertTrue(fs.exists(path("/tests3a/e/f")));
|
||||
|
||||
// Test delete of just a file
|
||||
writeFile(path("/tests3a/f/f/file"), 1000);
|
||||
assertTrue(fs.exists(path("/tests3a/f/f/file")));
|
||||
assertTrue(fs.delete(path("/tests3a/f/f/file"), false));
|
||||
assertFalse(fs.exists(path("/tests3a/f/f/file")));
|
||||
|
||||
|
||||
// Test delete of a path with files in various directories
|
||||
writeFile(path("/tests3a/g/h/i/file"), 1000);
|
||||
assertTrue(fs.exists(path("/tests3a/g/h/i/file")));
|
||||
writeFile(path("/tests3a/g/h/j/file"), 1000);
|
||||
assertTrue(fs.exists(path("/tests3a/g/h/j/file")));
|
||||
try {
|
||||
assertFalse(fs.delete(path("/tests3a/g/h"), false));
|
||||
fail("Expected delete to fail with recursion turned off");
|
||||
} catch (IOException e) {}
|
||||
assertTrue(fs.exists(path("/tests3a/g/h/j/file")));
|
||||
assertTrue(fs.delete(path("/tests3a/g/h"), true));
|
||||
assertFalse(fs.exists(path("/tests3a/g/h/j")));
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 3600000)
|
||||
public void testOpenCreate() throws IOException {
|
||||
try {
|
||||
createAndReadFileTest(1024);
|
||||
} catch (IOException e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
createAndReadFileTest(5 * 1024 * 1024);
|
||||
} catch (IOException e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
createAndReadFileTest(20 * 1024 * 1024);
|
||||
} catch (IOException e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
|
||||
/*
|
||||
Enable to test the multipart upload
|
||||
try {
|
||||
createAndReadFileTest((long)6 * 1024 * 1024 * 1024);
|
||||
} catch (IOException e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
@Test(timeout = 1200000)
|
||||
public void testRenameFile() throws IOException {
|
||||
Path srcPath = path("/tests3a/a/srcfile");
|
||||
|
||||
final OutputStream outputStream = fs.create(srcPath, false);
|
||||
generateTestData(outputStream, 11 * 1024 * 1024);
|
||||
outputStream.close();
|
||||
|
||||
assertTrue(fs.exists(srcPath));
|
||||
|
||||
Path dstPath = path("/tests3a/b/dstfile");
|
||||
|
||||
assertFalse(fs.rename(srcPath, dstPath));
|
||||
assertTrue(fs.mkdirs(dstPath.getParent()));
|
||||
assertTrue(fs.rename(srcPath, dstPath));
|
||||
assertTrue(fs.exists(dstPath));
|
||||
assertFalse(fs.exists(srcPath));
|
||||
assertTrue(fs.exists(srcPath.getParent()));
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testRenameDirectory() throws IOException {
|
||||
Path srcPath = path("/tests3a/a");
|
||||
|
||||
assertTrue(fs.mkdirs(srcPath));
|
||||
writeFile(new Path(srcPath, "b/testfile"), 1024);
|
||||
|
||||
Path nonEmptyPath = path("/tests3a/nonempty");
|
||||
writeFile(new Path(nonEmptyPath, "b/testfile"), 1024);
|
||||
|
||||
assertFalse(fs.rename(srcPath, nonEmptyPath));
|
||||
|
||||
Path dstPath = path("/tests3a/b");
|
||||
assertTrue(fs.rename(srcPath, dstPath));
|
||||
assertFalse(fs.exists(srcPath));
|
||||
assertTrue(fs.exists(new Path(dstPath, "b/testfile")));
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testSeek() throws IOException {
|
||||
Path path = path("/tests3a/testfile.seek");
|
||||
writeFile(path, TEST_BUFFER_SIZE * 10);
|
||||
|
||||
|
||||
FSDataInputStream inputStream = fs.open(path, TEST_BUFFER_SIZE);
|
||||
inputStream.seek(inputStream.getPos() + MODULUS);
|
||||
|
||||
testReceivedData(inputStream, TEST_BUFFER_SIZE * 10 - MODULUS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates and reads a file with the given size in S3. The test file is
|
||||
* generated according to a specific pattern.
|
||||
* During the read phase the incoming data stream is also checked against this pattern.
|
||||
*
|
||||
* @param fileSize
|
||||
* the size of the file to be generated in bytes
|
||||
* @throws IOException
|
||||
* thrown if an I/O error occurs while writing or reading the test file
|
||||
*/
|
||||
private void createAndReadFileTest(final long fileSize) throws IOException {
|
||||
final String objectName = UUID.randomUUID().toString();
|
||||
final Path objectPath = new Path("/tests3a/", objectName);
|
||||
|
||||
// Write test file to S3
|
||||
final OutputStream outputStream = fs.create(objectPath, false);
|
||||
generateTestData(outputStream, fileSize);
|
||||
outputStream.close();
|
||||
|
||||
// Now read the same file back from S3
|
||||
final InputStream inputStream = fs.open(objectPath);
|
||||
testReceivedData(inputStream, fileSize);
|
||||
inputStream.close();
|
||||
|
||||
// Delete test file
|
||||
fs.delete(objectPath, false);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Receives test data from the given input stream and checks the size of the
|
||||
* data as well as the pattern inside the received data.
|
||||
*
|
||||
* @param inputStream
|
||||
* the input stream to read the test data from
|
||||
* @param expectedSize
|
||||
* the expected size of the data to be read from the input stream in bytes
|
||||
* @throws IOException
|
||||
* thrown if an error occurs while reading the data
|
||||
*/
|
||||
private void testReceivedData(final InputStream inputStream,
|
||||
final long expectedSize) throws IOException {
|
||||
final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
|
||||
|
||||
long totalBytesRead = 0;
|
||||
int nextExpectedNumber = 0;
|
||||
while (true) {
|
||||
final int bytesRead = inputStream.read(testBuffer);
|
||||
if (bytesRead < 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
totalBytesRead += bytesRead;
|
||||
|
||||
for (int i = 0; i < bytesRead; ++i) {
|
||||
if (testBuffer[i] != nextExpectedNumber) {
|
||||
throw new IOException("Read number " + testBuffer[i] + " but expected "
|
||||
+ nextExpectedNumber);
|
||||
}
|
||||
|
||||
++nextExpectedNumber;
|
||||
|
||||
if (nextExpectedNumber == MODULUS) {
|
||||
nextExpectedNumber = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (totalBytesRead != expectedSize) {
|
||||
throw new IOException("Expected to read " + expectedSize +
|
||||
" bytes but only received " + totalBytesRead);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Generates test data of the given size according to some specific pattern
|
||||
* and writes it to the provided output stream.
|
||||
*
|
||||
* @param outputStream
|
||||
* the output stream to write the data to
|
||||
* @param size
|
||||
* the size of the test data to be generated in bytes
|
||||
* @throws IOException
|
||||
* thrown if an error occurs while writing the data
|
||||
*/
|
||||
private void generateTestData(final OutputStream outputStream,
|
||||
final long size) throws IOException {
|
||||
|
||||
final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
|
||||
for (int i = 0; i < testBuffer.length; ++i) {
|
||||
testBuffer[i] = (byte) (i % MODULUS);
|
||||
}
|
||||
|
||||
long bytesWritten = 0;
|
||||
while (bytesWritten < size) {
|
||||
|
||||
final long diff = size - bytesWritten;
|
||||
if (diff < testBuffer.length) {
|
||||
outputStream.write(testBuffer, 0, (int)diff);
|
||||
bytesWritten += diff;
|
||||
} else {
|
||||
outputStream.write(testBuffer);
|
||||
bytesWritten += testBuffer.length;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void writeFile(Path name, int fileSize) throws IOException {
|
||||
final OutputStream outputStream = fs.create(name, false);
|
||||
generateTestData(outputStream, fileSize);
|
||||
outputStream.close();
|
||||
}
|
||||
}
|
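The generateTestData / testReceivedData pair above relies on each byte's value being its offset modulo MODULUS (128), so a reader can verify a stream of any length without buffering the whole file. A standalone sketch of the same check (class name is illustrative, not from the patch):

    // Standalone illustration (not from the patch) of the offset-modulo-128
    // pattern generated and verified by the methods above.
    public final class PatternSketch {
      public static void main(String[] args) {
        final int modulus = 128;                     // matches MODULUS above
        byte[] data = new byte[1024];
        for (int i = 0; i < data.length; i++) {
          data[i] = (byte) (i % modulus);            // generate the pattern
        }
        int expected = 0;
        for (byte b : data) {                        // verify it byte by byte
          if (b != expected) {
            throw new IllegalStateException("Read " + b + " but expected " + expected);
          }
          expected = (expected + 1) % modulus;
        }
        System.out.println("verified " + data.length + " bytes");
      }
    }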
@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.junit.internal.AssumptionViolatedException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
public class S3ATestUtils {
|
||||
|
||||
public static S3AFileSystem createTestFileSystem(Configuration conf) throws
|
||||
IOException {
|
||||
String fsname = conf.getTrimmed(TestS3AFileSystemContract.TEST_FS_S3A_NAME, "");
|
||||
|
||||
|
||||
boolean liveTest = !StringUtils.isEmpty(fsname);
|
||||
URI testURI = null;
|
||||
if (liveTest) {
|
||||
testURI = URI.create(fsname);
|
||||
liveTest = testURI.getScheme().equals(Constants.FS_S3A);
|
||||
}
|
||||
if (!liveTest) {
|
||||
// This doesn't work with our JUnit 3 style test cases, so instead we'll
|
||||
// make this whole class not run by default
|
||||
throw new AssumptionViolatedException(
|
||||
"No test filesystem in " + TestS3AFileSystemContract.TEST_FS_S3A_NAME);
|
||||
}
|
||||
S3AFileSystem fs1 = new S3AFileSystem();
|
||||
fs1.initialize(testURI, conf);
|
||||
return fs1;
|
||||
}
|
||||
}
|
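S3ATestUtils.createTestFileSystem() skips the caller with an AssumptionViolatedException when test.fs.s3a.name is not configured, which JUnit 4 reports as an ignored test rather than a failure. A minimal sketch of how a JUnit 4 test might use it (the test class, path and the commented-out bucket URI are placeholders, not from the patch):

    // Hypothetical JUnit 4 test using S3ATestUtils; the bucket URI normally
    // comes from auth-keys.xml and is shown here, commented out, only as a
    // placeholder.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.S3AFileSystem;
    import org.apache.hadoop.fs.s3a.S3ATestUtils;
    import org.junit.Assert;
    import org.junit.Test;

    public class ExampleS3ALiveTest {
      @Test
      public void testMkdirRoundTrip() throws Exception {
        Configuration conf = new Configuration();
        // conf.set("test.fs.s3a.name", "s3a://example-bucket/");  // placeholder
        S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf);
        Path p = new Path("/tests3a/example");
        Assert.assertTrue(fs.mkdirs(p));
        Assert.assertTrue(fs.exists(p));
        fs.delete(p, true);
      }
    }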
@ -0,0 +1,105 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
/**
|
||||
* Tests a live S3 system. If your keys and bucket aren't specified, all tests
|
||||
* are marked as passed.
|
||||
*
|
||||
* This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest derives
|
||||
* from TestCase, which uses the old JUnit 3 runner that doesn't ignore
|
||||
* assumptions properly, making it impossible to skip the tests if we don't
|
||||
* have a valid bucket.
|
||||
**/
|
||||
public class TestS3AFileSystemContract extends FileSystemContractBaseTest {
|
||||
|
||||
protected static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestS3AFileSystemContract.class);
|
||||
public static final String TEST_FS_S3A_NAME = "test.fs.s3a.name";
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
if (fs != null) {
|
||||
fs.delete(path("test"), true);
|
||||
}
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testMkdirsWithUmask() throws Exception {
|
||||
// not supported
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testRenameFileAsExistingFile() throws Exception {
|
||||
if (!renameSupported()) return;
|
||||
|
||||
Path src = path("/test/hadoop/file");
|
||||
createFile(src);
|
||||
Path dst = path("/test/new/newfile");
|
||||
createFile(dst);
|
||||
// S3 doesn't support the rename option;
|
||||
// rename-overwrites-dest is always allowed.
|
||||
rename(src, dst, true, false, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testRenameDirectoryAsExistingDirectory() throws Exception {
|
||||
if (!renameSupported()) {
|
||||
return;
|
||||
}
|
||||
|
||||
Path src = path("/test/hadoop/dir");
|
||||
fs.mkdirs(src);
|
||||
createFile(path("/test/hadoop/dir/file1"));
|
||||
createFile(path("/test/hadoop/dir/subdir/file2"));
|
||||
|
||||
Path dst = path("/test/new/newdir");
|
||||
fs.mkdirs(dst);
|
||||
rename(src, dst, true, false, true);
|
||||
assertFalse("Nested file1 exists",
|
||||
fs.exists(path("/test/hadoop/dir/file1")));
|
||||
assertFalse("Nested file2 exists",
|
||||
fs.exists(path("/test/hadoop/dir/subdir/file2")));
|
||||
assertTrue("Renamed nested file1 exists",
|
||||
fs.exists(path("/test/new/newdir/file1")));
|
||||
assertTrue("Renamed nested exists",
|
||||
fs.exists(path("/test/new/newdir/subdir/file2")));
|
||||
}
|
||||
|
||||
// @Override
|
||||
public void testMoveDirUnderParent() throws Throwable {
|
||||
// not supported because
|
||||
// rename fails if dst is a directory that is not empty.
|
||||
}
|
||||
}
|
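The override above expects a rename onto an existing file to succeed because S3A always overwrites the destination. A hedged sketch of that behaviour against a live FileSystem (the helper class and paths are illustrative only, not part of the patch):

    // Sketch only: the overwrite-on-rename behaviour that
    // testRenameFileAsExistingFile asserts above.
    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class RenameOverwriteSketch {
      static void demo(FileSystem fs) throws IOException {
        Path src = new Path("/test/hadoop/file");
        Path dst = new Path("/test/new/newfile");
        fs.create(src, true).close();            // create both files
        fs.create(dst, true).close();
        boolean renamed = fs.rename(src, dst);   // S3A reports success
        // afterwards: src is gone and dst still exists (it was overwritten)
        System.out.println("renamed=" + renamed
            + " srcExists=" + fs.exists(src) + " dstExists=" + fs.exists(dst));
      }
    }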
@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.scale;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
/**
|
||||
* Base class for scale tests; here is where the common scale configuration
|
||||
* keys are defined.
|
||||
*/
|
||||
public class S3AScaleTestBase {
|
||||
|
||||
public static final String SCALE_TEST = "scale.test.";
|
||||
public static final String KEY_OPERATION_COUNT =
|
||||
SCALE_TEST + "operation.count";
|
||||
public static final long DEFAULT_OPERATION_COUNT = 2005;
|
||||
|
||||
protected S3AFileSystem fs;
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(S3AScaleTestBase.class);
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
/**
|
||||
* Configuration generator. May be overridden to inject
|
||||
* some custom options.
|
||||
* @return a configuration with which to create FS instances
|
||||
*/
|
||||
protected Configuration createConfiguration() {
|
||||
return new Configuration();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the configuration used to set up the FS
|
||||
* @return the configuration
|
||||
*/
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = createConfiguration();
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
ContractTestUtils.rm(fs, getTestPath(), true, true);
|
||||
}
|
||||
|
||||
protected Path getTestPath() {
|
||||
return new Path("/tests3a");
|
||||
}
|
||||
|
||||
protected long getOperationCount() {
|
||||
return getConf().getLong(KEY_OPERATION_COUNT, DEFAULT_OPERATION_COUNT);
|
||||
}
|
||||
}
|
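S3AScaleTestBase reads its workload size from scale.test.operation.count (default 2005). A sketch of one way to shrink the run, assuming overriding createConfiguration() is the intended extension point (the class name is hypothetical):

    // Sketch (hypothetical class name): a quick smoke run of the scale tests.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase;

    public class TestS3ASmallScale extends S3AScaleTestBase {
      @Override
      protected Configuration createConfiguration() {
        Configuration conf = new Configuration();
        // default operation count is 2005; trim it for a fast sanity check
        conf.setLong(KEY_OPERATION_COUNT, 100);
        return conf;
      }
    }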
@ -0,0 +1,131 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.scale;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class TestS3ADeleteManyFiles extends S3AScaleTestBase {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestS3ADeleteManyFiles.class);
|
||||
|
||||
|
||||
@Rule
|
||||
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
|
||||
|
||||
@Test
|
||||
public void testBulkRenameAndDelete() throws Throwable {
|
||||
final Path scaleTestDir = getTestPath();
|
||||
final Path srcDir = new Path(scaleTestDir, "src");
|
||||
final Path finalDir = new Path(scaleTestDir, "final");
|
||||
final long count = getOperationCount();
|
||||
ContractTestUtils.rm(fs, scaleTestDir, true, false);
|
||||
|
||||
fs.mkdirs(srcDir);
|
||||
fs.mkdirs(finalDir);
|
||||
|
||||
int testBufferSize = fs.getConf()
|
||||
.getInt(ContractTestUtils.IO_CHUNK_BUFFER_SIZE,
|
||||
ContractTestUtils.DEFAULT_IO_CHUNK_BUFFER_SIZE);
|
||||
// use Executor to speed up file creation
|
||||
ExecutorService exec = Executors.newFixedThreadPool(16);
|
||||
final ExecutorCompletionService<Boolean> completionService =
|
||||
new ExecutorCompletionService<Boolean>(exec);
|
||||
try {
|
||||
final byte[] data = ContractTestUtils.dataset(testBufferSize, 'a', 'z');
|
||||
|
||||
for (int i = 0; i < count; ++i) {
|
||||
final String fileName = "foo-" + i;
|
||||
completionService.submit(new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() throws IOException {
|
||||
ContractTestUtils.createFile(fs, new Path(srcDir, fileName),
|
||||
false, data);
|
||||
return fs.exists(new Path(srcDir, fileName));
|
||||
}
|
||||
});
|
||||
}
|
||||
for (int i = 0; i < count; ++i) {
|
||||
final Future<Boolean> future = completionService.take();
|
||||
try {
|
||||
if (!future.get()) {
|
||||
LOG.warn("cannot create file");
|
||||
}
|
||||
} catch (ExecutionException e) {
|
||||
LOG.warn("Error while uploading file", e.getCause());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
exec.shutdown();
|
||||
}
|
||||
|
||||
int nSrcFiles = fs.listStatus(srcDir).length;
|
||||
fs.rename(srcDir, finalDir);
|
||||
assertEquals(nSrcFiles, fs.listStatus(finalDir).length);
|
||||
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
|
||||
new Path(srcDir, "foo-" + 0));
|
||||
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
|
||||
new Path(srcDir, "foo-" + count / 2));
|
||||
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
|
||||
new Path(srcDir, "foo-" + (count - 1)));
|
||||
ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
|
||||
new Path(finalDir, "foo-" + 0));
|
||||
ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
|
||||
new Path(finalDir, "foo-" + count/2));
|
||||
ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
|
||||
new Path(finalDir, "foo-" + (count-1)));
|
||||
|
||||
ContractTestUtils.assertDeleted(fs, finalDir, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOpenCreate() throws IOException {
|
||||
Path dir = new Path("/tests3a");
|
||||
ContractTestUtils.createAndVerifyFile(fs, dir, 1024);
|
||||
ContractTestUtils.createAndVerifyFile(fs, dir, 5 * 1024 * 1024);
|
||||
ContractTestUtils.createAndVerifyFile(fs, dir, 20 * 1024 * 1024);
|
||||
|
||||
|
||||
/*
|
||||
Enable to test the multipart upload
|
||||
try {
|
||||
ContractTestUtils.createAndVerifyFile(fs, dir,
|
||||
(long)6 * 1024 * 1024 * 1024);
|
||||
} catch (IOException e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
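testBulkRenameAndDelete() above creates and then removes a large number of objects under one directory. When a store implements such bulk deletion with S3's multi-object delete API, the requests have to be chunked, since that API caps the number of keys per call. A sketch of the chunking pattern using AWS SDK v1 types (the AmazonS3 client, bucket name and key list are assumed inputs; this is illustrative only, not the patch itself):

    // Illustrative only: chunking a bulk delete so that no single
    // multi-object delete request exceeds S3's per-request key limit.
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.DeleteObjectsRequest;
    import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
    import java.util.List;

    class BulkDeleteSketch {
      static void deleteInBatches(AmazonS3 s3, String bucket, List<KeyVersion> keys) {
        final int maxKeysPerRequest = 1000;      // S3 multi-object delete cap
        for (int i = 0; i < keys.size(); i += maxKeysPerRequest) {
          List<KeyVersion> batch =
              keys.subList(i, Math.min(i + maxKeysPerRequest, keys.size()));
          s3.deleteObjects(new DeleteObjectsRequest(bucket).withKeys(batch));
        }
      }
    }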
@ -22,15 +22,17 @@
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3native.NativeS3FileSystem.NativeS3FsInputStream;
|
||||
import org.junit.internal.AssumptionViolatedException;
|
||||
|
||||
public abstract class NativeS3FileSystemContractBaseTest
|
||||
extends FileSystemContractBaseTest {
|
||||
|
||||
public static final String KEY_TEST_FS = "test.fs.s3n.name";
|
||||
private NativeFileSystemStore store;
|
||||
|
||||
abstract NativeFileSystemStore getNativeFileSystemStore() throws IOException;
|
||||
@ -40,7 +42,12 @@ protected void setUp() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
store = getNativeFileSystemStore();
|
||||
fs = new NativeS3FileSystem(store);
|
||||
fs.initialize(URI.create(conf.get("test.fs.s3n.name")), conf);
|
||||
String fsname = conf.get(KEY_TEST_FS);
|
||||
if (StringUtils.isEmpty(fsname)) {
|
||||
throw new AssumptionViolatedException(
|
||||
"No test FS defined in :" + KEY_TEST_FS);
|
||||
}
|
||||
fs.initialize(URI.create(fsname), conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -117,10 +117,13 @@ public void testMediumUpload() throws IOException, NoSuchAlgorithmException {
|
||||
writeRenameReadCompare(new Path("/test/medium"), 33554432); // 32 MB
|
||||
}
|
||||
|
||||
/*
|
||||
Enable Multipart upload to run this test
|
||||
@Test
|
||||
public void testExtraLargeUpload()
|
||||
throws IOException, NoSuchAlgorithmException {
|
||||
// Multipart upload, multipart copy
|
||||
writeRenameReadCompare(new Path("/test/xlarge"), 5368709121L); // 5GB+1byte
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
@ -47,6 +47,11 @@
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.rename-remove-dest-if-empty-dir</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-append</name>
|
||||
<value>false</value>
|
||||
|
51
hadoop-tools/hadoop-aws/src/test/resources/core-site.xml
Normal file
@ -0,0 +1,51 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. The ASF licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- Values used when running unit tests. Specify any values in here that
|
||||
should override the default values. -->
|
||||
|
||||
<configuration>
|
||||
|
||||
<property>
|
||||
<name>hadoop.tmp.dir</name>
|
||||
<value>target/build/test</value>
|
||||
<description>A base for other temporary directories.</description>
|
||||
<final>true</final>
|
||||
</property>
|
||||
|
||||
<!-- Turn security off for tests by default -->
|
||||
<property>
|
||||
<name>hadoop.security.authentication</name>
|
||||
<value>simple</value>
|
||||
</property>
|
||||
|
||||
<!--
|
||||
To run these tests:
|
||||
|
||||
# Create a file auth-keys.xml - DO NOT ADD TO REVISION CONTROL
|
||||
# Add the property test.fs.s3n.name to point to an S3 filesystem URL
|
||||
# Add the credentials for the service you are testing against
|
||||
-->
|
||||
<include xmlns="http://www.w3.org/2001/XInclude"
|
||||
href="auth-keys.xml"/>
|
||||
|
||||
|
||||
|
||||
</configuration>
|
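For reference, a minimal auth-keys.xml satisfying the XInclude above might look like the following sketch; the bucket URI is a placeholder, and the credential properties required by the connector under test must be added alongside it:

    <?xml version="1.0"?>
    <!-- Illustrative auth-keys.xml; the bucket URI is a placeholder and the
         credential properties for your connector must be added as well. -->
    <configuration>
      <property>
        <name>test.fs.s3a.name</name>
        <value>s3a://example-test-bucket/</value>
      </property>
      <!-- plus the access/secret key properties for the store under test -->
    </configuration>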