diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 051eac19bb..c2ae5ede1a 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -342,6 +342,9 @@ Release 2.6.0 - UNRELEASED HADOOP-10893. isolated classloader on the client side (Sangjin Lee via jlowe) + HADOOP-10400. Incorporate new S3A FileSystem implementation. (Jordan + Mendelson and Dave Wang via atm) + IMPROVEMENTS HADOOP-10808. Remove unused native code for munlock. (cnauroth) diff --git a/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties b/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties index ef9acbf13b..5fa21fad9e 100644 --- a/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties +++ b/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties @@ -174,6 +174,11 @@ log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex} # Jets3t library log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR +# AWS SDK & S3A FileSystem +log4j.logger.com.amazonaws=ERROR +log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR +log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN + # # Event Counter Appender # Sends counts of logging messages at different severity levels to Hadoop Metrics. diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 3cc7545b94..828dec25ff 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -689,6 +689,92 @@ for ldap providers in the same way as above does. + + fs.s3a.access.key + AWS access key ID. Omit for Role-based authentication. + + + + fs.s3a.secret.key + AWS secret key. Omit for Role-based authentication. + + + + fs.s3a.connection.maximum + 15 + Controls the maximum number of simultaneous connections to S3. + + + + fs.s3a.connection.ssl.enabled + true + Enables or disables SSL connections to S3. + + + + fs.s3a.attempts.maximum + 10 + How many times we should retry commands on transient errors. + + + + fs.s3a.connection.timeout + 5000 + Socket connection timeout in seconds. + + + + fs.s3a.paging.maximum + 5000 + How many keys to request from S3 when doing + directory listings at a time. + + + + fs.s3a.multipart.size + 104857600 + How big (in bytes) to split upload or copy operations up into. + + + + fs.s3a.multipart.threshold + 2147483647 + Threshold before uploads or copies use parallel multipart operations. + + + + fs.s3a.acl.default + Set a canned ACL for newly created and copied objects. Value may be private, + public-read, public-read-write, authenticated-read, log-delivery-write, + bucket-owner-read, or bucket-owner-full-control. + + + + fs.s3a.multipart.purge + false + True if you want to purge existing multipart uploads that may not have been + completed/aborted correctly + + + + fs.s3a.multipart.purge.age + 86400 + Minimum age in seconds of multipart uploads to purge + + + + fs.s3a.buffer.dir + ${hadoop.tmp.dir}/s3a + Comma separated list of directories that will be used to buffer file + uploads to. 
+ + + + fs.s3a.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + The implementation class of the S3A Filesystem + + io.seqfile.compress.blocksize 1000000 diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml index 24fa87b8b5..a44f68625b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml @@ -130,6 +130,10 @@ net.java.dev.jets3t jets3t + + com.amazonaws + aws-java-sdk + org.eclipse.jdt core @@ -169,6 +173,10 @@ net.java.dev.jets3t jets3t + + com.amazonaws + aws-java-sdk + org.eclipse.jdt core diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index ad8422ff5e..502655f709 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -61,8 +61,9 @@ 1.9 - + 1.9.13 + 2.2.3 @@ -581,13 +582,7 @@ com.amazonaws aws-java-sdk - 1.7.2 - - - com.fasterxml.jackson.core - jackson-core - - + 1.7.4 org.apache.mina @@ -674,6 +669,21 @@ jackson-xc ${jackson.version} + + com.fasterxml.jackson.core + jackson-core + ${jackson2.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson2.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackson2.version} + org.mockito mockito-all diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index c01a33ddd4..61a5e84e9e 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -100,6 +100,16 @@ test-jar + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-annotations + + com.amazonaws aws-java-sdk diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java new file mode 100644 index 0000000000..2a242736bf --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AnonymousAWSCredentials; +import com.amazonaws.auth.AWSCredentials; + +public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider { + public AWSCredentials getCredentials() { + return new AnonymousAWSCredentials(); + } + + public void refresh() {} + + @Override + public String toString() { + return getClass().getSimpleName(); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java new file mode 100644 index 0000000000..8d45bc6e66 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.AWSCredentials; + +public class BasicAWSCredentialsProvider implements AWSCredentialsProvider { + private String accessKey; + private String secretKey; + + public BasicAWSCredentialsProvider(String accessKey, String secretKey) { + this.accessKey = accessKey; + this.secretKey = secretKey; + } + + public AWSCredentials getCredentials() { + if (accessKey != null && secretKey != null) { + return new BasicAWSCredentials(accessKey, secretKey); + } + + throw new AmazonClientException( + "Access key or secret key is null"); + } + + public void refresh() {} + + @Override + public String toString() { + return getClass().getSimpleName(); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java new file mode 100644 index 0000000000..9723b82724 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + + +public class Constants { + // s3 access key + public static final String OLD_ACCESS_KEY = "fs.s3a.awsAccessKeyId"; + public static final String NEW_ACCESS_KEY = "fs.s3a.access.key"; + + // s3 secret key + public static final String OLD_SECRET_KEY = "fs.s3a.awsSecretAccessKey"; + public static final String NEW_SECRET_KEY = "fs.s3a.secret.key"; + + // number of simultaneous connections to s3 + public static final String OLD_MAXIMUM_CONNECTIONS = "fs.s3a.maxConnections"; + public static final String NEW_MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum"; + public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15; + + // connect to s3 over ssl? + public static final String OLD_SECURE_CONNECTIONS = "fs.s3a.secureConnections"; + public static final String NEW_SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled"; + public static final boolean DEFAULT_SECURE_CONNECTIONS = true; + + // number of times we should retry errors + public static final String OLD_MAX_ERROR_RETRIES = "fs.s3a.maxErrorRetries"; + public static final String NEW_MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum"; + public static final int DEFAULT_MAX_ERROR_RETRIES = 10; + + // seconds until we give up on a connection to s3 + public static final String OLD_SOCKET_TIMEOUT = "fs.s3a.socketTimeout"; + public static final String NEW_SOCKET_TIMEOUT = "fs.s3a.connection.timeout"; + public static final int DEFAULT_SOCKET_TIMEOUT = 50000; + + // number of records to get while paging through a directory listing + public static final String OLD_MAX_PAGING_KEYS = "fs.s3a.maxPagingKeys"; + public static final String NEW_MAX_PAGING_KEYS = "fs.s3a.paging.maximum"; + public static final int DEFAULT_MAX_PAGING_KEYS = 5000; + + // size of each of or multipart pieces in bytes + public static final String OLD_MULTIPART_SIZE = "fs.s3a.multipartSize"; + public static final String NEW_MULTIPART_SIZE = "fs.s3a.multipart.size"; + public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB + + // minimum size in bytes before we start a multipart uploads or copy + public static final String OLD_MIN_MULTIPART_THRESHOLD = "fs.s3a.minMultipartSize"; + public static final String NEW_MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold"; + public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE; + + // comma separated list of directories + public static final String BUFFER_DIR = "fs.s3a.buffer.dir"; + + // private | public-read | public-read-write | authenticated-read | + // log-delivery-write | bucket-owner-read | bucket-owner-full-control + public static final String OLD_CANNED_ACL = "fs.s3a.cannedACL"; + public static final String NEW_CANNED_ACL = "fs.s3a.acl.default"; + public static final String DEFAULT_CANNED_ACL = ""; + + // should we try to purge old multipart uploads when starting up + public static final String OLD_PURGE_EXISTING_MULTIPART = "fs.s3a.purgeExistingMultiPart"; + public static final String NEW_PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge"; + public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false; + + // purge any multipart uploads 
older than this number of seconds + public static final String OLD_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.purgeExistingMultiPartAge"; + public static final String NEW_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age"; + public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400; + + // s3 server-side encryption + public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = + "fs.s3a.server-side-encryption-algorithm"; + + public static final String S3N_FOLDER_SUFFIX = "_$folder$"; +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java new file mode 100644 index 0000000000..eb6449264b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +public class S3AFileStatus extends FileStatus { + private boolean isEmptyDirectory; + + // Directories + public S3AFileStatus(boolean isdir, boolean isemptydir, Path path) { + super(0, isdir, 1, 0, 0, path); + isEmptyDirectory = isemptydir; + } + + // Files + public S3AFileStatus(long length, long modification_time, Path path) { + super(length, false, 1, 0, modification_time, path); + isEmptyDirectory = false; + } + + public boolean isEmptyDirectory() { + return isEmptyDirectory; + } + + /** Compare if this object is equal to another object + * @param o the object to be compared. + * @return true if two file status has the same path name; false if not. + */ + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + /** + * Returns a hash code value for the object, which is defined as + * the hash code of the path name. + * + * @return a hash code value for the path name. + */ + @Override + public int hashCode() { + return super.hashCode(); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java new file mode 100644 index 0000000000..a597e62275 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -0,0 +1,1019 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentialsProviderChain; + +import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.transfer.Copy; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; +import com.amazonaws.services.s3.transfer.Upload; +import com.amazonaws.event.ProgressListener; +import com.amazonaws.event.ProgressEvent; + +import org.apache.commons.lang.StringUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import static org.apache.hadoop.fs.s3a.Constants.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3AFileSystem extends FileSystem { + private URI uri; + private Path workingDir; + private AmazonS3Client s3; + private String bucket; + private int maxKeys; + private long partSize; + private int partSizeThreshold; + public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); + private CannedAccessControlList cannedACL; + private String serverSideEncryptionAlgorithm; + + + /** Called after a new FileSystem instance is constructed. + * @param name a uri whose authority section names the host, port, etc. 
+ * for this FileSystem + * @param conf the configuration + */ + public void initialize(URI name, Configuration conf) throws IOException { + super.initialize(name, conf); + + + uri = URI.create(name.getScheme() + "://" + name.getAuthority()); + workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, + this.getWorkingDirectory()); + + // Try to get our credentials or just connect anonymously + String accessKey = conf.get(NEW_ACCESS_KEY, conf.get(OLD_ACCESS_KEY, null)); + String secretKey = conf.get(NEW_SECRET_KEY, conf.get(OLD_SECRET_KEY, null)); + + String userInfo = name.getUserInfo(); + if (userInfo != null) { + int index = userInfo.indexOf(':'); + if (index != -1) { + accessKey = userInfo.substring(0, index); + secretKey = userInfo.substring(index + 1); + } else { + accessKey = userInfo; + } + } + + AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain( + new BasicAWSCredentialsProvider(accessKey, secretKey), + new InstanceProfileCredentialsProvider(), + new AnonymousAWSCredentialsProvider() + ); + + bucket = name.getHost(); + + ClientConfiguration awsConf = new ClientConfiguration(); + awsConf.setMaxConnections(conf.getInt(NEW_MAXIMUM_CONNECTIONS, + conf.getInt(OLD_MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS))); + awsConf.setProtocol(conf.getBoolean(NEW_SECURE_CONNECTIONS, + conf.getBoolean(OLD_SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) ? + Protocol.HTTPS : Protocol.HTTP); + awsConf.setMaxErrorRetry(conf.getInt(NEW_MAX_ERROR_RETRIES, + conf.getInt(OLD_MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES))); + awsConf.setSocketTimeout(conf.getInt(NEW_SOCKET_TIMEOUT, + conf.getInt(OLD_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT))); + + s3 = new AmazonS3Client(credentials, awsConf); + + maxKeys = conf.getInt(NEW_MAX_PAGING_KEYS, + conf.getInt(OLD_MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS)); + partSize = conf.getLong(NEW_MULTIPART_SIZE, + conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE)); + partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD, + conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD)); + + if (partSize < 5 * 1024 * 1024) { + LOG.error(NEW_MULTIPART_SIZE + " must be at least 5 MB"); + partSize = 5 * 1024 * 1024; + } + + if (partSizeThreshold < 5 * 1024 * 1024) { + LOG.error(NEW_MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); + partSizeThreshold = 5 * 1024 * 1024; + } + + String cannedACLName = conf.get(NEW_CANNED_ACL, + conf.get(OLD_CANNED_ACL, DEFAULT_CANNED_ACL)); + if (!cannedACLName.isEmpty()) { + cannedACL = CannedAccessControlList.valueOf(cannedACLName); + } else { + cannedACL = null; + } + + if (!s3.doesBucketExist(bucket)) { + throw new IOException("Bucket " + bucket + " does not exist"); + } + + boolean purgeExistingMultipart = conf.getBoolean(NEW_PURGE_EXISTING_MULTIPART, + conf.getBoolean(OLD_PURGE_EXISTING_MULTIPART, DEFAULT_PURGE_EXISTING_MULTIPART)); + long purgeExistingMultipartAge = conf.getLong(NEW_PURGE_EXISTING_MULTIPART_AGE, + conf.getLong(OLD_PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE)); + + if (purgeExistingMultipart) { + TransferManager transferManager = new TransferManager(s3); + Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000); + + transferManager.abortMultipartUploads(bucket, purgeBefore); + transferManager.shutdownNow(false); + } + + serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); + + setConf(conf); + } + + /** + * Return the protocol scheme for the FileSystem. 
+ * + * @return "s3a" + */ + public String getScheme() { + return "s3a"; + } + + /** Returns a URI whose scheme and authority identify this FileSystem.*/ + public URI getUri() { + return uri; + } + + + public S3AFileSystem() { + super(); + } + + /* Turns a path (relative or otherwise) into an S3 key + */ + private String pathToKey(Path path) { + if (!path.isAbsolute()) { + path = new Path(workingDir, path); + } + + if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) { + return ""; + } + + return path.toUri().getPath().substring(1); + } + + private Path keyToPath(String key) { + return new Path("/" + key); + } + + /** + * Opens an FSDataInputStream at the indicated Path. + * @param f the file name to open + * @param bufferSize the size of the buffer to be used. + */ + public FSDataInputStream open(Path f, int bufferSize) + throws IOException { + + LOG.info("Opening '" + f + "' for reading"); + final FileStatus fileStatus = getFileStatus(f); + if (fileStatus.isDirectory()) { + throw new FileNotFoundException("Can't open " + f + " because it is a directory"); + } + + return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f), + fileStatus.getLen(), s3, statistics)); + } + + /** + * Create an FSDataOutputStream at the indicated Path with write-progress + * reporting. + * @param f the file name to open + * @param permission + * @param overwrite if a file with this name already exists, then if true, + * the file will be overwritten, and if false an error will be thrown. + * @param bufferSize the size of the buffer to be used. + * @param replication required block replication for the file. + * @param blockSize + * @param progress + * @throws IOException + * @see #setPermission(Path, FsPermission) + */ + public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, + int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { + String key = pathToKey(f); + + if (!overwrite && exists(f)) { + throw new FileAlreadyExistsException(f + " already exists"); + } + + // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file + return new FSDataOutputStream(new S3AOutputStream(getConf(), s3, this, + bucket, key, progress, cannedACL, statistics, + serverSideEncryptionAlgorithm), null); + } + + /** + * Append to an existing file (optional operation). + * @param f the existing file to be appended. + * @param bufferSize the size of the buffer to be used. + * @param progress for reporting progress if it is not null. + * @throws IOException + */ + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress) throws IOException { + throw new IOException("Not supported"); + } + + + /** + * Renames Path src to Path dst. Can take place on local fs + * or remote DFS. + * + * Warning: S3 does not support renames. This method does a copy which can + * take S3 some time to execute with large files and directories. Since + * there is no Progressable passed in, this can time out jobs. + * + * Note: This implementation differs with other S3 drivers. Specifically: + * Fails if src is a file and dst is a directory. + * Fails if src is a directory and dst is a file. + * Fails if the parent of dst does not exist or is a file. + * Fails if dst is a directory that is not empty. 
+ * + * @param src path to be renamed + * @param dst new path after rename + * @throws IOException on failure + * @return true if rename is successful + */ + public boolean rename(Path src, Path dst) throws IOException { + LOG.info("Rename path " + src + " to " + dst); + + String srcKey = pathToKey(src); + String dstKey = pathToKey(dst); + + if (srcKey.length() == 0 || dstKey.length() == 0) { + LOG.info("rename: src or dst are empty"); + return false; + } + + if (srcKey.equals(dstKey)) { + LOG.info("rename: src and dst refer to the same file"); + return true; + } + + S3AFileStatus srcStatus; + try { + srcStatus = getFileStatus(src); + } catch (FileNotFoundException e) { + LOG.info("rename: src not found " + src); + return false; + } + + S3AFileStatus dstStatus = null; + try { + dstStatus = getFileStatus(dst); + + if (srcStatus.isFile() && dstStatus.isDirectory()) { + LOG.info("rename: src is a file and dst is a directory"); + return false; + } + + if (srcStatus.isDirectory() && dstStatus.isFile()) { + LOG.info("rename: src is a directory and dst is a file"); + return false; + } + + } catch (FileNotFoundException e) { + // Parent must exist + Path parent = dst.getParent(); + if (!pathToKey(parent).isEmpty()) { + try { + S3AFileStatus dstParentStatus = getFileStatus(dst.getParent()); + if (!dstParentStatus.isDirectory()) { + return false; + } + } catch (FileNotFoundException e2) { + return false; + } + } + } + + // Ok! Time to start + if (srcStatus.isFile()) { + if (LOG.isDebugEnabled()) { + LOG.debug("rename: renaming file " + src + " to " + dst); + } + copyFile(srcKey, dstKey); + delete(src, false); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("rename: renaming directory " + src + " to " + dst); + } + + // This is a directory to directory copy + if (!dstKey.endsWith("/")) { + dstKey = dstKey + "/"; + } + + if (!srcKey.endsWith("/")) { + srcKey = srcKey + "/"; + } + + List keysToDelete = + new ArrayList(); + if (dstStatus != null && dstStatus.isEmptyDirectory()) { + copyFile(srcKey, dstKey); + statistics.incrementWriteOps(1); + keysToDelete.add(new DeleteObjectsRequest.KeyVersion(srcKey)); + } + + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setPrefix(srcKey); + request.setMaxKeys(maxKeys); + + ObjectListing objects = s3.listObjects(request); + statistics.incrementReadOps(1); + + while (true) { + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); + String newDstKey = dstKey + summary.getKey().substring(srcKey.length()); + copyFile(summary.getKey(), newDstKey); + } + + if (objects.isTruncated()) { + objects = s3.listNextBatchOfObjects(objects); + statistics.incrementReadOps(1); + } else { + break; + } + } + + + if (!keysToDelete.isEmpty()) { + DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket); + deleteRequest.setKeys(keysToDelete); + s3.deleteObjects(deleteRequest); + statistics.incrementWriteOps(1); + } + } + + if (src.getParent() != dst.getParent()) { + deleteUnnecessaryFakeDirectories(dst.getParent()); + createFakeDirectoryIfNecessary(src.getParent()); + } + return true; + } + + /** Delete a file. + * + * @param f the path to delete. + * @param recursive if path is a directory and set to + * true, the directory is deleted else throws an exception. In + * case of a file the recursive can be set to either true or false. + * @return true if delete is successful else false. 
+ * @throws IOException + */ + public boolean delete(Path f, boolean recursive) throws IOException { + LOG.info("Delete path " + f + " - recursive " + recursive); + S3AFileStatus status; + try { + status = getFileStatus(f); + } catch (FileNotFoundException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Couldn't delete " + f + " - does not exist"); + } + return false; + } + + String key = pathToKey(f); + + if (status.isDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("delete: Path is a directory"); + } + + if (!recursive && !status.isEmptyDirectory()) { + throw new IOException("Path is a folder: " + f + + " and it is not an empty directory"); + } + + if (!key.endsWith("/")) { + key = key + "/"; + } + + if (key.equals("/")) { + LOG.info("s3a cannot delete the root directory"); + return false; + } + + if (status.isEmptyDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting fake empty directory"); + } + s3.deleteObject(bucket, key); + statistics.incrementWriteOps(1); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Getting objects for directory prefix " + key + " to delete"); + } + + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setPrefix(key); + // Hopefully not setting a delimiter will cause this to find everything + //request.setDelimiter("/"); + request.setMaxKeys(maxKeys); + + List keys = + new ArrayList(); + ObjectListing objects = s3.listObjects(request); + statistics.incrementReadOps(1); + while (true) { + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); + if (LOG.isDebugEnabled()) { + LOG.debug("Got object to delete " + summary.getKey()); + } + } + + DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket); + deleteRequest.setKeys(keys); + s3.deleteObjects(deleteRequest); + statistics.incrementWriteOps(1); + keys.clear(); + + if (objects.isTruncated()) { + objects = s3.listNextBatchOfObjects(objects); + statistics.incrementReadOps(1); + } else { + break; + } + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("delete: Path is a file"); + } + s3.deleteObject(bucket, key); + statistics.incrementWriteOps(1); + } + + createFakeDirectoryIfNecessary(f.getParent()); + + return true; + } + + private void createFakeDirectoryIfNecessary(Path f) throws IOException { + String key = pathToKey(f); + if (!key.isEmpty() && !exists(f)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating new fake directory at " + f); + } + createFakeDirectory(bucket, key); + } + } + + /** + * List the statuses of the files/directories in the given path if the path is + * a directory. 
+ * + * @param f given path + * @return the statuses of the files/directories in the given patch + * @throws FileNotFoundException when the path does not exist; + * IOException see specific implementation + */ + public FileStatus[] listStatus(Path f) throws FileNotFoundException, + IOException { + String key = pathToKey(f); + LOG.info("List status for path: " + f); + + final List result = new ArrayList(); + final FileStatus fileStatus = getFileStatus(f); + + if (fileStatus.isDirectory()) { + if (!key.isEmpty()) { + key = key + "/"; + } + + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setPrefix(key); + request.setDelimiter("/"); + request.setMaxKeys(maxKeys); + + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: doing listObjects for directory " + key); + } + + ObjectListing objects = s3.listObjects(request); + statistics.incrementReadOps(1); + + while (true) { + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir); + // Skip over keys that are ourselves and old S3N _$folder$ files + if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring: " + keyPath); + } + continue; + } + + if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) { + result.add(new S3AFileStatus(true, true, keyPath)); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: fd: " + keyPath); + } + } else { + result.add(new S3AFileStatus(summary.getSize(), + dateToLong(summary.getLastModified()), keyPath)); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: fi: " + keyPath); + } + } + } + + for (String prefix : objects.getCommonPrefixes()) { + Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); + if (keyPath.equals(f)) { + continue; + } + result.add(new S3AFileStatus(true, false, keyPath)); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: rd: " + keyPath); + } + } + + if (objects.isTruncated()) { + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: list truncated - getting next batch"); + } + + objects = s3.listNextBatchOfObjects(objects); + statistics.incrementReadOps(1); + } else { + break; + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: rd (not a dir): " + f); + } + result.add(fileStatus); + } + + return result.toArray(new FileStatus[result.size()]); + } + + + + /** + * Set the current working directory for the given file system. All relative + * paths will be resolved relative to it. + * + * @param new_dir + */ + public void setWorkingDirectory(Path new_dir) { + workingDir = new_dir; + } + + /** + * Get the current working directory for the given file system + * @return the directory pathname + */ + public Path getWorkingDirectory() { + return workingDir; + } + + /** + * Make the given file and all non-existent parents into + * directories. Has the semantics of Unix 'mkdir -p'. + * Existence of the directory hierarchy is not an error. + * @param f path to create + * @param permission to apply to f + */ + // TODO: If we have created an empty file at /foo/bar and we then call + // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/? 
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException { + LOG.info("Making directory: " + f); + + try { + FileStatus fileStatus = getFileStatus(f); + + if (fileStatus.isDirectory()) { + return true; + } else { + throw new FileAlreadyExistsException("Path is a file: " + f); + } + } catch (FileNotFoundException e) { + Path fPart = f; + do { + try { + FileStatus fileStatus = getFileStatus(fPart); + if (fileStatus.isFile()) { + throw new FileAlreadyExistsException(String.format( + "Can't make directory for path '%s' since it is a file.", + fPart)); + } + } catch (FileNotFoundException fnfe) { + } + fPart = fPart.getParent(); + } while (fPart != null); + + String key = pathToKey(f); + createFakeDirectory(bucket, key); + return true; + } + } + + /** + * Return a file status object that represents the path. + * @param f The path we want information from + * @return a FileStatus object + * @throws java.io.FileNotFoundException when the path does not exist; + * IOException see specific implementation + */ + public S3AFileStatus getFileStatus(Path f) throws IOException { + String key = pathToKey(f); + + LOG.info("Getting path status for " + f + " (" + key + ")"); + + if (!key.isEmpty()) { + try { + ObjectMetadata meta = s3.getObjectMetadata(bucket, key); + statistics.incrementReadOps(1); + + if (objectRepresentsDirectory(key, meta.getContentLength())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found exact file: fake directory"); + } + return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Found exact file: normal file"); + } + return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()), + f.makeQualified(uri, workingDir)); + } + } catch (AmazonServiceException e) { + if (e.getStatusCode() != 404) { + printAmazonServiceException(e); + throw e; + } + } catch (AmazonClientException e) { + printAmazonClientException(e); + throw e; + } + + // Necessary? + if (!key.endsWith("/")) { + try { + String newKey = key + "/"; + ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey); + statistics.incrementReadOps(1); + + if (objectRepresentsDirectory(newKey, meta.getContentLength())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found file (with /): fake directory"); + } + return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir)); + } else { + LOG.warn("Found file (with /): real file? 
should not happen: " + key); + + return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()), + f.makeQualified(uri, workingDir)); + } + } catch (AmazonServiceException e) { + if (e.getStatusCode() != 404) { + printAmazonServiceException(e); + throw e; + } + } catch (AmazonClientException e) { + printAmazonClientException(e); + throw e; + } + } + } + + try { + if (!key.isEmpty() && !key.endsWith("/")) { + key = key + "/"; + } + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setPrefix(key); + request.setDelimiter("/"); + request.setMaxKeys(1); + + ObjectListing objects = s3.listObjects(request); + statistics.incrementReadOps(1); + + if (objects.getCommonPrefixes().size() > 0 || objects.getObjectSummaries().size() > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found path as directory (with /): " + + objects.getCommonPrefixes().size() + "/" + + objects.getObjectSummaries().size()); + + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize()); + } + for (String prefix : objects.getCommonPrefixes()) { + LOG.debug("Prefix: " + prefix); + } + } + + return new S3AFileStatus(true, false, f.makeQualified(uri, workingDir)); + } + } catch (AmazonServiceException e) { + if (e.getStatusCode() != 404) { + printAmazonServiceException(e); + throw e; + } + } catch (AmazonClientException e) { + printAmazonClientException(e); + throw e; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Not Found: " + f); + } + throw new FileNotFoundException("No such file or directory: " + f); + } + + /** + * The src file is on the local disk. Add it to FS at + * the given dst name. + * + * This version doesn't need to create a temporary file to calculate the md5. 
+ * Sadly this doesn't seem to be used by the shell cp :( + * + * delSrc indicates if the source should be removed + * @param delSrc whether to delete the src + * @param overwrite whether to overwrite an existing file + * @param src path + * @param dst path + */ + @Override + public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, + Path dst) throws IOException { + String key = pathToKey(dst); + + if (!overwrite && exists(dst)) { + throw new IOException(dst + " already exists"); + } + + LOG.info("Copying local file from " + src + " to " + dst); + + // Since we have a local file, we don't need to stream into a temporary file + LocalFileSystem local = getLocal(getConf()); + File srcfile = local.pathToFile(src); + + TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); + transferConfiguration.setMinimumUploadPartSize(partSize); + transferConfiguration.setMultipartUploadThreshold(partSizeThreshold); + + TransferManager transfers = new TransferManager(s3); + transfers.setConfiguration(transferConfiguration); + + final ObjectMetadata om = new ObjectMetadata(); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + om.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile); + putObjectRequest.setCannedAcl(cannedACL); + putObjectRequest.setMetadata(om); + + ProgressListener progressListener = new ProgressListener() { + public void progressChanged(ProgressEvent progressEvent) { + switch (progressEvent.getEventCode()) { + case ProgressEvent.PART_COMPLETED_EVENT_CODE: + statistics.incrementWriteOps(1); + break; + } + } + }; + + Upload up = transfers.upload(putObjectRequest); + up.addProgressListener(progressListener); + try { + up.waitForUploadResult(); + statistics.incrementWriteOps(1); + } catch (InterruptedException e) { + throw new IOException("Got interrupted, cancelling"); + } finally { + transfers.shutdownNow(false); + } + + // This will delete unnecessary fake parent directories + finishedWrite(key); + + if (delSrc) { + local.delete(src, false); + } + } + + /** + * Override getCononicalServiceName because we don't support token in S3A + */ + @Override + public String getCanonicalServiceName() { + // Does not support Token + return null; + } + + private void copyFile(String srcKey, String dstKey) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("copyFile " + srcKey + " -> " + dstKey); + } + + TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); + transferConfiguration.setMultipartCopyPartSize(partSize); + + TransferManager transfers = new TransferManager(s3); + transfers.setConfiguration(transferConfiguration); + + ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); + final ObjectMetadata dstom = srcom.clone(); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + dstom.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey); + copyObjectRequest.setCannedAccessControlList(cannedACL); + copyObjectRequest.setNewObjectMetadata(dstom); + + ProgressListener progressListener = new ProgressListener() { + public void progressChanged(ProgressEvent progressEvent) { + switch (progressEvent.getEventCode()) { + case ProgressEvent.PART_COMPLETED_EVENT_CODE: + statistics.incrementWriteOps(1); + break; + } + } + }; + + Copy copy = transfers.copy(copyObjectRequest); + 
copy.addProgressListener(progressListener); + try { + copy.waitForCopyResult(); + statistics.incrementWriteOps(1); + } catch (InterruptedException e) { + throw new IOException("Got interrupted, cancelling"); + } finally { + transfers.shutdownNow(false); + } + } + + private boolean objectRepresentsDirectory(final String name, final long size) { + return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L; + } + + // Handles null Dates that can be returned by AWS + private static long dateToLong(final Date date) { + if (date == null) { + return 0L; + } + + return date.getTime(); + } + + public void finishedWrite(String key) throws IOException { + deleteUnnecessaryFakeDirectories(keyToPath(key).getParent()); + } + + private void deleteUnnecessaryFakeDirectories(Path f) throws IOException { + while (true) { + try { + String key = pathToKey(f); + if (key.isEmpty()) { + break; + } + + S3AFileStatus status = getFileStatus(f); + + if (status.isDirectory() && status.isEmptyDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting fake directory " + key + "/"); + } + s3.deleteObject(bucket, key + "/"); + statistics.incrementWriteOps(1); + } + } catch (FileNotFoundException e) { + } catch (AmazonServiceException e) {} + + if (f.isRoot()) { + break; + } + + f = f.getParent(); + } + } + + + private void createFakeDirectory(final String bucketName, final String objectName) + throws AmazonClientException, AmazonServiceException { + if (!objectName.endsWith("/")) { + createEmptyObject(bucketName, objectName + "/"); + } else { + createEmptyObject(bucketName, objectName); + } + } + + // Used to create an empty file that represents an empty directory + private void createEmptyObject(final String bucketName, final String objectName) + throws AmazonClientException, AmazonServiceException { + final InputStream im = new InputStream() { + @Override + public int read() throws IOException { + return -1; + } + }; + + final ObjectMetadata om = new ObjectMetadata(); + om.setContentLength(0L); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + om.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om); + putObjectRequest.setCannedAcl(cannedACL); + s3.putObject(putObjectRequest); + statistics.incrementWriteOps(1); + } + + /** + * Return the number of bytes that large input files should be optimally + * be split into to minimize i/o time. 
+ * @deprecated use {@link #getDefaultBlockSize(Path)} instead + */ + @Deprecated + public long getDefaultBlockSize() { + // default to 32MB: large enough to minimize the impact of seeks + return getConf().getLong("fs.s3a.block.size", 32 * 1024 * 1024); + } + + private void printAmazonServiceException(AmazonServiceException ase) { + LOG.info("Caught an AmazonServiceException, which means your request made it " + + "to Amazon S3, but was rejected with an error response for some reason."); + LOG.info("Error Message: " + ase.getMessage()); + LOG.info("HTTP Status Code: " + ase.getStatusCode()); + LOG.info("AWS Error Code: " + ase.getErrorCode()); + LOG.info("Error Type: " + ase.getErrorType()); + LOG.info("Request ID: " + ase.getRequestId()); + LOG.info("Class Name: " + ase.getClass().getName()); + } + + private void printAmazonClientException(AmazonClientException ace) { + LOG.info("Caught an AmazonClientException, which means the client encountered " + + "a serious internal problem while trying to communicate with S3, " + + "such as not being able to access the network."); + LOG.info("Error Message: " + ace.getMessage()); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java new file mode 100644 index 0000000000..f65a5b0c5b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem; + +import org.slf4j.Logger; + +import java.io.EOFException; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.net.SocketException; + +public class S3AInputStream extends FSInputStream { + private long pos; + private boolean closed; + private S3ObjectInputStream wrappedStream; + private S3Object wrappedObject; + private FileSystem.Statistics stats; + private AmazonS3Client client; + private String bucket; + private String key; + private long contentLength; + public static final Logger LOG = S3AFileSystem.LOG; + + + public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client, + FileSystem.Statistics stats) { + this.bucket = bucket; + this.key = key; + this.contentLength = contentLength; + this.client = client; + this.stats = stats; + this.pos = 0; + this.closed = false; + this.wrappedObject = null; + this.wrappedStream = null; + } + + private void openIfNeeded() throws IOException { + if (wrappedObject == null) { + reopen(0); + } + } + + private synchronized void reopen(long pos) throws IOException { + if (wrappedStream != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Aborting old stream to open at pos " + pos); + } + wrappedStream.abort(); + } + + if (pos < 0) { + throw new EOFException("Trying to seek to a negative offset " + pos); + } + + if (contentLength > 0 && pos > contentLength-1) { + throw new EOFException("Trying to seek to an offset " + pos + + " past the end of the file"); + } + + LOG.info("Actually opening file " + key + " at pos " + pos); + + GetObjectRequest request = new GetObjectRequest(bucket, key); + request.setRange(pos, contentLength-1); + + wrappedObject = client.getObject(request); + wrappedStream = wrappedObject.getObjectContent(); + + if (wrappedStream == null) { + throw new IOException("Null IO stream"); + } + + this.pos = pos; + } + + @Override + public synchronized long getPos() throws IOException { + return pos; + } + + @Override + public synchronized void seek(long pos) throws IOException { + if (this.pos == pos) { + return; + } + + LOG.info("Reopening " + this.key + " to seek to new offset " + (pos - this.pos)); + reopen(pos); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + + @Override + public synchronized int read() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + openIfNeeded(); + + int byteRead; + try { + byteRead = wrappedStream.read(); + } catch (SocketTimeoutException e) { + LOG.info("Got timeout while trying to read from stream, trying to recover " + e); + reopen(pos); + byteRead = wrappedStream.read(); + } catch (SocketException e) { + LOG.info("Got socket exception while trying to read from stream, trying to recover " + e); + reopen(pos); + byteRead = wrappedStream.read(); + } + + if (byteRead >= 0) { + pos++; + } + + if (stats != null && byteRead >= 0) { + stats.incrementBytesRead(1); + } + return byteRead; + } + + @Override + public synchronized int read(byte buf[], int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + openIfNeeded(); + + int byteRead; + try { + byteRead = 
wrappedStream.read(buf, off, len); + } catch (SocketTimeoutException e) { + LOG.info("Got timeout while trying to read from stream, trying to recover " + e); + reopen(pos); + byteRead = wrappedStream.read(buf, off, len); + } catch (SocketException e) { + LOG.info("Got socket exception while trying to read from stream, trying to recover " + e); + reopen(pos); + byteRead = wrappedStream.read(buf, off, len); + } + + if (byteRead > 0) { + pos += byteRead; + } + + if (stats != null && byteRead > 0) { + stats.incrementBytesRead(byteRead); + } + + return byteRead; + } + + @Override + public synchronized void close() throws IOException { + super.close(); + closed = true; + if (wrappedObject != null) { + wrappedObject.close(); + } + } + + @Override + public synchronized int available() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + long remaining = this.contentLength - this.pos; + if (remaining > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int)remaining; + } + + @Override + public boolean markSupported() { + return false; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java new file mode 100644 index 0000000000..bdb723e920 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.ProgressListener; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; +import com.amazonaws.services.s3.transfer.Upload; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.util.Progressable; + +import org.slf4j.Logger; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import static org.apache.hadoop.fs.s3a.Constants.*; + +public class S3AOutputStream extends OutputStream { + private OutputStream backupStream; + private File backupFile; + private boolean closed; + private String key; + private String bucket; + private AmazonS3Client client; + private Progressable progress; + private long partSize; + private int partSizeThreshold; + private S3AFileSystem fs; + private CannedAccessControlList cannedACL; + private FileSystem.Statistics statistics; + private LocalDirAllocator lDirAlloc; + private String serverSideEncryptionAlgorithm; + + public static final Logger LOG = S3AFileSystem.LOG; + + public S3AOutputStream(Configuration conf, AmazonS3Client client, + S3AFileSystem fs, String bucket, String key, Progressable progress, + CannedAccessControlList cannedACL, FileSystem.Statistics statistics, + String serverSideEncryptionAlgorithm) + throws IOException { + this.bucket = bucket; + this.key = key; + this.client = client; + this.progress = progress; + this.fs = fs; + this.cannedACL = cannedACL; + this.statistics = statistics; + this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; + + partSize = conf.getLong(NEW_MULTIPART_SIZE, + conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE)); + partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD, + conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD)); + + if (conf.get(BUFFER_DIR, null) != null) { + lDirAlloc = new LocalDirAllocator(BUFFER_DIR); + } else { + lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a"); + } + + backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf); + closed = false; + + LOG.info("OutputStream for key '" + key + "' writing to tempfile: " + this.backupFile); + + this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile)); + } + + @Override + public void flush() throws IOException { + backupStream.flush(); + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + backupStream.close(); + LOG.info("OutputStream for key '" + key + "' closed. 
Now beginning upload"); + LOG.info("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold); + + + try { + TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); + transferConfiguration.setMinimumUploadPartSize(partSize); + transferConfiguration.setMultipartUploadThreshold(partSizeThreshold); + + TransferManager transfers = new TransferManager(client); + transfers.setConfiguration(transferConfiguration); + + final ObjectMetadata om = new ObjectMetadata(); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + om.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile); + putObjectRequest.setCannedAcl(cannedACL); + putObjectRequest.setMetadata(om); + + Upload upload = transfers.upload(putObjectRequest); + + ProgressableProgressListener listener = + new ProgressableProgressListener(upload, progress, statistics); + upload.addProgressListener(listener); + + upload.waitForUploadResult(); + + long delta = upload.getProgress().getBytesTransferred() - listener.getLastBytesTransferred(); + if (statistics != null && delta != 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("S3A write delta changed after finished: " + delta + " bytes"); + } + statistics.incrementBytesWritten(delta); + } + + // This will delete unnecessary fake parent directories + fs.finishedWrite(key); + } catch (InterruptedException e) { + throw new IOException(e); + } finally { + if (!backupFile.delete()) { + LOG.warn("Could not delete temporary s3a file: " + backupFile); + } + super.close(); + closed = true; + } + + LOG.info("OutputStream for key '" + key + "' upload complete"); + } + + @Override + public void write(int b) throws IOException { + backupStream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + backupStream.write(b, off, len); + } + + public static class ProgressableProgressListener implements ProgressListener { + private Progressable progress; + private FileSystem.Statistics statistics; + private long lastBytesTransferred; + private Upload upload; + + public ProgressableProgressListener(Upload upload, Progressable progress, + FileSystem.Statistics statistics) { + this.upload = upload; + this.progress = progress; + this.statistics = statistics; + this.lastBytesTransferred = 0; + } + + public void progressChanged(ProgressEvent progressEvent) { + if (progress != null) { + progress.progress(); + } + + // There are 3 http ops here, but this should be close enough for now + if (progressEvent.getEventCode() == ProgressEvent.PART_STARTED_EVENT_CODE || + progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) { + statistics.incrementWriteOps(1); + } + + long transferred = upload.getProgress().getBytesTransferred(); + long delta = transferred - lastBytesTransferred; + if (statistics != null && delta != 0) { + statistics.incrementBytesWritten(delta); + } + + lastBytesTransferred = transferred; + } + + public long getLastBytesTransferred() { + return lastBytesTransferred; + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem index 3cd1d6b2b8..0e3c42af66 100644 --- a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem +++ b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -15,3 
+15,4 @@ org.apache.hadoop.fs.s3.S3FileSystem org.apache.hadoop.fs.s3native.NativeS3FileSystem +org.apache.hadoop.fs.s3a.S3AFileSystem diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java new file mode 100644 index 0000000000..cbdb3bdb2c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractBondedFSContract; + +/** + * The contract of S3A: only enabled if the test bucket is provided + */ +public class S3AContract extends AbstractBondedFSContract { + + public static final String CONTRACT_XML = "contract/s3a.xml"; + + + public S3AContract(Configuration conf) { + super(conf); + //insert the base features + addConfResource(CONTRACT_XML); + } + + @Override + public String getScheme() { + return "s3a"; + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java new file mode 100644 index 0000000000..1d95ddfdb4 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractCreateTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; + +public class TestS3AContractCreate extends AbstractContractCreateTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + @Override + public void testOverwriteEmptyDirectory() throws Throwable { + ContractTestUtils.skip( + "blobstores can't distinguish empty directories from files"); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java new file mode 100644 index 0000000000..733a517b8e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractDeleteTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestS3AContractDelete extends AbstractContractDeleteTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java new file mode 100644 index 0000000000..a312782bcb --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractMkdirTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Test dir operations on S3 + */ +public class TestS3AContractMkdir extends AbstractContractMkdirTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java new file mode 100644 index 0000000000..f735deb145 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractOpenTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestS3AContractOpen extends AbstractContractOpenTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java new file mode 100644 index 0000000000..88ed6d6a7b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractRenameTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; + +public class TestS3AContractRename extends AbstractContractRenameTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + @Override + public void testRenameDirIntoExistingDir() throws Throwable { + describe("Verify renaming a dir into an existing dir puts the files" + +" from the source dir into the existing dir" + +" and leaves existing files alone"); + FileSystem fs = getFileSystem(); + String sourceSubdir = "source"; + Path srcDir = path(sourceSubdir); + Path srcFilePath = new Path(srcDir, "source-256.txt"); + byte[] srcDataset = dataset(256, 'a', 'z'); + writeDataset(fs, srcFilePath, srcDataset, srcDataset.length, 1024, false); + Path destDir = path("dest"); + + Path destFilePath = new Path(destDir, "dest-512.txt"); + byte[] destDataset = dataset(512, 'A', 'Z'); + writeDataset(fs, destFilePath, destDataset, destDataset.length, 1024, false); + assertIsFile(destFilePath); + + boolean rename = fs.rename(srcDir, destDir); + Path renamedSrcFilePath = new Path(destDir, "source-256.txt"); + assertIsFile(destFilePath); + assertIsFile(renamedSrcFilePath); + ContractTestUtils.verifyFileContents(fs, destFilePath, destDataset); + assertTrue("rename returned false though the contents were copied", rename); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java new file mode 100644 index 0000000000..5e2352c7eb --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * root dir operations against an S3 bucket + */ +public class TestS3AContractRootDir extends + AbstractContractRootDirectoryTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractSeek.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractSeek.java new file mode 100644 index 0000000000..d677ec423f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractSeek.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractSeekTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestS3AContractSeek extends AbstractContractSeekTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3AFileSystemContractBaseTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3AFileSystemContractBaseTest.java new file mode 100644 index 0000000000..8455233466 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3AFileSystemContractBaseTest.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import static org.junit.Assume.*; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystemContractBaseTest; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.UUID; + +/** + * Tests a live S3 system. If your keys and bucket aren't specified, all tests + * are marked as passed. + * + * This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest extends + * TestCase, which uses the old JUnit 3 runner that doesn't honor assumptions + * properly, making it impossible to skip the tests if we don't have a valid + * bucket. + **/ +public class S3AFileSystemContractBaseTest extends FileSystemContractBaseTest { + private static final int TEST_BUFFER_SIZE = 128; + private static final int MODULUS = 128; + + protected static final Logger LOG = LoggerFactory.getLogger(S3AFileSystemContractBaseTest.class); + + @Override + public void setUp() throws Exception { + Configuration conf = new Configuration(); + + String fsName = conf.get("test.fs.s3a.name", "s3a:///"); + boolean liveTest = !"s3a:///".equals(fsName); + + // This doesn't work with our JUnit 3 style test cases, so instead we'll + // make this whole class not run by default + assumeTrue(liveTest); + + URI testURI = URI.create(fsName); + fs = new S3AFileSystem(); + fs.initialize(testURI, conf); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + if (fs != null) { + fs.delete(path("/tests3a"), true); + } + super.tearDown(); + } + + @Test(timeout = 10000) + public void testMkdirs() throws IOException { + // No trailing slash + assertTrue(fs.mkdirs(path("/tests3a/a"))); + assertTrue(fs.exists(path("/tests3a/a"))); + + // With trailing slash + assertTrue(fs.mkdirs(path("/tests3a/b/"))); + assertTrue(fs.exists(path("/tests3a/b/"))); + + // Two levels deep + assertTrue(fs.mkdirs(path("/tests3a/c/a/"))); + assertTrue(fs.exists(path("/tests3a/c/a/"))); + + // Mismatched slashes + assertTrue(fs.exists(path("/tests3a/c/a"))); + } + + + @Test(timeout=20000) + public void testDelete() throws IOException { + // Test deleting an empty directory + assertTrue(fs.mkdirs(path("/tests3a/d"))); + assertTrue(fs.delete(path("/tests3a/d"), true)); + assertFalse(fs.exists(path("/tests3a/d"))); + + // Test deleting a deep empty directory + assertTrue(fs.mkdirs(path("/tests3a/e/f/g/h"))); + assertTrue(fs.delete(path("/tests3a/e/f/g"), true)); + assertFalse(fs.exists(path("/tests3a/e/f/g/h"))); + assertFalse(fs.exists(path("/tests3a/e/f/g"))); + assertTrue(fs.exists(path("/tests3a/e/f"))); + + // Test delete of just a file + writeFile(path("/tests3a/f/f/file"), 1000); + assertTrue(fs.exists(path("/tests3a/f/f/file"))); + assertTrue(fs.delete(path("/tests3a/f/f/file"), false)); + assertFalse(fs.exists(path("/tests3a/f/f/file"))); + + + // Test delete of a path with files in various directories + writeFile(path("/tests3a/g/h/i/file"), 1000); + assertTrue(fs.exists(path("/tests3a/g/h/i/file"))); + writeFile(path("/tests3a/g/h/j/file"), 1000); + assertTrue(fs.exists(path("/tests3a/g/h/j/file"))); + try { + assertFalse(fs.delete(path("/tests3a/g/h"), false)); + fail("Expected delete to fail with recursion turned off"); + } catch (IOException e) {} + assertTrue(fs.exists(path("/tests3a/g/h/j/file"))); + assertTrue(fs.delete(path("/tests3a/g/h"),
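+ // recursive delete: removes the directory and everything beneath it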
true)); + assertFalse(fs.exists(path("/tests3a/g/h/j"))); + } + + + @Test(timeout = 3600000) + public void testOpenCreate() throws IOException { + try { + createAndReadFileTest(1024); + } catch (IOException e) { + fail(e.getMessage()); + } + + try { + createAndReadFileTest(5 * 1024 * 1024); + } catch (IOException e) { + fail(e.getMessage()); + } + + try { + createAndReadFileTest(20 * 1024 * 1024); + } catch (IOException e) { + fail(e.getMessage()); + } + + /* + Enable to test the multipart upload + try { + createAndReadFileTest((long)6 * 1024 * 1024 * 1024); + } catch (IOException e) { + fail(e.getMessage()); + } + */ + } + + @Test(timeout = 1200000) + public void testRenameFile() throws IOException { + Path srcPath = path("/tests3a/a/srcfile"); + + final OutputStream outputStream = fs.create(srcPath, false); + generateTestData(outputStream, 11 * 1024 * 1024); + outputStream.close(); + + assertTrue(fs.exists(srcPath)); + + Path dstPath = path("/tests3a/b/dstfile"); + + assertFalse(fs.rename(srcPath, dstPath)); + assertTrue(fs.mkdirs(dstPath.getParent())); + assertTrue(fs.rename(srcPath, dstPath)); + assertTrue(fs.exists(dstPath)); + assertFalse(fs.exists(srcPath)); + assertTrue(fs.exists(srcPath.getParent())); + } + + + @Test(timeout = 10000) + public void testRenameDirectory() throws IOException { + Path srcPath = path("/tests3a/a"); + + assertTrue(fs.mkdirs(srcPath)); + writeFile(new Path(srcPath, "b/testfile"), 1024); + + Path nonEmptyPath = path("/tests3a/nonempty"); + writeFile(new Path(nonEmptyPath, "b/testfile"), 1024); + + assertFalse(fs.rename(srcPath, nonEmptyPath)); + + Path dstPath = path("/tests3a/b"); + assertTrue(fs.rename(srcPath, dstPath)); + assertFalse(fs.exists(srcPath)); + assertTrue(fs.exists(new Path(dstPath, "b/testfile"))); + } + + + @Test(timeout=10000) + public void testSeek() throws IOException { + Path path = path("/tests3a/testfile.seek"); + writeFile(path, TEST_BUFFER_SIZE * 10); + + + FSDataInputStream inputStream = fs.open(path, TEST_BUFFER_SIZE); + inputStream.seek(inputStream.getPos() + MODULUS); + + testReceivedData(inputStream, TEST_BUFFER_SIZE * 10 - MODULUS); + } + + /** + * Creates and reads a file with the given size in S3. The test file is + * generated according to a specific pattern. + * During the read phase the incoming data stream is also checked against this pattern. + * + * @param fileSize + * the size of the file to be generated in bytes + * @throws IOException + * thrown if an I/O error occurs while writing or reading the test file + */ + private void createAndReadFileTest(final long fileSize) throws IOException { + final String objectName = UUID.randomUUID().toString(); + final Path objectPath = new Path("/tests3a/", objectName); + + // Write test file to S3 + final OutputStream outputStream = fs.create(objectPath, false); + generateTestData(outputStream, fileSize); + outputStream.close(); + + // Now read the same file back from S3 + final InputStream inputStream = fs.open(objectPath); + testReceivedData(inputStream, fileSize); + inputStream.close(); + + // Delete test file + fs.delete(objectPath, false); + } + + + /** + * Receives test data from the given input stream and checks the size of the + * data as well as the pattern inside the received data. 
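+ * The received bytes are expected to repeat the 0..(MODULUS - 1) pattern + * produced by generateTestData.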
+ * + * @param inputStream + * the input stream to read the test data from + * @param expectedSize + * the expected size of the data to be read from the input stream in bytes + * @throws IOException + * thrown if an error occurs while reading the data + */ + private void testReceivedData(final InputStream inputStream, + final long expectedSize) throws IOException { + final byte[] testBuffer = new byte[TEST_BUFFER_SIZE]; + + long totalBytesRead = 0; + int nextExpectedNumber = 0; + while (true) { + final int bytesRead = inputStream.read(testBuffer); + if (bytesRead < 0) { + break; + } + + totalBytesRead += bytesRead; + + for (int i = 0; i < bytesRead; ++i) { + if (testBuffer[i] != nextExpectedNumber) { + throw new IOException("Read number " + testBuffer[i] + " but expected " + + nextExpectedNumber); + } + + ++nextExpectedNumber; + + if (nextExpectedNumber == MODULUS) { + nextExpectedNumber = 0; + } + } + } + + if (totalBytesRead != expectedSize) { + throw new IOException("Expected to read " + expectedSize + + " bytes but only received " + totalBytesRead); + } + } + + + /** + * Generates test data of the given size according to some specific pattern + * and writes it to the provided output stream. + * + * @param outputStream + * the output stream to write the data to + * @param size + * the size of the test data to be generated in bytes + * @throws IOException + * thrown if an error occurs while writing the data + */ + private void generateTestData(final OutputStream outputStream, + final long size) throws IOException { + + final byte[] testBuffer = new byte[TEST_BUFFER_SIZE]; + for (int i = 0; i < testBuffer.length; ++i) { + testBuffer[i] = (byte) (i % MODULUS); + } + + long bytesWritten = 0; + while (bytesWritten < size) { + + final long diff = size - bytesWritten; + if (diff < testBuffer.length) { + outputStream.write(testBuffer, 0, (int)diff); + bytesWritten += diff; + } else { + outputStream.write(testBuffer); + bytesWritten += testBuffer.length; + } + } + } + + private void writeFile(Path name, int fileSize) throws IOException { + final OutputStream outputStream = fs.create(name, false); + generateTestData(outputStream, fileSize); + outputStream.close(); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml new file mode 100644 index 0000000000..4142471d17 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml @@ -0,0 +1,105 @@ + + + + + + + fs.contract.test.root-tests-enabled + true + + + + fs.contract.test.random-seek-count + 10 + + + + fs.contract.is-blobstore + true + + + + fs.contract.is-case-sensitive + true + + + + fs.contract.rename-returns-false-if-source-missing + true + + + + fs.contract.supports-append + false + + + + fs.contract.supports-atomic-directory-delete + false + + + + fs.contract.supports-atomic-rename + false + + + + fs.contract.supports-block-locality + false + + + + fs.contract.supports-concat + false + + + + fs.contract.supports-seek + true + + + + fs.contract.supports-seek-on-closed-file + true + + + + fs.contract.rejects-seek-past-eof + true + + + + fs.contract.supports-strict-exceptions + true + + + + fs.contract.supports-unix-permissions + false + + + + fs.contract.rename-overwrites-dest + true + + + diff --git a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3n.xml b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3n.xml index ab46178090..cb8aca7d64 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3n.xml +++ 
b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3n.xml @@ -77,6 +77,11 @@ true + + fs.contract.supports-seek-on-closed-file + true + + fs.contract.rejects-seek-past-eof true @@ -92,4 +97,4 @@ false - \ No newline at end of file + diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index a20fb86703..fbdc132e28 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -132,14 +132,8 @@ - org.codehaus.jackson - jackson-mapper-asl - compile - - - - org.codehaus.jackson - jackson-core-asl + com.fasterxml.jackson.core + jackson-core compile