From d33e928fbeb1764a724c8f3c051bb0d8be82bbff Mon Sep 17 00:00:00 2001 From: Mingfei Date: Fri, 26 Aug 2016 11:00:03 +0800 Subject: [PATCH] HADOOP-13529. Do some code refactoring. Contributed by Genmao Yu. --- hadoop-tools/hadoop-aliyun/pom.xml | 23 +- .../fs/aliyun/oss/AliyunOSSFileSystem.java | 455 +++------------- .../aliyun/oss/AliyunOSSFileSystemStore.java | 486 ++++++++++++++++++ .../fs/aliyun/oss/AliyunOSSInputStream.java | 60 +-- .../fs/aliyun/oss/AliyunOSSOutputStream.java | 129 +---- .../hadoop/fs/aliyun/oss/AliyunOSSUtils.java | 193 +++---- .../hadoop/fs/aliyun/oss/Constants.java | 3 +- .../aliyun/oss/TestOSSFileSystemContract.java | 10 - .../fs/aliyun/oss/TestOSSFileSystemStore.java | 121 +++++ .../fs/aliyun/oss/contract/OSSContract.java | 1 - .../oss/contract/TestOSSContractDispCp.java | 44 ++ .../TestOSSContractGetFileStatus.java | 35 ++ .../oss/contract/TestOSSContractRootDir.java | 69 +++ 13 files changed, 968 insertions(+), 661 deletions(-) create mode 100644 hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemStore.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractDispCp.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractGetFileStatus.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractRootDir.java diff --git a/hadoop-tools/hadoop-aliyun/pom.xml b/hadoop-tools/hadoop-aliyun/pom.xml index c87d13f24c..358b18b1c4 100644 --- a/hadoop-tools/hadoop-aliyun/pom.xml +++ b/hadoop-tools/hadoop-aliyun/pom.xml @@ -128,6 +128,27 @@ test test-jar - + + org.apache.hadoop + hadoop-distcp + test + + + org.apache.hadoop + hadoop-distcp + test + test-jar + + + org.apache.hadoop + hadoop-yarn-server-tests + test + test-jar + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + test + diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java index afe7242780..ad321bd203 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -20,18 +20,12 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*; -import com.aliyun.oss.ClientException; -import com.aliyun.oss.common.auth.CredentialsProvider; -import com.aliyun.oss.common.auth.DefaultCredentialProvider; -import com.aliyun.oss.common.auth.DefaultCredentials; -import java.io.ByteArrayInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -39,30 +33,13 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.UserInfo; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.util.Progressable; -import com.aliyun.oss.ClientConfiguration; -import com.aliyun.oss.OSSClient; -import com.aliyun.oss.OSSException; -import com.aliyun.oss.common.comm.Protocol; -import com.aliyun.oss.model.AbortMultipartUploadRequest; -import com.aliyun.oss.model.CannedAccessControlList; -import com.aliyun.oss.model.CompleteMultipartUploadRequest; -import com.aliyun.oss.model.CompleteMultipartUploadResult; -import com.aliyun.oss.model.CopyObjectResult; -import com.aliyun.oss.model.DeleteObjectsRequest; -import com.aliyun.oss.model.InitiateMultipartUploadRequest; -import com.aliyun.oss.model.InitiateMultipartUploadResult; -import com.aliyun.oss.model.ListObjectsRequest; import com.aliyun.oss.model.OSSObjectSummary; import com.aliyun.oss.model.ObjectListing; import com.aliyun.oss.model.ObjectMetadata; -import com.aliyun.oss.model.PartETag; -import com.aliyun.oss.model.UploadPartCopyRequest; -import com.aliyun.oss.model.UploadPartCopyResult; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,12 +52,8 @@ public class AliyunOSSFileSystem extends FileSystem { LoggerFactory.getLogger(AliyunOSSFileSystem.class); private URI uri; private Path workingDir; - private OSSClient ossClient; - private String bucketName; - private long uploadPartSize; - private long multipartThreshold; + private AliyunOSSFileSystemStore store; private int maxKeys; - private String serverSideEncryptionAlgorithm; @Override public FSDataOutputStream append(Path path, int bufferSize, @@ -91,9 +64,7 @@ public FSDataOutputStream append(Path path, int bufferSize, @Override public void close() throws IOException { try { - if (ossClient != null) { - ossClient.shutdown(); - } + store.close(); } finally { super.close(); } @@ -125,23 +96,33 @@ public FSDataOutputStream create(Path path, FsPermission permission, } return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(), - ossClient, bucketName, key, progress, statistics, - serverSideEncryptionAlgorithm), (Statistics)(null)); + store, key, progress, statistics), (Statistics)(null)); } @Override public boolean delete(Path path, boolean recursive) throws IOException { - FileStatus status; try { - status = getFileStatus(path); + return innerDelete(getFileStatus(path), recursive); } catch (FileNotFoundException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Couldn't delete " + path + ": Does not exist!"); - } + LOG.debug("Couldn't delete {} - does not exist", path); return false; } + } - String key = pathToKey(status.getPath()); + /** + * Delete an object. See {@link #delete(Path, boolean)}. + * + * @param status fileStatus object + * @param recursive if path is a directory and set to + * true, the directory is deleted else throws an exception. In + * case of a file the recursive can be set to either true or false. + * @return true if delete is successful else false. + * @throws IOException due to inability to delete a directory or file. + */ + private boolean innerDelete(FileStatus status, boolean recursive) + throws IOException { + Path f = status.getPath(); + String key = pathToKey(f); if (status.isDirectory()) { if (!key.endsWith("/")) { key += "/"; @@ -150,54 +131,34 @@ public boolean delete(Path path, boolean recursive) throws IOException { FileStatus[] statuses = listStatus(status.getPath()); // Check whether it is an empty directory or not if (statuses.length > 0) { - throw new IOException("Cannot remove directory" + path + + throw new IOException("Cannot remove directory " + f + ": It is not empty!"); } else { // Delete empty directory without '-r' - ossClient.deleteObject(bucketName, key); - statistics.incrementWriteOps(1); + store.deleteObject(key); } } else { - ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); - listRequest.setPrefix(key); - listRequest.setMaxKeys(maxKeys); - - while (true) { - ObjectListing objects = ossClient.listObjects(listRequest); - statistics.incrementReadOps(1); - List keysToDelete = new ArrayList(); - for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { - keysToDelete.add(objectSummary.getKey()); - } - DeleteObjectsRequest deleteRequest = - new DeleteObjectsRequest(bucketName); - deleteRequest.setKeys(keysToDelete); - ossClient.deleteObjects(deleteRequest); - statistics.incrementWriteOps(1); - if (objects.isTruncated()) { - listRequest.setMarker(objects.getNextMarker()); - } else { - break; - } - } + store.deleteDirs(key); } } else { - ossClient.deleteObject(bucketName, key); - statistics.incrementWriteOps(1); + store.deleteObject(key); } - //TODO: optimize logic here + + createFakeDirectoryIfNecessary(f); + return true; + } + + private void createFakeDirectoryIfNecessary(Path f) throws IOException { try { - Path pPath = status.getPath().getParent(); + Path pPath = f.getParent(); FileStatus pStatus = getFileStatus(pPath); - if (pStatus.isDirectory()) { - return true; - } else { + if (pStatus.isFile()) { throw new IOException("Path " + pPath + " is assumed to be a directory!"); } } catch (FileNotFoundException fnfe) { // Make sure the parent directory exists - return mkdir(bucketName, pathToKey(status.getPath().getParent())); + mkdir(pathToKey(f.getParent())); } } @@ -211,22 +172,15 @@ public FileStatus getFileStatus(Path path) throws IOException { return new FileStatus(0, true, 1, 0, 0, qualifiedPath); } - ObjectMetadata meta = getObjectMetadata(key); + ObjectMetadata meta = store.getObjectMetadata(key); // If key not found and key does not end with "/" if (meta == null && !key.endsWith("/")) { // Case: dir + "/" key += "/"; - meta = getObjectMetadata(key); + meta = store.getObjectMetadata(key); } if (meta == null) { - // Case: dir + "/" + file - ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); - listRequest.setPrefix(key); - listRequest.setDelimiter("/"); - listRequest.setMaxKeys(1); - - ObjectListing listing = ossClient.listObjects(listRequest); - statistics.incrementReadOps(1); + ObjectListing listing = store.listObjects(key, 1, "/", null); if (!listing.getObjectSummaries().isEmpty() || !listing.getCommonPrefixes().isEmpty()) { return new FileStatus(0, true, 1, 0, 0, qualifiedPath); @@ -242,22 +196,6 @@ public FileStatus getFileStatus(Path path) throws IOException { } } - /** - * Return object metadata given object key. - * - * @param key object key - * @return return null if key does not exist - */ - private ObjectMetadata getObjectMetadata(String key) { - try { - return ossClient.getObjectMetadata(bucketName, key); - } catch (OSSException osse) { - return null; - } finally { - statistics.incrementReadOps(1); - } - } - @Override public String getScheme() { return "oss"; @@ -288,7 +226,6 @@ public String getCanonicalServiceName() { * Initialize new FileSystem. * * @param name the uri of the file system, including host, port, etc. - * * @param conf configuration of the file system * @throws IOException IO problems */ @@ -296,153 +233,15 @@ public void initialize(URI name, Configuration conf) throws IOException { super.initialize(name, conf); uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority()); - workingDir = - new Path("/user", - System.getProperty("user.name")).makeQualified(uri, null); - - bucketName = name.getHost(); - - ClientConfiguration clientConf = new ClientConfiguration(); - clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY, - MAXIMUM_CONNECTIONS_DEFAULT)); - boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY, - SECURE_CONNECTIONS_DEFAULT); - clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); - clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY, - MAX_ERROR_RETRIES_DEFAULT)); - clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY, - ESTABLISH_TIMEOUT_DEFAULT)); - clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY, - SOCKET_TIMEOUT_DEFAULT)); - - String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, ""); - int proxyPort = conf.getInt(PROXY_PORT_KEY, -1); - if (!proxyHost.isEmpty()) { - clientConf.setProxyHost(proxyHost); - if (proxyPort >= 0) { - clientConf.setProxyPort(proxyPort); - } else { - if (secureConnections) { - LOG.warn("Proxy host set without port. Using HTTPS default 443"); - clientConf.setProxyPort(443); - } else { - LOG.warn("Proxy host set without port. Using HTTP default 80"); - clientConf.setProxyPort(80); - } - } - String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY); - String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY); - if ((proxyUsername == null) != (proxyPassword == null)) { - String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " + - PROXY_PASSWORD_KEY + " set without the other."; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - clientConf.setProxyUsername(proxyUsername); - clientConf.setProxyPassword(proxyPassword); - clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY)); - clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY)); - } else if (proxyPort >= 0) { - String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " + - PROXY_HOST_KEY; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - - String endPoint = conf.getTrimmed(ENDPOINT_KEY, ""); - ossClient = - new OSSClient(endPoint, getCredentialsProvider(name, conf), clientConf); + workingDir = new Path("/user", + System.getProperty("user.name")).makeQualified(uri, null); + store = new AliyunOSSFileSystemStore(); + store.initialize(name, conf, statistics); maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); - uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, - MULTIPART_UPLOAD_SIZE_DEFAULT); - multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, - MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); - - if (uploadPartSize < 5 * 1024 * 1024) { - LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB"); - uploadPartSize = 5 * 1024 * 1024; - } - - if (multipartThreshold < 5 * 1024 * 1024) { - LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB"); - multipartThreshold = 5 * 1024 * 1024; - } - - if (multipartThreshold > 1024 * 1024 * 1024) { - LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB"); - multipartThreshold = 1024 * 1024 * 1024; - } - - String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT); - if (!cannedACLName.isEmpty()) { - CannedAccessControlList cannedACL = - CannedAccessControlList.valueOf(cannedACLName); - ossClient.setBucketAcl(bucketName, cannedACL); - } - - serverSideEncryptionAlgorithm = - conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, ""); - setConf(conf); } - /** - * Create the default credential provider, or load in one explicitly - * identified in the configuration. - * @param name the uri of the file system - * @param conf configuration - * @return a credential provider - * @throws IOException on any problem. Class construction issues may be - * nested inside the IOE. - */ - private CredentialsProvider getCredentialsProvider(URI name, - Configuration conf) throws IOException { - CredentialsProvider credentials; - - String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY); - if (StringUtils.isEmpty(className)) { - Configuration newConf = - ProviderUtils.excludeIncompatibleCredentialProviders(conf, - AliyunOSSFileSystem.class); - String accessKey = - AliyunOSSUtils.getPassword(newConf, ACCESS_KEY, - UserInfo.EMPTY.getUser()); - String secretKey = - AliyunOSSUtils.getPassword(newConf, SECRET_KEY, - UserInfo.EMPTY.getPassword()); - credentials = - new DefaultCredentialProvider( - new DefaultCredentials(accessKey, secretKey)); - - } else { - try { - LOG.debug("Credential provider class is:" + className); - Class credClass = Class.forName(className); - try { - credentials = - (CredentialsProvider)credClass.getDeclaredConstructor( - URI.class, Configuration.class).newInstance(this.uri, conf); - } catch (NoSuchMethodException | SecurityException e) { - credentials = - (CredentialsProvider)credClass.getDeclaredConstructor() - .newInstance(); - } - } catch (ClassNotFoundException e) { - throw new IOException(className + " not found.", e); - } catch (NoSuchMethodException | SecurityException e) { - throw new IOException(String.format("%s constructor exception. A " + - "class specified in %s must provide an accessible constructor " + - "accepting URI and Configuration, or an accessible default " + - "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY), e); - } catch (ReflectiveOperationException | IllegalArgumentException e) { - throw new IOException(className + " instantiation exception.", e); - } - } - - return credentials; - } - /** * Check if OSS object represents a directory. * @@ -456,10 +255,10 @@ private boolean objectRepresentsDirectory(final String name, } /** - * Turns a path (relative or otherwise) into an OSS key. + * Turn a path (relative or otherwise) into an OSS key. * - * @param path the path of the file - * @return the key of the object that represent the file + * @param path the path of the file. + * @return the key of the object that represents the file. */ private String pathToKey(Path path) { if (!path.isAbsolute()) { @@ -492,18 +291,12 @@ public FileStatus[] listStatus(Path path) throws IOException { key = key + "/"; } - ListObjectsRequest listObjectsRequest = - new ListObjectsRequest(bucketName); - listObjectsRequest.setPrefix(key); - listObjectsRequest.setDelimiter("/"); - listObjectsRequest.setMaxKeys(maxKeys); - if (LOG.isDebugEnabled()) { LOG.debug("listStatus: doing listObjects for directory " + key); } + ObjectListing objects = store.listObjects(key, maxKeys, "/", null); while (true) { - ObjectListing objects = ossClient.listObjects(listObjectsRequest); statistics.incrementReadOps(1); for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { Path keyPath = keyToPath(objectSummary.getKey()) @@ -539,7 +332,8 @@ public FileStatus[] listStatus(Path path) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("listStatus: list truncated - getting next batch"); } - listObjectsRequest.setMarker(objects.getNextMarker()); + objects = store.listObjects(key, maxKeys, "/", + objects.getNextMarker()); statistics.incrementReadOps(1); } else { break; @@ -558,27 +352,17 @@ public FileStatus[] listStatus(Path path) throws IOException { /** * Used to create an empty file that represents an empty directory. * - * @param bucket the bucket this directory belongs to * @param key directory path - * @return true if directory successfully created + * @return true if directory is successfully created * @throws IOException */ - private boolean mkdir(final String bucket, final String key) - throws IOException { + private boolean mkdir(final String key) throws IOException { String dirName = key; - ObjectMetadata dirMeta = new ObjectMetadata(); - byte[] buffer = new byte[0]; - ByteArrayInputStream in = new ByteArrayInputStream(buffer); - dirMeta.setContentLength(0); if (!key.endsWith("/")) { dirName += "/"; } - try { - ossClient.putObject(bucket, dirName, in, dirMeta); - return true; - } finally { - in.close(); - } + store.storeEmptyFile(dirName); + return true; } @Override @@ -595,14 +379,14 @@ public boolean mkdirs(Path path, FsPermission permission) } catch (FileNotFoundException e) { validatePath(path); String key = pathToKey(path); - return mkdir(bucketName, key); + return mkdir(key); } } /** * Check whether the path is a valid path. * - * @param path the path to be checked + * @param path the path to be checked. * @throws IOException */ private void validatePath(Path path) throws IOException { @@ -631,8 +415,8 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException { " because it is a directory"); } - return new FSDataInputStream(new AliyunOSSInputStream(getConf(), ossClient, - bucketName, pathToKey(path), fileStatus.getLen(), statistics)); + return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store, + pathToKey(path), fileStatus.getLen(), statistics)); } @Override @@ -696,126 +480,31 @@ public boolean rename(Path srcPath, Path dstPath) throws IOException { } else { copyFile(srcPath, dstPath); } - if (srcPath.equals(dstPath)) { - return true; - } else { - return delete(srcPath, true); - } + + return srcPath.equals(dstPath) || delete(srcPath, true); } /** * Copy file from source path to destination path. - * (the caller should make sure srcPath is a file and dstPath is valid.) + * (the caller should make sure srcPath is a file and dstPath is valid) * - * @param srcPath source path - * @param dstPath destination path - * @return true if successfully copied + * @param srcPath source path. + * @param dstPath destination path. + * @return true if file is successfully copied. */ private boolean copyFile(Path srcPath, Path dstPath) { String srcKey = pathToKey(srcPath); String dstKey = pathToKey(dstPath); - return copyFile(srcKey, dstKey); - } - - /** - * Copy an object from source key to destination key. - * - * @param srcKey source key - * @param dstKey destination key - * @return true if successfully copied - */ - private boolean copyFile(String srcKey, String dstKey) { - ObjectMetadata objectMeta = - ossClient.getObjectMetadata(bucketName, srcKey); - long dataLen = objectMeta.getContentLength(); - if (dataLen <= multipartThreshold) { - return singleCopy(srcKey, dstKey); - } else { - return multipartCopy(srcKey, dataLen, dstKey); - } - } - - /** - * Use single copy to copy an oss object. - * - * @param srcKey source key - * @param dstKey destination key - * @return true if successfully copied - * (the caller should make sure srcPath is a file and dstPath is valid) - */ - private boolean singleCopy(String srcKey, String dstKey) { - CopyObjectResult copyResult = - ossClient.copyObject(bucketName, srcKey, bucketName, dstKey); - LOG.debug(copyResult.getETag()); - return true; - } - - /** - * Use multipart copy to copy an oss object. - * (the caller should make sure srcPath is a file and dstPath is valid) - * - * @param srcKey source key - * @param dataLen data size of the object to copy - * @param dstKey destination key - * @return true if successfully copied, or false if upload is aborted - */ - private boolean multipartCopy(String srcKey, long dataLen, String dstKey) { - int partNum = (int)(dataLen / uploadPartSize); - if (dataLen % uploadPartSize != 0) { - partNum++; - } - InitiateMultipartUploadRequest initiateMultipartUploadRequest = - new InitiateMultipartUploadRequest(bucketName, dstKey); - ObjectMetadata meta = new ObjectMetadata(); - if (!serverSideEncryptionAlgorithm.isEmpty()) { - meta.setServerSideEncryption(serverSideEncryptionAlgorithm); - } - initiateMultipartUploadRequest.setObjectMetadata(meta); - InitiateMultipartUploadResult initiateMultipartUploadResult = - ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); - String uploadId = initiateMultipartUploadResult.getUploadId(); - List partETags = new ArrayList(); - try { - for (int i = 0; i < partNum; i++) { - long skipBytes = uploadPartSize * i; - long size = (uploadPartSize < dataLen - skipBytes) ? - uploadPartSize : dataLen - skipBytes; - UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest(); - partCopyRequest.setSourceBucketName(bucketName); - partCopyRequest.setSourceKey(srcKey); - partCopyRequest.setBucketName(bucketName); - partCopyRequest.setKey(dstKey); - partCopyRequest.setUploadId(uploadId); - partCopyRequest.setPartSize(size); - partCopyRequest.setBeginIndex(skipBytes); - partCopyRequest.setPartNumber(i + 1); - UploadPartCopyResult partCopyResult = - ossClient.uploadPartCopy(partCopyRequest); - statistics.incrementWriteOps(1); - partETags.add(partCopyResult.getPartETag()); - } - CompleteMultipartUploadRequest completeMultipartUploadRequest = - new CompleteMultipartUploadRequest(bucketName, dstKey, - uploadId, partETags); - CompleteMultipartUploadResult completeMultipartUploadResult = - ossClient.completeMultipartUpload(completeMultipartUploadRequest); - LOG.debug(completeMultipartUploadResult.getETag()); - return true; - } catch (OSSException | ClientException e) { - AbortMultipartUploadRequest abortMultipartUploadRequest = - new AbortMultipartUploadRequest(bucketName, dstKey, uploadId); - ossClient.abortMultipartUpload(abortMultipartUploadRequest); - return false; - } + return store.copyFile(srcKey, dstKey); } /** * Copy a directory from source path to destination path. * (the caller should make sure srcPath is a directory, and dstPath is valid) * - * @param srcPath source path - * @param dstPath destination path - * @return true if successfully copied + * @param srcPath source path. + * @param dstPath destination path. + * @return true if directory is successfully copied. */ private boolean copyDirectory(Path srcPath, Path dstPath) { String srcKey = pathToKey(srcPath); @@ -835,21 +524,18 @@ private boolean copyDirectory(Path srcPath, Path dstPath) { return false; } - ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName); - listObjectsRequest.setPrefix(srcKey); - listObjectsRequest.setMaxKeys(maxKeys); - - ObjectListing objects = ossClient.listObjects(listObjectsRequest); + ObjectListing objects = store.listObjects(srcKey, maxKeys, null, null); statistics.incrementReadOps(1); // Copy files from src folder to dst while (true) { for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { String newKey = dstKey.concat(objectSummary.getKey().substring(srcKey.length())); - copyFile(objectSummary.getKey(), newKey); + store.copyFile(objectSummary.getKey(), newKey); } if (objects.isTruncated()) { - listObjectsRequest.setMarker(objects.getNextMarker()); + objects = store.listObjects(srcKey, maxKeys, null, + objects.getNextMarker()); statistics.incrementReadOps(1); } else { break; @@ -863,4 +549,7 @@ public void setWorkingDirectory(Path dir) { this.workingDir = dir; } + public AliyunOSSFileSystemStore getStore() { + return store; + } } diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java new file mode 100644 index 0000000000..b3cd1bdd14 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java @@ -0,0 +1,486 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.aliyun.oss; + +import com.aliyun.oss.ClientConfiguration; +import com.aliyun.oss.ClientException; +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.common.auth.CredentialsProvider; +import com.aliyun.oss.common.comm.Protocol; +import com.aliyun.oss.model.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +/** + * Core implementation of Aliyun OSS Filesystem for Hadoop. + * Provides the bridging logic between Hadoop's abstract filesystem and + * Aliyun OSS. + */ +public class AliyunOSSFileSystemStore { + public static final Logger LOG = + LoggerFactory.getLogger(AliyunOSSFileSystemStore.class); + private FileSystem.Statistics statistics; + private OSSClient ossClient; + private String bucketName; + private long uploadPartSize; + private long multipartThreshold; + private long partSize; + private int maxKeys; + private String serverSideEncryptionAlgorithm; + + public void initialize(URI uri, Configuration conf, + FileSystem.Statistics stat) throws IOException { + statistics = stat; + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY, + MAXIMUM_CONNECTIONS_DEFAULT)); + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY, + SECURE_CONNECTIONS_DEFAULT); + clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); + clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY, + MAX_ERROR_RETRIES_DEFAULT)); + clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY, + ESTABLISH_TIMEOUT_DEFAULT)); + clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY, + SOCKET_TIMEOUT_DEFAULT)); + + String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, ""); + int proxyPort = conf.getInt(PROXY_PORT_KEY, -1); + if (!proxyHost.isEmpty()) { + clientConf.setProxyHost(proxyHost); + if (proxyPort >= 0) { + clientConf.setProxyPort(proxyPort); + } else { + if (secureConnections) { + LOG.warn("Proxy host set without port. Using HTTPS default 443"); + clientConf.setProxyPort(443); + } else { + LOG.warn("Proxy host set without port. Using HTTP default 80"); + clientConf.setProxyPort(80); + } + } + String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY); + String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY); + if ((proxyUsername == null) != (proxyPassword == null)) { + String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " + + PROXY_PASSWORD_KEY + " set without the other."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + clientConf.setProxyUsername(proxyUsername); + clientConf.setProxyPassword(proxyPassword); + clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY)); + clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY)); + } else if (proxyPort >= 0) { + String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " + + PROXY_HOST_KEY; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + + String endPoint = conf.getTrimmed(ENDPOINT_KEY, ""); + CredentialsProvider provider = + AliyunOSSUtils.getCredentialsProvider(uri, conf); + ossClient = new OSSClient(endPoint, provider, clientConf); + uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, + MULTIPART_UPLOAD_SIZE_DEFAULT); + multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, + MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); + partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, + MULTIPART_UPLOAD_SIZE_DEFAULT); + if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) { + partSize = MIN_MULTIPART_UPLOAD_PART_SIZE; + } + serverSideEncryptionAlgorithm = + conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, ""); + + if (uploadPartSize < 5 * 1024 * 1024) { + LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB"); + uploadPartSize = 5 * 1024 * 1024; + } + + if (multipartThreshold < 5 * 1024 * 1024) { + LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB"); + multipartThreshold = 5 * 1024 * 1024; + } + + if (multipartThreshold > 1024 * 1024 * 1024) { + LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB"); + multipartThreshold = 1024 * 1024 * 1024; + } + + String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT); + if (!cannedACLName.isEmpty()) { + CannedAccessControlList cannedACL = + CannedAccessControlList.valueOf(cannedACLName); + ossClient.setBucketAcl(bucketName, cannedACL); + } + + maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); + bucketName = uri.getHost(); + } + + /** + * Delete an object, and update write operation statistics. + * + * @param key key to blob to delete. + */ + public void deleteObject(String key) { + ossClient.deleteObject(bucketName, key); + statistics.incrementWriteOps(1); + } + + /** + * Delete a list of keys, and update write operation statistics. + * + * @param keysToDelete collection of keys to delete. + */ + public void deleteObjects(List keysToDelete) { + DeleteObjectsRequest deleteRequest = + new DeleteObjectsRequest(bucketName); + deleteRequest.setKeys(keysToDelete); + ossClient.deleteObjects(deleteRequest); + statistics.incrementWriteOps(keysToDelete.size()); + } + + /** + * Delete a directory from Aliyun OSS. + * + * @param key directory key to delete. + */ + public void deleteDirs(String key) { + ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); + listRequest.setPrefix(key); + listRequest.setMaxKeys(maxKeys); + + while (true) { + ObjectListing objects = ossClient.listObjects(listRequest); + statistics.incrementReadOps(1); + List keysToDelete = new ArrayList(); + for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { + keysToDelete.add(objectSummary.getKey()); + } + deleteObjects(keysToDelete); + if (objects.isTruncated()) { + listRequest.setMarker(objects.getNextMarker()); + } else { + break; + } + } + } + + /** + * Return metadata of a given object key. + * + * @param key object key. + * @return return null if key does not exist. + */ + public ObjectMetadata getObjectMetadata(String key) { + try { + return ossClient.getObjectMetadata(bucketName, key); + } catch (OSSException osse) { + return null; + } finally { + statistics.incrementReadOps(1); + } + } + + /** + * Upload an empty file as an OSS object, using single upload. + * + * @param key object key. + * @throws IOException if failed to upload object. + */ + public void storeEmptyFile(String key) throws IOException { + ObjectMetadata dirMeta = new ObjectMetadata(); + byte[] buffer = new byte[0]; + ByteArrayInputStream in = new ByteArrayInputStream(buffer); + dirMeta.setContentLength(0); + try { + ossClient.putObject(bucketName, key, in, dirMeta); + } finally { + in.close(); + } + } + + /** + * Copy an object from source key to destination key. + * + * @param srcKey source key. + * @param dstKey destination key. + * @return true if file is successfully copied. + */ + public boolean copyFile(String srcKey, String dstKey) { + ObjectMetadata objectMeta = + ossClient.getObjectMetadata(bucketName, srcKey); + long contentLength = objectMeta.getContentLength(); + if (contentLength <= multipartThreshold) { + return singleCopy(srcKey, dstKey); + } else { + return multipartCopy(srcKey, contentLength, dstKey); + } + } + + /** + * Use single copy to copy an OSS object. + * (The caller should make sure srcPath is a file and dstPath is valid) + * + * @param srcKey source key. + * @param dstKey destination key. + * @return true if object is successfully copied. + */ + private boolean singleCopy(String srcKey, String dstKey) { + CopyObjectResult copyResult = + ossClient.copyObject(bucketName, srcKey, bucketName, dstKey); + LOG.debug(copyResult.getETag()); + return true; + } + + /** + * Use multipart copy to copy an OSS object. + * (The caller should make sure srcPath is a file and dstPath is valid) + * + * @param srcKey source key. + * @param contentLength data size of the object to copy. + * @param dstKey destination key. + * @return true if success, or false if upload is aborted. + */ + private boolean multipartCopy(String srcKey, long contentLength, + String dstKey) { + long realPartSize = + AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize); + int partNum = (int) (contentLength / realPartSize); + if (contentLength % realPartSize != 0) { + partNum++; + } + InitiateMultipartUploadRequest initiateMultipartUploadRequest = + new InitiateMultipartUploadRequest(bucketName, dstKey); + ObjectMetadata meta = new ObjectMetadata(); + if (!serverSideEncryptionAlgorithm.isEmpty()) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + initiateMultipartUploadRequest.setObjectMetadata(meta); + InitiateMultipartUploadResult initiateMultipartUploadResult = + ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); + String uploadId = initiateMultipartUploadResult.getUploadId(); + List partETags = new ArrayList(); + try { + for (int i = 0; i < partNum; i++) { + long skipBytes = realPartSize * i; + long size = (realPartSize < contentLength - skipBytes) ? + realPartSize : contentLength - skipBytes; + UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest(); + partCopyRequest.setSourceBucketName(bucketName); + partCopyRequest.setSourceKey(srcKey); + partCopyRequest.setBucketName(bucketName); + partCopyRequest.setKey(dstKey); + partCopyRequest.setUploadId(uploadId); + partCopyRequest.setPartSize(size); + partCopyRequest.setBeginIndex(skipBytes); + partCopyRequest.setPartNumber(i + 1); + UploadPartCopyResult partCopyResult = + ossClient.uploadPartCopy(partCopyRequest); + statistics.incrementWriteOps(1); + partETags.add(partCopyResult.getPartETag()); + } + CompleteMultipartUploadRequest completeMultipartUploadRequest = + new CompleteMultipartUploadRequest(bucketName, dstKey, + uploadId, partETags); + CompleteMultipartUploadResult completeMultipartUploadResult = + ossClient.completeMultipartUpload(completeMultipartUploadRequest); + LOG.debug(completeMultipartUploadResult.getETag()); + return true; + } catch (OSSException | ClientException e) { + AbortMultipartUploadRequest abortMultipartUploadRequest = + new AbortMultipartUploadRequest(bucketName, dstKey, uploadId); + ossClient.abortMultipartUpload(abortMultipartUploadRequest); + return false; + } + } + + /** + * Upload a file as an OSS object, using single upload. + * + * @param key object key. + * @param file local file to upload. + * @throws IOException if failed to upload object. + */ + public void uploadObject(String key, File file) throws IOException { + File object = file.getAbsoluteFile(); + FileInputStream fis = new FileInputStream(object); + ObjectMetadata meta = new ObjectMetadata(); + meta.setContentLength(object.length()); + if (!serverSideEncryptionAlgorithm.isEmpty()) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + try { + PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta); + LOG.debug(result.getETag()); + statistics.incrementWriteOps(1); + } finally { + fis.close(); + } + } + + /** + * Upload a file as an OSS object, using multipart upload. + * + * @param key object key. + * @param file local file to upload. + * @throws IOException if failed to upload object. + */ + public void multipartUploadObject(String key, File file) throws IOException { + File object = file.getAbsoluteFile(); + long dataLen = object.length(); + long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize); + int partNum = (int) (dataLen / realPartSize); + if (dataLen % realPartSize != 0) { + partNum += 1; + } + + InitiateMultipartUploadRequest initiateMultipartUploadRequest = + new InitiateMultipartUploadRequest(bucketName, key); + ObjectMetadata meta = new ObjectMetadata(); + if (!serverSideEncryptionAlgorithm.isEmpty()) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + initiateMultipartUploadRequest.setObjectMetadata(meta); + InitiateMultipartUploadResult initiateMultipartUploadResult = + ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); + List partETags = new ArrayList(); + String uploadId = initiateMultipartUploadResult.getUploadId(); + + try { + for (int i = 0; i < partNum; i++) { + // TODO: Optimize this, avoid opening the object multiple times + FileInputStream fis = new FileInputStream(object); + try { + long skipBytes = realPartSize * i; + AliyunOSSUtils.skipFully(fis, skipBytes); + long size = (realPartSize < dataLen - skipBytes) ? + realPartSize : dataLen - skipBytes; + UploadPartRequest uploadPartRequest = new UploadPartRequest(); + uploadPartRequest.setBucketName(bucketName); + uploadPartRequest.setKey(key); + uploadPartRequest.setUploadId(uploadId); + uploadPartRequest.setInputStream(fis); + uploadPartRequest.setPartSize(size); + uploadPartRequest.setPartNumber(i + 1); + UploadPartResult uploadPartResult = + ossClient.uploadPart(uploadPartRequest); + statistics.incrementWriteOps(1); + partETags.add(uploadPartResult.getPartETag()); + } finally { + fis.close(); + } + } + CompleteMultipartUploadRequest completeMultipartUploadRequest = + new CompleteMultipartUploadRequest(bucketName, key, + uploadId, partETags); + CompleteMultipartUploadResult completeMultipartUploadResult = + ossClient.completeMultipartUpload(completeMultipartUploadRequest); + LOG.debug(completeMultipartUploadResult.getETag()); + } catch (OSSException | ClientException e) { + AbortMultipartUploadRequest abortMultipartUploadRequest = + new AbortMultipartUploadRequest(bucketName, key, uploadId); + ossClient.abortMultipartUpload(abortMultipartUploadRequest); + } + } + + /** + * list objects. + * + * @param prefix prefix. + * @param maxListingLength max no. of entries + * @param delimiter delimiter. + * @param marker last key in any previous search. + * @return a list of matches. + */ + public ObjectListing listObjects(String prefix, int maxListingLength, + String delimiter, String marker) { + ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); + listRequest.setPrefix(prefix); + listRequest.setDelimiter(delimiter); + listRequest.setMaxKeys(maxListingLength); + listRequest.setMarker(marker); + + ObjectListing listing = ossClient.listObjects(listRequest); + statistics.incrementReadOps(1); + return listing; + } + + /** + * Retrieve a part of an object. + * + * @param key the object name that is being retrieved from the Aliyun OSS. + * @param byteStart start position. + * @param byteEnd end position. + * @return This method returns null if the key is not found. + */ + public InputStream retrieve(String key, long byteStart, long byteEnd) { + try { + GetObjectRequest request = new GetObjectRequest(bucketName, key); + request.setRange(byteStart, byteEnd); + return ossClient.getObject(request).getObjectContent(); + } catch (OSSException | ClientException e) { + return null; + } + } + + /** + * Close OSS client properly. + */ + public void close() { + if (ossClient != null) { + ossClient.shutdown(); + ossClient = null; + } + } + + /** + * Clean up all objects matching the prefix. + * + * @param prefix Aliyun OSS object prefix. + */ + public void purge(String prefix) { + String key; + try { + ObjectListing objects = listObjects(prefix, maxKeys, null, null); + for (OSSObjectSummary object : objects.getObjectSummaries()) { + key = object.getKey(); + ossClient.deleteObject(bucketName, key); + } + + for (String dir: objects.getCommonPrefixes()) { + deleteDirs(dir); + } + } catch (OSSException | ClientException e) { + LOG.error("Failed to purge " + prefix); + } + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java index b12e3f0ca5..69265fb81d 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java @@ -27,12 +27,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; -import com.aliyun.oss.OSSClient; -import com.aliyun.oss.model.GetObjectRequest; - /** * The input stream for OSS blob system. * The class uses multi-part downloading to read data from the object content @@ -40,27 +38,23 @@ */ public class AliyunOSSInputStream extends FSInputStream { public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class); - private static final int MAX_RETRIES = 10; private final long downloadPartSize; - - private String bucketName; - private String key; - private OSSClient ossClient; + private AliyunOSSFileSystemStore store; + private final String key; private Statistics statistics; private boolean closed; private InputStream wrappedStream = null; - private long dataLen; + private long contentLength; private long position; private long partRemaining; - public AliyunOSSInputStream(Configuration conf, OSSClient client, - String bucketName, String key, Long dataLen, Statistics statistics) - throws IOException { - this.bucketName = bucketName; + public AliyunOSSInputStream(Configuration conf, + AliyunOSSFileSystemStore store, String key, Long contentLength, + Statistics statistics) throws IOException { + this.store = store; this.key = key; - ossClient = client; this.statistics = statistics; - this.dataLen = dataLen; + this.contentLength = contentLength; downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT); reopen(0); @@ -75,18 +69,17 @@ public AliyunOSSInputStream(Configuration conf, OSSClient client, * @throws IOException if failed to reopen */ private synchronized void reopen(long pos) throws IOException { - - long partLen; + long partSize; if (pos < 0) { - throw new EOFException("Cannot seek at negtive position:" + pos); - } else if (pos > dataLen) { - throw new EOFException("Cannot seek after EOF, fileLen:" + dataLen + - " position:" + pos); - } else if (pos + downloadPartSize > dataLen) { - partLen = dataLen - pos; + throw new EOFException("Cannot seek at negative position:" + pos); + } else if (pos > contentLength) { + throw new EOFException("Cannot seek after EOF, contentLength:" + + contentLength + " position:" + pos); + } else if (pos + downloadPartSize > contentLength) { + partSize = contentLength - pos; } else { - partLen = downloadPartSize; + partSize = downloadPartSize; } if (wrappedStream != null) { @@ -96,21 +89,19 @@ private synchronized void reopen(long pos) throws IOException { wrappedStream.close(); } - GetObjectRequest request = new GetObjectRequest(bucketName, key); - request.setRange(pos, pos + partLen - 1); - wrappedStream = ossClient.getObject(request).getObjectContent(); + wrappedStream = store.retrieve(key, pos, pos + partSize -1); if (wrappedStream == null) { throw new IOException("Null IO stream"); } position = pos; - partRemaining = partLen; + partRemaining = partSize; } @Override public synchronized int read() throws IOException { checkNotClosed(); - if (partRemaining <= 0 && position < dataLen) { + if (partRemaining <= 0 && position < contentLength) { reopen(position); } @@ -139,13 +130,14 @@ public synchronized int read() throws IOException { /** - * Check whether the input stream is closed. + * Verify that the input stream is open. Non blocking; this gives + * the last state of the volatile {@link #closed} field. * - * @throws IOException if stream is closed + * @throws IOException if the connection is closed. */ private void checkNotClosed() throws IOException { if (closed) { - throw new IOException("Stream is closed!"); + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } } @@ -164,7 +156,7 @@ public synchronized int read(byte[] buf, int off, int len) int bytesRead = 0; // Not EOF, and read not done - while (position < dataLen && bytesRead < len) { + while (position < contentLength && bytesRead < len) { if (partRemaining == 0) { reopen(position); } @@ -219,7 +211,7 @@ public synchronized void close() throws IOException { public synchronized int available() throws IOException { checkNotClosed(); - long remaining = dataLen - position; + long remaining = contentLength - position; if (remaining > Integer.MAX_VALUE) { return Integer.MAX_VALUE; } diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java index 1e16df9edb..c952d0ae85 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java @@ -22,15 +22,10 @@ import java.io.BufferedOutputStream; import java.io.File; -import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import com.aliyun.oss.ClientException; -import com.aliyun.oss.OSSException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,18 +33,6 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.util.Progressable; -import com.aliyun.oss.OSSClient; -import com.aliyun.oss.model.AbortMultipartUploadRequest; -import com.aliyun.oss.model.CompleteMultipartUploadRequest; -import com.aliyun.oss.model.CompleteMultipartUploadResult; -import com.aliyun.oss.model.InitiateMultipartUploadRequest; -import com.aliyun.oss.model.InitiateMultipartUploadResult; -import com.aliyun.oss.model.ObjectMetadata; -import com.aliyun.oss.model.PartETag; -import com.aliyun.oss.model.PutObjectResult; -import com.aliyun.oss.model.UploadPartRequest; -import com.aliyun.oss.model.UploadPartResult; - /** * The output stream for OSS blob system. * Data will be buffered on local disk, then uploaded to OSS in @@ -57,36 +40,24 @@ */ public class AliyunOSSOutputStream extends OutputStream { public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class); - private String bucketName; - private String key; + private AliyunOSSFileSystemStore store; + private final String key; private Statistics statistics; private Progressable progress; - private String serverSideEncryptionAlgorithm; - private long partSize; private long partSizeThreshold; private LocalDirAllocator dirAlloc; private boolean closed; private File tmpFile; private BufferedOutputStream backupStream; - private OSSClient ossClient; - public AliyunOSSOutputStream(Configuration conf, OSSClient client, - String bucketName, String key, Progressable progress, - Statistics statistics, String serverSideEncryptionAlgorithm) - throws IOException { - this.bucketName = bucketName; + public AliyunOSSOutputStream(Configuration conf, + AliyunOSSFileSystemStore store, String key, Progressable progress, + Statistics statistics) throws IOException { + this.store = store; this.key = key; // The caller cann't get any progress information this.progress = progress; - ossClient = client; this.statistics = statistics; - this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; - - partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, - MULTIPART_UPLOAD_SIZE_DEFAULT); - if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) { - partSize = MIN_MULTIPART_UPLOAD_PART_SIZE; - } partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); @@ -113,9 +84,9 @@ public synchronized void close() throws IOException { long dataLen = tmpFile.length(); try { if (dataLen <= partSizeThreshold) { - uploadObject(); + store.uploadObject(key, tmpFile); } else { - multipartUploadObject(); + store.multipartUploadObject(key, tmpFile); } } finally { if (!tmpFile.delete()) { @@ -124,91 +95,7 @@ public synchronized void close() throws IOException { } } - /** - * Upload temporary file as an OSS object, using single upload. - * - * @throws IOException - */ - private void uploadObject() throws IOException { - File object = tmpFile.getAbsoluteFile(); - FileInputStream fis = new FileInputStream(object); - ObjectMetadata meta = new ObjectMetadata(); - meta.setContentLength(object.length()); - if (!serverSideEncryptionAlgorithm.isEmpty()) { - meta.setServerSideEncryption(serverSideEncryptionAlgorithm); - } - try { - PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta); - LOG.debug(result.getETag()); - statistics.incrementWriteOps(1); - } finally { - fis.close(); - } - } - /** - * Upload temporary file as an OSS object, using multipart upload. - * - * @throws IOException - */ - private void multipartUploadObject() throws IOException { - File object = tmpFile.getAbsoluteFile(); - long dataLen = object.length(); - long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize); - int partNum = (int)(dataLen / realPartSize); - if (dataLen % realPartSize != 0) { - partNum += 1; - } - - InitiateMultipartUploadRequest initiateMultipartUploadRequest = - new InitiateMultipartUploadRequest(bucketName, key); - ObjectMetadata meta = new ObjectMetadata(); - // meta.setContentLength(dataLen); - if (!serverSideEncryptionAlgorithm.isEmpty()) { - meta.setServerSideEncryption(serverSideEncryptionAlgorithm); - } - initiateMultipartUploadRequest.setObjectMetadata(meta); - InitiateMultipartUploadResult initiateMultipartUploadResult = - ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); - List partETags = new ArrayList(); - String uploadId = initiateMultipartUploadResult.getUploadId(); - - try { - for (int i = 0; i < partNum; i++) { - // TODO: Optimize this, avoid opening the object multiple times - FileInputStream fis = new FileInputStream(object); - try { - long skipBytes = realPartSize * i; - AliyunOSSUtils.skipFully(fis, skipBytes); - long size = (realPartSize < dataLen - skipBytes) ? - realPartSize : dataLen - skipBytes; - UploadPartRequest uploadPartRequest = new UploadPartRequest(); - uploadPartRequest.setBucketName(bucketName); - uploadPartRequest.setKey(key); - uploadPartRequest.setUploadId(uploadId); - uploadPartRequest.setInputStream(fis); - uploadPartRequest.setPartSize(size); - uploadPartRequest.setPartNumber(i + 1); - UploadPartResult uploadPartResult = - ossClient.uploadPart(uploadPartRequest); - statistics.incrementWriteOps(1); - partETags.add(uploadPartResult.getPartETag()); - } finally { - fis.close(); - } - } - CompleteMultipartUploadRequest completeMultipartUploadRequest = - new CompleteMultipartUploadRequest(bucketName, key, - uploadId, partETags); - CompleteMultipartUploadResult completeMultipartUploadResult = - ossClient.completeMultipartUpload(completeMultipartUploadRequest); - LOG.debug(completeMultipartUploadResult.getETag()); - } catch (OSSException | ClientException e) { - AbortMultipartUploadRequest abortMultipartUploadRequest = - new AbortMultipartUploadRequest(bucketName, key, uploadId); - ossClient.abortMultipartUpload(abortMultipartUploadRequest); - } - } @Override public synchronized void flush() throws IOException { diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java index d54dd9c44a..b96aea7f86 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java @@ -20,142 +20,58 @@ import java.io.IOException; import java.io.InputStream; -import java.io.UnsupportedEncodingException; import java.net.URI; -import java.net.URLDecoder; -import java.util.Objects; +import com.aliyun.oss.common.auth.CredentialsProvider; +import com.aliyun.oss.common.auth.DefaultCredentialProvider; +import com.aliyun.oss.common.auth.DefaultCredentials; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.ProviderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT; +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; +import static org.apache.hadoop.fs.aliyun.oss.Constants.ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY; /** * Utility methods for Aliyun OSS code. */ final public class AliyunOSSUtils { + private static final Logger LOG = + LoggerFactory.getLogger(AliyunOSSUtils.class); + private AliyunOSSUtils() { } /** - * User information includes user name and password. - */ - static public class UserInfo { - private final String user; - private final String password; - - public static final UserInfo EMPTY = new UserInfo("", ""); - - public UserInfo(String user, String password) { - this.user = user; - this.password = password; - } - - /** - * Predicate to verify user information is set. - * @return true if the username is defined (not null, not empty). - */ - public boolean hasLogin() { - return StringUtils.isNotEmpty(user); - } - - /** - * Equality test matches user and password. - * @param o other object - * @return true if the objects are considered equivalent. - */ - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - UserInfo that = (UserInfo) o; - return Objects.equals(user, that.user) && - Objects.equals(password, that.password); - } - - @Override - public int hashCode() { - return Objects.hash(user, password); - } - - public String getUser() { - return user; - } - - public String getPassword() { - return password; - } - } - - /** - * Used to get password from configuration, if default value is not available. + * Used to get password from configuration. + * * @param conf configuration that contains password information * @param key the key of the password - * @param val the default value of the key * @return the value for the key * @throws IOException if failed to get password from configuration */ - static public String getPassword(Configuration conf, String key, String val) + static public String getPassword(Configuration conf, String key) throws IOException { - if (StringUtils.isEmpty(val)) { - try { - final char[] pass = conf.getPassword(key); - if (pass != null) { - return (new String(pass)).trim(); - } else { - return ""; - } - } catch (IOException ioe) { - throw new IOException("Cannot find password option " + key, ioe); - } - } else { - return val; - } - } - - /** - * Extract the user information details from a URI. - * @param name URI of the filesystem. - * @return a login tuple, possibly empty. - */ - public static UserInfo extractLoginDetails(URI name) { try { - String authority = name.getAuthority(); - if (authority == null) { - return UserInfo.EMPTY; - } - int loginIndex = authority.indexOf('@'); - if (loginIndex < 0) { - // No user information - return UserInfo.EMPTY; - } - String login = authority.substring(0, loginIndex); - int loginSplit = login.indexOf(':'); - if (loginSplit > 0) { - String user = login.substring(0, loginSplit); - String password = URLDecoder.decode(login.substring(loginSplit + 1), - "UTF-8"); - return new UserInfo(user, password); - } else if (loginSplit == 0) { - // There is no user, just a password. - return UserInfo.EMPTY; + final char[] pass = conf.getPassword(key); + if (pass != null) { + return (new String(pass)).trim(); } else { - return new UserInfo(login, ""); + return ""; } - } catch (UnsupportedEncodingException e) { - // This should never happen; translate it if it does. - throw new RuntimeException(e); + } catch (IOException ioe) { + throw new IOException("Cannot find password option " + key, ioe); } } /** - * Skips the requested number of bytes or fail if there are not enough left. - * This allows for the possibility that {@link InputStream#skip(long)} may not - * skip as many bytes as requested (most likely because of reaching EOF). + * Skip the requested number of bytes or fail if there are no enough bytes + * left. This allows for the possibility that {@link InputStream#skip(long)} + * may not skip as many bytes as requested (most likely because of reaching + * EOF). + * * @param is the input stream to skip. * @param n the number of bytes to skip. * @throws IOException thrown when skipped less number of bytes. @@ -179,12 +95,69 @@ public static void skipFully(InputStream is, long n) throws IOException { * Calculate a proper size of multipart piece. If minPartSize * is too small, the number of multipart pieces may exceed the limit of * {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}. + * * @param contentLength the size of file. * @param minPartSize the minimum size of multipart piece. * @return a revisional size of multipart piece. - */ + */ public static long calculatePartSize(long contentLength, long minPartSize) { long tmpPartSize = contentLength / MULTIPART_UPLOAD_PART_NUM_LIMIT + 1; return Math.max(minPartSize, tmpPartSize); } + + /** + * Create credential provider specified by configuration, or create default + * credential provider if not specified. + * + * @param name the uri of the file system + * @param conf configuration + * @return a credential provider + * @throws IOException on any problem. Class construction issues may be + * nested inside the IOE. + */ + public static CredentialsProvider getCredentialsProvider(URI name, + Configuration conf) throws IOException { + URI uri = java.net.URI.create( + name.getScheme() + "://" + name.getAuthority()); + CredentialsProvider credentials; + + String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY); + if (StringUtils.isEmpty(className)) { + Configuration newConf = + ProviderUtils.excludeIncompatibleCredentialProviders(conf, + AliyunOSSFileSystem.class); + String accessKey = + AliyunOSSUtils.getPassword(newConf, ACCESS_KEY); + String secretKey = + AliyunOSSUtils.getPassword(newConf, SECRET_KEY); + credentials = new DefaultCredentialProvider( + new DefaultCredentials(accessKey, secretKey)); + } else { + try { + LOG.debug("Credential provider class is:" + className); + Class credClass = Class.forName(className); + try { + credentials = + (CredentialsProvider)credClass.getDeclaredConstructor( + URI.class, Configuration.class).newInstance(uri, conf); + } catch (NoSuchMethodException | SecurityException e) { + credentials = + (CredentialsProvider)credClass.getDeclaredConstructor() + .newInstance(); + } + } catch (ClassNotFoundException e) { + throw new IOException(className + " not found.", e); + } catch (NoSuchMethodException | SecurityException e) { + throw new IOException(String.format("%s constructor exception. A " + + "class specified in %s must provide an accessible constructor " + + "accepting URI and Configuration, or an accessible default " + + "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY), + e); + } catch (ReflectiveOperationException | IllegalArgumentException e) { + throw new IOException(className + " instantiation exception.", e); + } + } + + return credentials; + } } diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java index 99022754fb..243fdd4c0e 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java @@ -72,7 +72,7 @@ private Constants() { // Number of records to get while paging through a directory listing public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum"; - public static final int MAX_PAGING_KEYS_DEFAULT = 500; + public static final int MAX_PAGING_KEYS_DEFAULT = 1000; // Size of each of or multipart pieces in bytes public static final String MULTIPART_UPLOAD_SIZE_KEY = @@ -109,5 +109,6 @@ private Constants() { public static final String FS_OSS = "oss"; public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L; + public static final int MAX_RETRIES = 10; } diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java index de4e5a9315..f234d508e7 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java @@ -73,11 +73,6 @@ public void testMkdirsWithUmask() throws Exception { // not supported } - /** - * Assert that root directory renames are not allowed. - * - * @throws Exception on failures - */ @Override public void testRootDirAlwaysExists() throws Exception { //this will throw an exception if the path is not found @@ -88,11 +83,6 @@ public void testRootDirAlwaysExists() throws Exception { fs.exists(super.path("/"))); } - /** - * Assert that root directory renames are not allowed. - * - * @throws Exception on failures - */ @Override public void testRenameRootDirForbidden() throws Exception { if (!renameSupported()) { diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemStore.java new file mode 100644 index 0000000000..6331ed829f --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemStore.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.*; +import java.net.URI; +import java.security.DigestInputStream; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeNotNull; + +/** + * Test the bridging logic between Hadoop's abstract filesystem and + * Aliyun OSS. + */ +public class TestOSSFileSystemStore { + private Configuration conf; + private AliyunOSSFileSystemStore store; + private AliyunOSSFileSystem fs; + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + fs = new AliyunOSSFileSystem(); + fs.initialize(URI.create(conf.get("test.fs.oss.name")), conf); + store = fs.getStore(); + } + + @After + public void tearDown() throws Exception { + try { + store.purge("test"); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + + @BeforeClass + public static void checkSettings() throws Exception { + Configuration conf = new Configuration(); + assumeNotNull(conf.get("fs.oss.accessKeyId")); + assumeNotNull(conf.get("fs.oss.accessKeySecret")); + assumeNotNull(conf.get("test.fs.oss.name")); + } + + protected void writeRenameReadCompare(Path path, long len) + throws IOException, NoSuchAlgorithmException { + // If len > fs.oss.multipart.upload.threshold, + // we'll use a multipart upload copy + MessageDigest digest = MessageDigest.getInstance("MD5"); + OutputStream out = new BufferedOutputStream( + new DigestOutputStream(fs.create(path, false), digest)); + for (long i = 0; i < len; i++) { + out.write('Q'); + } + out.flush(); + out.close(); + + assertTrue("Exists", fs.exists(path)); + + Path copyPath = path.suffix(".copy"); + fs.rename(path, copyPath); + + assertTrue("Copy exists", fs.exists(copyPath)); + + // Download file from Aliyun OSS and compare the digest against the original + MessageDigest digest2 = MessageDigest.getInstance("MD5"); + InputStream in = new BufferedInputStream( + new DigestInputStream(fs.open(copyPath), digest2)); + long copyLen = 0; + while (in.read() != -1) { + copyLen++; + } + in.close(); + + assertEquals("Copy length matches original", len, copyLen); + assertArrayEquals("Digests match", digest.digest(), digest2.digest()); + } + + @Test + public void testSmallUpload() throws IOException, NoSuchAlgorithmException { + // Regular upload, regular copy + writeRenameReadCompare(new Path("/test/small"), 16384); + } + + @Test + public void testLargeUpload() + throws IOException, NoSuchAlgorithmException { + // Multipart upload, multipart copy + writeRenameReadCompare(new Path("/test/xlarge"), 52428800L); // 50MB byte + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/OSSContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/OSSContract.java index 8214b9f6be..f90a8bb594 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/OSSContract.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/OSSContract.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.aliyun.oss.contract; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.aliyun.oss.OSSTestUtils; diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractDispCp.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractDispCp.java new file mode 100644 index 0000000000..eb0c5e0cdd --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractDispCp.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.tools.contract.AbstractContractDistCpTest; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +/** + * Contract test suite covering Aliyun OSS integration with DistCp. + */ +public class TestOSSContractDispCp extends AbstractContractDistCpTest { + + private static final long MULTIPART_SETTING = 8 * 1024 * 1024; // 8 MB + + @Override + protected Configuration createConfiguration() { + Configuration newConf = super.createConfiguration(); + newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING); + newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING); + return newConf; + } + + @Override + protected OSSContract createContract(Configuration conf) { + return new OSSContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractGetFileStatus.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractGetFileStatus.java new file mode 100644 index 0000000000..cc21a2e2ce --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractGetFileStatus.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Test getFileStatus and related listing operations. + */ +public class TestOSSContractGetFileStatus + extends AbstractContractGetFileStatusTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new OSSContract(conf); + } + +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractRootDir.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractRootDir.java new file mode 100644 index 0000000000..cbc262c735 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractRootDir.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; + +/** + * Root dir operations against an Aliyun OSS bucket. + */ +public class TestOSSContractRootDir extends + AbstractContractRootDirectoryTest { + + private static final Logger LOG = + LoggerFactory.getLogger(TestOSSContractRootDir.class); + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new OSSContract(conf); + } + + @Override + public void testListEmptyRootDirectory() throws IOException { + for (int attempt = 1, maxAttempts = 10; attempt <= maxAttempts; ++attempt) { + try { + super.testListEmptyRootDirectory(); + break; + } catch (AssertionError | FileNotFoundException e) { + if (attempt < maxAttempts) { + LOG.info("Attempt {} of {} for empty root directory test failed. " + + "Attempting retry.", attempt, maxAttempts); + try { + Thread.sleep(1000); + } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); + fail("Test interrupted."); + break; + } + } else { + LOG.error( + "Empty root directory test failed {} attempts. Failing test.", + maxAttempts); + throw e; + } + } + } + } +}