diff --git a/.gitignore b/.gitignore index a5d69d094c..194862b311 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,5 @@ hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml hadoop-tools/hadoop-azure/src/test/resources/azure-auth-keys.xml patchprocess/ +hadoop-tools/hadoop-aliyun/src/test/resources/auth-keys.xml +hadoop-tools/hadoop-aliyun/src/test/resources/contract-test-options.xml diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 65e9672ce4..29c2760d7d 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -436,6 +436,12 @@ ${project.version} + + org.apache.hadoop + hadoop-aliyun + ${project.version} + + org.apache.hadoop hadoop-kms @@ -1004,6 +1010,22 @@ 4.2.0 + + com.aliyun.oss + aliyun-sdk-oss + 2.2.1 + + + org.apache.httpcomponents + httpclient + + + commons-beanutils + commons-beanutils + + + + xerces xercesImpl diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml new file mode 100644 index 0000000000..40d78d0cd6 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml @@ -0,0 +1,18 @@ + + + diff --git a/hadoop-tools/hadoop-aliyun/pom.xml b/hadoop-tools/hadoop-aliyun/pom.xml new file mode 100644 index 0000000000..c87d13f24c --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/pom.xml @@ -0,0 +1,133 @@ + + + + 4.0.0 + + org.apache.hadoop + hadoop-project + 3.0.0-alpha2-SNAPSHOT + ../../hadoop-project + + hadoop-aliyun + Apache Hadoop Aliyun OSS support + jar + + + UTF-8 + true + + + + + tests-off + + + src/test/resources/auth-keys.xml + + + + true + + + + tests-on + + + src/test/resources/auth-keys.xml + + + + false + + + + + + + + org.codehaus.mojo + findbugs-maven-plugin + + true + true + ${basedir}/dev-support/findbugs-exclude.xml + + Max + + + + org.apache.maven.plugins + maven-project-info-reports-plugin + + false + false + + + + org.apache.maven.plugins + maven-surefire-plugin + + 3600 + + + + org.apache.maven.plugins + maven-dependency-plugin + + + deplist + compile + + list + + + + ${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt + + + + + + + + + + junit + junit + test + + + + com.aliyun.oss + aliyun-sdk-oss + compile + + + + org.apache.hadoop + hadoop-common + compile + + + + org.apache.hadoop + hadoop-common + test + test-jar + + + + diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java new file mode 100644 index 0000000000..30ddf8c6a6 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -0,0 +1,847 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +import com.aliyun.oss.common.auth.CredentialsProvider; +import com.aliyun.oss.common.auth.DefaultCredentialProvider; +import com.aliyun.oss.common.auth.DefaultCredentials; +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.UserInfo; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.ProviderUtils; +import org.apache.hadoop.util.Progressable; + +import com.aliyun.oss.ClientConfiguration; +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.common.comm.Protocol; +import com.aliyun.oss.model.AbortMultipartUploadRequest; +import com.aliyun.oss.model.CannedAccessControlList; +import com.aliyun.oss.model.CompleteMultipartUploadRequest; +import com.aliyun.oss.model.CompleteMultipartUploadResult; +import com.aliyun.oss.model.CopyObjectResult; +import com.aliyun.oss.model.DeleteObjectsRequest; +import com.aliyun.oss.model.InitiateMultipartUploadRequest; +import com.aliyun.oss.model.InitiateMultipartUploadResult; +import com.aliyun.oss.model.ListObjectsRequest; +import com.aliyun.oss.model.OSSObjectSummary; +import com.aliyun.oss.model.ObjectListing; +import com.aliyun.oss.model.ObjectMetadata; +import com.aliyun.oss.model.PartETag; +import com.aliyun.oss.model.UploadPartCopyRequest; +import com.aliyun.oss.model.UploadPartCopyResult; + +/** + * Implementation of {@link FileSystem} for + * Aliyun OSS, used to access OSS blob system in a filesystem style. 
+ */ +public class AliyunOSSFileSystem extends FileSystem { + + private URI uri; + private Path workingDir; + private OSSClient ossClient; + private String bucketName; + private long uploadPartSize; + private long multipartThreshold; + private int maxKeys; + private String serverSideEncryptionAlgorithm; + + @Override + public FSDataOutputStream append(Path path, int bufferSize, + Progressable progress) throws IOException { + throw new IOException("Append is not supported!"); + } + + @Override + public void close() throws IOException { + try { + if (ossClient != null) { + ossClient.shutdown(); + } + } finally { + super.close(); + } + } + + @Override + public FSDataOutputStream create(Path path, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + String key = pathToKey(path); + + if (!overwrite && exists(path)) { + throw new FileAlreadyExistsException(path + " already exists"); + } + + return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(), + ossClient, bucketName, key, progress, statistics, + serverSideEncryptionAlgorithm), (Statistics)(null)); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + FileStatus status; + try { + status = getFileStatus(path); + } catch (FileNotFoundException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Couldn't delete " + path + ": Does not exist!"); + } + return false; + } + + String key = pathToKey(status.getPath()); + if (status.isDirectory()) { + if (!key.endsWith("/")) { + key += "/"; + } + if (!recursive) { + FileStatus[] statuses = listStatus(status.getPath()); + // Check whether it is an empty directory or not + if (statuses.length > 0) { + throw new IOException("Cannot remove directory" + path + + ": It is not empty!"); + } else { + // Delete empty directory without '-r' + ossClient.deleteObject(bucketName, key); + statistics.incrementWriteOps(1); + } + } else { + ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); + listRequest.setPrefix(key); + listRequest.setMaxKeys(maxKeys); + + while (true) { + ObjectListing objects = ossClient.listObjects(listRequest); + statistics.incrementReadOps(1); + List keysToDelete = new ArrayList(); + for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { + keysToDelete.add(objectSummary.getKey()); + } + DeleteObjectsRequest deleteRequest = + new DeleteObjectsRequest(bucketName); + deleteRequest.setKeys(keysToDelete); + ossClient.deleteObjects(deleteRequest); + statistics.incrementWriteOps(1); + if (objects.isTruncated()) { + listRequest.setMarker(objects.getNextMarker()); + } else { + break; + } + } + } + } else { + ossClient.deleteObject(bucketName, key); + statistics.incrementWriteOps(1); + } + //TODO: optimize logic here + try { + Path pPath = status.getPath().getParent(); + FileStatus pStatus = getFileStatus(pPath); + if (pStatus.isDirectory()) { + return true; + } else { + throw new IOException("Path " + pPath + + " is assumed to be a directory!"); + } + } catch (FileNotFoundException fnfe) { + // Make sure the parent directory exists + return mkdir(bucketName, pathToKey(status.getPath().getParent())); + } + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + Path qualifiedPath = path.makeQualified(uri, workingDir); + String key = pathToKey(qualifiedPath); + + // Root always exists + if (key.length() == 0) { + return new FileStatus(0, true, 1, 0, 0, qualifiedPath); + } + + ObjectMetadata meta = 
getObjectMetadata(key); + // If key not found and key does not end with "/" + if (meta == null && !key.endsWith("/")) { + // Case: dir + "/" + key += "/"; + meta = getObjectMetadata(key); + } + if (meta == null) { + // Case: dir + "/" + file + ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); + listRequest.setPrefix(key); + listRequest.setDelimiter("/"); + listRequest.setMaxKeys(1); + + ObjectListing listing = ossClient.listObjects(listRequest); + statistics.incrementReadOps(1); + if (!listing.getObjectSummaries().isEmpty() || + !listing.getCommonPrefixes().isEmpty()) { + return new FileStatus(0, true, 1, 0, 0, qualifiedPath); + } else { + throw new FileNotFoundException(path + ": No such file or directory!"); + } + } else if (objectRepresentsDirectory(key, meta.getContentLength())) { + return new FileStatus(0, true, 1, 0, 0, qualifiedPath); + } else { + return new FileStatus(meta.getContentLength(), false, 1, + getDefaultBlockSize(path), meta.getLastModified().getTime(), + qualifiedPath); + } + } + + /** + * Return object metadata given object key. + * + * @param key object key + * @return return null if key does not exist + */ + private ObjectMetadata getObjectMetadata(String key) { + try { + return ossClient.getObjectMetadata(bucketName, key); + } catch (OSSException osse) { + return null; + } finally { + statistics.incrementReadOps(1); + } + } + + @Override + public String getScheme() { + return "oss"; + } + + @Override + public URI getUri() { + return uri; + } + + @Override + public Path getWorkingDirectory() { + return workingDir; + } + + @Deprecated + public long getDefaultBlockSize() { + return getConf().getLong(FS_OSS_BLOCK_SIZE_KEY, FS_OSS_BLOCK_SIZE_DEFAULT); + } + + @Override + public String getCanonicalServiceName() { + // Does not support Token + return null; + } + + /** + * Initialize new FileSystem. + * + * @param name the uri of the file system, including host, port, etc. + * + * @param conf configuration of the file system + * @throws IOException IO problems + */ + public void initialize(URI name, Configuration conf) throws IOException { + super.initialize(name, conf); + + uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority()); + workingDir = + new Path("/user", + System.getProperty("user.name")).makeQualified(uri, null); + + bucketName = name.getHost(); + + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY, + MAXIMUM_CONNECTIONS_DEFAULT)); + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY, + SECURE_CONNECTIONS_DEFAULT); + clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); + clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY, + MAX_ERROR_RETRIES_DEFAULT)); + clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY, + ESTABLISH_TIMEOUT_DEFAULT)); + clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY, + SOCKET_TIMEOUT_DEFAULT)); + + String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, ""); + int proxyPort = conf.getInt(PROXY_PORT_KEY, -1); + if (!proxyHost.isEmpty()) { + clientConf.setProxyHost(proxyHost); + if (proxyPort >= 0) { + clientConf.setProxyPort(proxyPort); + } else { + if (secureConnections) { + LOG.warn("Proxy host set without port. Using HTTPS default 443"); + clientConf.setProxyPort(443); + } else { + LOG.warn("Proxy host set without port. 
Using HTTP default 80"); + clientConf.setProxyPort(80); + } + } + String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY); + String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY); + if ((proxyUsername == null) != (proxyPassword == null)) { + String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " + + PROXY_PASSWORD_KEY + " set without the other."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + clientConf.setProxyUsername(proxyUsername); + clientConf.setProxyPassword(proxyPassword); + clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY)); + clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY)); + } else if (proxyPort >= 0) { + String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " + + PROXY_HOST_KEY; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + + String endPoint = conf.getTrimmed(ENDPOINT_KEY, ""); + ossClient = + new OSSClient(endPoint, getCredentialsProvider(name, conf), clientConf); + + maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); + uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, + MULTIPART_UPLOAD_SIZE_DEFAULT); + multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, + MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); + + if (uploadPartSize < 5 * 1024 * 1024) { + LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB"); + uploadPartSize = 5 * 1024 * 1024; + } + + if (multipartThreshold < 5 * 1024 * 1024) { + LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB"); + multipartThreshold = 5 * 1024 * 1024; + } + + if (multipartThreshold > 1024 * 1024 * 1024) { + LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB"); + multipartThreshold = 1024 * 1024 * 1024; + } + + String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT); + if (!cannedACLName.isEmpty()) { + CannedAccessControlList cannedACL = + CannedAccessControlList.valueOf(cannedACLName); + ossClient.setBucketAcl(bucketName, cannedACL); + } + + serverSideEncryptionAlgorithm = + conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, ""); + + setConf(conf); + } + + /** + * Create the default credential provider, or load in one explicitly + * identified in the configuration. + * @param name the uri of the file system + * @param conf configuration + * @return a credential provider + * @throws IOException on any problem. Class construction issues may be + * nested inside the IOE. 
+ */ + private CredentialsProvider getCredentialsProvider(URI name, + Configuration conf) throws IOException { + CredentialsProvider credentials; + + String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY); + if (StringUtils.isEmpty(className)) { + Configuration newConf = + ProviderUtils.excludeIncompatibleCredentialProviders(conf, + AliyunOSSFileSystem.class); + String accessKey = + AliyunOSSUtils.getPassword(newConf, ACCESS_KEY, + UserInfo.EMPTY.getUser()); + String secretKey = + AliyunOSSUtils.getPassword(newConf, SECRET_KEY, + UserInfo.EMPTY.getPassword()); + credentials = + new DefaultCredentialProvider( + new DefaultCredentials(accessKey, secretKey)); + + } else { + try { + LOG.debug("Credential provider class is:" + className); + Class credClass = Class.forName(className); + try { + credentials = + (CredentialsProvider)credClass.getDeclaredConstructor( + URI.class, Configuration.class).newInstance(this.uri, conf); + } catch (NoSuchMethodException | SecurityException e) { + credentials = + (CredentialsProvider)credClass.getDeclaredConstructor() + .newInstance(); + } + } catch (ClassNotFoundException e) { + throw new IOException(className + " not found.", e); + } catch (NoSuchMethodException | SecurityException e) { + throw new IOException(String.format("%s constructor exception. A " + + "class specified in %s must provide an accessible constructor " + + "accepting URI and Configuration, or an accessible default " + + "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY), e); + } catch (ReflectiveOperationException | IllegalArgumentException e) { + throw new IOException(className + " instantiation exception.", e); + } + } + + return credentials; + } + + /** + * Check if OSS object represents a directory. + * + * @param name object key + * @param size object content length + * @return true if object represents a directory + */ + private boolean objectRepresentsDirectory(final String name, + final long size) { + return !name.isEmpty() && name.endsWith("/") && size == 0L; + } + + /** + * Turns a path (relative or otherwise) into an OSS key. 
+ * + * @param path the path of the file + * @return the key of the object that represent the file + */ + private String pathToKey(Path path) { + if (!path.isAbsolute()) { + path = new Path(workingDir, path); + } + + if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) { + return ""; + } + + return path.toUri().getPath().substring(1); + } + + private Path keyToPath(String key) { + return new Path("/" + key); + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + String key = pathToKey(path); + if (LOG.isDebugEnabled()) { + LOG.debug("List status for path: " + path); + } + + final List result = new ArrayList(); + final FileStatus fileStatus = getFileStatus(path); + + if (fileStatus.isDirectory()) { + if (!key.endsWith("/")) { + key = key + "/"; + } + + ListObjectsRequest listObjectsRequest = + new ListObjectsRequest(bucketName); + listObjectsRequest.setPrefix(key); + listObjectsRequest.setDelimiter("/"); + listObjectsRequest.setMaxKeys(maxKeys); + + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: doing listObjects for directory " + key); + } + + while (true) { + ObjectListing objects = ossClient.listObjects(listObjectsRequest); + statistics.incrementReadOps(1); + for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { + Path keyPath = keyToPath(objectSummary.getKey()) + .makeQualified(uri, workingDir); + if (keyPath.equals(path)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring: " + keyPath); + } + continue; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: fi: " + keyPath); + } + result.add(new FileStatus(objectSummary.getSize(), false, 1, + getDefaultBlockSize(keyPath), + objectSummary.getLastModified().getTime(), keyPath)); + } + } + + for (String prefix : objects.getCommonPrefixes()) { + Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); + if (keyPath.equals(path)) { + continue; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: rd: " + keyPath); + } + result.add(new FileStatus(0, true, 1, 0, 0, keyPath)); + } + } + + if (objects.isTruncated()) { + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: list truncated - getting next batch"); + } + listObjectsRequest.setMarker(objects.getNextMarker()); + statistics.incrementReadOps(1); + } else { + break; + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: rd (not a dir): " + path); + } + result.add(fileStatus); + } + + return result.toArray(new FileStatus[result.size()]); + } + + /** + * Used to create an empty file that represents an empty directory. 
+ * + * @param bucketName the bucket this directory belongs to + * @param objectName directory path + * @return true if directory successfully created + * @throws IOException + */ + private boolean mkdir(final String bucket, final String objectName) + throws IOException { + String dirName = objectName; + ObjectMetadata dirMeta = new ObjectMetadata(); + byte[] buffer = new byte[0]; + ByteArrayInputStream in = new ByteArrayInputStream(buffer); + dirMeta.setContentLength(0); + if (!objectName.endsWith("/")) { + dirName += "/"; + } + try { + ossClient.putObject(bucket, dirName, in, dirMeta); + return true; + } finally { + in.close(); + } + } + + @Override + public boolean mkdirs(Path path, FsPermission permission) + throws IOException { + try { + FileStatus fileStatus = getFileStatus(path); + + if (fileStatus.isDirectory()) { + return true; + } else { + throw new FileAlreadyExistsException("Path is a file: " + path); + } + } catch (FileNotFoundException e) { + validatePath(path); + String key = pathToKey(path); + return mkdir(bucketName, key); + } + } + + /** + * Check whether the path is a valid path. + * + * @param path the path to be checked + * @throws IOException + */ + private void validatePath(Path path) throws IOException { + Path fPart = path.getParent(); + do { + try { + FileStatus fileStatus = getFileStatus(fPart); + if (fileStatus.isDirectory()) { + // If path exists and a directory, exit + break; + } else { + throw new FileAlreadyExistsException(String.format( + "Can't make directory for path '%s', it is a file.", fPart)); + } + } catch (FileNotFoundException fnfe) { + } + fPart = fPart.getParent(); + } while (fPart != null); + } + + @Override + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + final FileStatus fileStatus = getFileStatus(path); + if (fileStatus.isDirectory()) { + throw new FileNotFoundException("Can't open " + path + + " because it is a directory"); + } + + return new FSDataInputStream(new AliyunOSSInputStream(getConf(), ossClient, + bucketName, pathToKey(path), fileStatus.getLen(), statistics)); + } + + @Override + public boolean rename(Path srcPath, Path dstPath) throws IOException { + if (srcPath.isRoot()) { + // Cannot rename root of file system + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot rename the root of a filesystem"); + } + return false; + } + Path parent = dstPath.getParent(); + while (parent != null && !srcPath.equals(parent)) { + parent = parent.getParent(); + } + if (parent != null) { + return false; + } + FileStatus srcStatus = getFileStatus(srcPath); + FileStatus dstStatus; + try { + dstStatus = getFileStatus(dstPath); + } catch (FileNotFoundException fnde) { + dstStatus = null; + } + if (dstStatus == null) { + // If dst doesn't exist, check whether dst dir exists or not + dstStatus = getFileStatus(dstPath.getParent()); + if (!dstStatus.isDirectory()) { + throw new IOException(String.format( + "Failed to rename %s to %s, %s is a file", srcPath, dstPath, + dstPath.getParent())); + } + } else { + if (srcStatus.getPath().equals(dstStatus.getPath())) { + return !srcStatus.isDirectory(); + } else if (dstStatus.isDirectory()) { + // If dst is a directory + dstPath = new Path(dstPath, srcPath.getName()); + FileStatus[] statuses; + try { + statuses = listStatus(dstPath); + } catch (FileNotFoundException fnde) { + statuses = null; + } + if (statuses != null && statuses.length > 0) { + // If dst exists and not a directory / not empty + throw new FileAlreadyExistsException(String.format( + "Failed to rename %s to %s, file 
already exists or not empty!", + srcPath, dstPath)); + } + } else { + // If dst is not a directory + throw new FileAlreadyExistsException(String.format( + "Failed to rename %s to %s, file already exists!", srcPath, + dstPath)); + } + } + if (srcStatus.isDirectory()) { + copyDirectory(srcPath, dstPath); + } else { + copyFile(srcPath, dstPath); + } + if (srcPath.equals(dstPath)) { + return true; + } else { + return delete(srcPath, true); + } + } + + /** + * Copy file from source path to destination path. + * (the caller should make sure srcPath is a file and dstPath is valid.) + * + * @param srcPath source path + * @param dstPath destination path + * @return true if successfully copied + */ + private boolean copyFile(Path srcPath, Path dstPath) { + String srcKey = pathToKey(srcPath); + String dstKey = pathToKey(dstPath); + return copyFile(srcKey, dstKey); + } + + /** + * Copy an object from source key to destination key. + * + * @param srcKey source key + * @param dstKey destination key + * @return true if successfully copied + */ + private boolean copyFile(String srcKey, String dstKey) { + ObjectMetadata objectMeta = + ossClient.getObjectMetadata(bucketName, srcKey); + long dataLen = objectMeta.getContentLength(); + if (dataLen <= multipartThreshold) { + return singleCopy(srcKey, dstKey); + } else { + return multipartCopy(srcKey, dataLen, dstKey); + } + } + + /** + * Use single copy to copy an oss object. + * + * @param srcKey source key + * @param dstKey destination key + * @return true if successfully copied + * (the caller should make sure srcPath is a file and dstPath is valid) + */ + private boolean singleCopy(String srcKey, String dstKey) { + CopyObjectResult copyResult = + ossClient.copyObject(bucketName, srcKey, bucketName, dstKey); + LOG.debug(copyResult.getETag()); + return true; + } + + /** + * Use multipart copy to copy an oss object. + * (the caller should make sure srcPath is a file and dstPath is valid) + * + * @param srcKey source key + * @param dataLen data size of the object to copy + * @param dstKey destination key + * @return true if successfully copied, or false if upload is aborted + */ + private boolean multipartCopy(String srcKey, long dataLen, String dstKey) { + int partNum = (int)(dataLen / uploadPartSize); + if (dataLen % uploadPartSize != 0) { + partNum++; + } + InitiateMultipartUploadRequest initiateMultipartUploadRequest = + new InitiateMultipartUploadRequest(bucketName, dstKey); + ObjectMetadata meta = new ObjectMetadata(); + if (!serverSideEncryptionAlgorithm.isEmpty()) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + initiateMultipartUploadRequest.setObjectMetadata(meta); + InitiateMultipartUploadResult initiateMultipartUploadResult = + ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); + String uploadId = initiateMultipartUploadResult.getUploadId(); + List partETags = new ArrayList(); + try { + for (int i = 0; i < partNum; i++) { + long skipBytes = uploadPartSize * i; + long size = (uploadPartSize < dataLen - skipBytes) ? 
+ uploadPartSize : dataLen - skipBytes; + UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest(); + partCopyRequest.setSourceBucketName(bucketName); + partCopyRequest.setSourceKey(srcKey); + partCopyRequest.setBucketName(bucketName); + partCopyRequest.setKey(dstKey); + partCopyRequest.setUploadId(uploadId); + partCopyRequest.setPartSize(size); + partCopyRequest.setBeginIndex(skipBytes); + partCopyRequest.setPartNumber(i + 1); + UploadPartCopyResult partCopyResult = + ossClient.uploadPartCopy(partCopyRequest); + statistics.incrementWriteOps(1); + partETags.add(partCopyResult.getPartETag()); + } + CompleteMultipartUploadRequest completeMultipartUploadRequest = + new CompleteMultipartUploadRequest(bucketName, dstKey, + uploadId, partETags); + CompleteMultipartUploadResult completeMultipartUploadResult = + ossClient.completeMultipartUpload(completeMultipartUploadRequest); + LOG.debug(completeMultipartUploadResult.getETag()); + return true; + } catch (Exception e) { + AbortMultipartUploadRequest abortMultipartUploadRequest = + new AbortMultipartUploadRequest(bucketName, dstKey, uploadId); + ossClient.abortMultipartUpload(abortMultipartUploadRequest); + return false; + } + } + + /** + * Copy a directory from source path to destination path. + * (the caller should make sure srcPath is a directory, and dstPath is valid) + * + * @param srcPath source path + * @param dstPath destination path + * @return true if successfully copied + */ + private boolean copyDirectory(Path srcPath, Path dstPath) { + String srcKey = pathToKey(srcPath); + String dstKey = pathToKey(dstPath); + + if (!srcKey.endsWith("/")) { + srcKey = srcKey + "/"; + } + if (!dstKey.endsWith("/")) { + dstKey = dstKey + "/"; + } + + if (dstKey.startsWith(srcKey)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot rename a directory to a subdirectory of self"); + } + return false; + } + + ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName); + listObjectsRequest.setPrefix(srcKey); + listObjectsRequest.setMaxKeys(maxKeys); + + ObjectListing objects = ossClient.listObjects(listObjectsRequest); + statistics.incrementReadOps(1); + // Copy files from src folder to dst + while (true) { + for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { + String newKey = + dstKey.concat(objectSummary.getKey().substring(srcKey.length())); + copyFile(objectSummary.getKey(), newKey); + } + if (objects.isTruncated()) { + listObjectsRequest.setMarker(objects.getNextMarker()); + statistics.incrementReadOps(1); + } else { + break; + } + } + return true; + } + + @Override + public void setWorkingDirectory(Path dir) { + this.workingDir = dir; + } + +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java new file mode 100644 index 0000000000..bcd00dc50e --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem.Statistics; + +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.model.GetObjectRequest; + +/** + * The input stream for OSS blob system. + * The class uses multi-part downloading to read data from the object content + * stream. + */ +public class AliyunOSSInputStream extends FSInputStream { + public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class); + private static final int MAX_RETRIES = 10; + private final long downloadPartSize; + + private String bucketName; + private String key; + private OSSClient ossClient; + private Statistics statistics; + private boolean closed; + private InputStream wrappedStream = null; + private long dataLen; + private long position; + private long partRemaining; + + public AliyunOSSInputStream(Configuration conf, OSSClient client, + String bucketName, String key, Long dataLen, Statistics statistics) + throws IOException { + this.bucketName = bucketName; + this.key = key; + ossClient = client; + this.statistics = statistics; + this.dataLen = dataLen; + downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY, + MULTIPART_DOWNLOAD_SIZE_DEFAULT); + reopen(0); + closed = false; + } + + /** + * Reopen the wrapped stream at give position, by seeking for + * data of a part length from object content stream. 
+   *
+   * @param pos position from start of a file
+   * @throws IOException if failed to reopen
+   */
+  private synchronized void reopen(long pos) throws IOException {
+
+    long partLen;
+
+    if (pos < 0) {
+      throw new EOFException("Cannot seek at negative position:" + pos);
+    } else if (pos > dataLen) {
+      throw new EOFException("Cannot seek after EOF, fileLen:" + dataLen +
+          " position:" + pos);
+    } else if (pos + downloadPartSize > dataLen) {
+      partLen = dataLen - pos;
+    } else {
+      partLen = downloadPartSize;
+    }
+
+    if (wrappedStream != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Aborting old stream to open at pos " + pos);
+      }
+      wrappedStream.close();
+    }
+
+    GetObjectRequest request = new GetObjectRequest(bucketName, key);
+    request.setRange(pos, pos + partLen - 1);
+    wrappedStream = ossClient.getObject(request).getObjectContent();
+    if (wrappedStream == null) {
+      throw new IOException("Null IO stream");
+    }
+    position = pos;
+    partRemaining = partLen;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    checkNotClosed();
+
+    if (partRemaining <= 0 && position < dataLen) {
+      reopen(position);
+    }
+
+    int tries = MAX_RETRIES;
+    boolean retry;
+    int byteRead = -1;
+    do {
+      retry = false;
+      try {
+        byteRead = wrappedStream.read();
+      } catch (Exception e) {
+        handleReadException(e, --tries);
+        retry = true;
+      }
+    } while (retry);
+    if (byteRead >= 0) {
+      position++;
+      partRemaining--;
+    }
+
+    if (statistics != null && byteRead >= 0) {
+      statistics.incrementBytesRead(1);
+    }
+    return byteRead;
+  }
+
+  /**
+   * Check whether the input stream is closed.
+   *
+   * @throws IOException if stream is closed
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException("Stream is closed!");
+    }
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len)
+      throws IOException {
+    checkNotClosed();
+
+    if (buf == null) {
+      throw new NullPointerException();
+    } else if (off < 0 || len < 0 || len > buf.length - off) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return 0;
+    }
+
+    int bytesRead = 0;
+    // Not EOF, and read not done
+    while (position < dataLen && bytesRead < len) {
+      if (partRemaining == 0) {
+        reopen(position);
+      }
+
+      int tries = MAX_RETRIES;
+      boolean retry;
+      int bytes = -1;
+      do {
+        retry = false;
+        try {
+          bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
+        } catch (Exception e) {
+          handleReadException(e, --tries);
+          retry = true;
+        }
+      } while (retry);
+
+      if (bytes > 0) {
+        bytesRead += bytes;
+        position += bytes;
+        partRemaining -= bytes;
+      } else if (partRemaining != 0) {
+        throw new IOException("Failed to read from stream.
Remaining:" + + partRemaining); + } + } + + if (statistics != null && bytesRead > 0) { + statistics.incrementBytesRead(bytesRead); + } + + // Read nothing, but attempt to read something + if (bytesRead == 0 && len > 0) { + return -1; + } else { + return bytesRead; + } + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + closed = true; + if (wrappedStream != null) { + wrappedStream.close(); + } + } + + @Override + public synchronized int available() throws IOException { + checkNotClosed(); + + long remaining = dataLen - position; + if (remaining > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int)remaining; + } + + @Override + public void seek(long pos) throws IOException { + checkNotClosed(); + if (position == pos) { + return; + } else if (pos > position && pos < position + partRemaining) { + wrappedStream.skip(pos - position); + position = pos; + } else { + reopen(pos); + } + } + + @Override + public long getPos() throws IOException { + checkNotClosed(); + return position; + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + checkNotClosed(); + return false; + } + + private void handleReadException(Exception e, int tries) throws IOException{ + if (tries == 0) { + throw new IOException(e); + } + + LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" + + " connection at position '" + position + "', " + e.getMessage()); + try { + Thread.sleep(100); + } catch (InterruptedException e2) { + LOG.warn(e2.getMessage()); + } + reopen(position); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java new file mode 100644 index 0000000000..589e014f45 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.util.Progressable;
+
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.model.AbortMultipartUploadRequest;
+import com.aliyun.oss.model.CompleteMultipartUploadRequest;
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.InitiateMultipartUploadRequest;
+import com.aliyun.oss.model.InitiateMultipartUploadResult;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.PartETag;
+import com.aliyun.oss.model.PutObjectResult;
+import com.aliyun.oss.model.UploadPartRequest;
+import com.aliyun.oss.model.UploadPartResult;
+
+/**
+ * The output stream for the OSS blob system.
+ * Data will be buffered on local disk and then uploaded to OSS in the
+ * {@link #close()} method.
+ */
+public class AliyunOSSOutputStream extends OutputStream {
+  public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class);
+  private String bucketName;
+  private String key;
+  private Statistics statistics;
+  private Progressable progress;
+  private String serverSideEncryptionAlgorithm;
+  private long partSize;
+  private long partSizeThreshold;
+  private LocalDirAllocator dirAlloc;
+  private boolean closed;
+  private File tmpFile;
+  private BufferedOutputStream backupStream;
+  private OSSClient ossClient;
+
+  public AliyunOSSOutputStream(Configuration conf, OSSClient client,
+      String bucketName, String key, Progressable progress,
+      Statistics statistics, String serverSideEncryptionAlgorithm)
+      throws IOException {
+    this.bucketName = bucketName;
+    this.key = key;
+    // The caller cannot get any progress information
+    this.progress = progress;
+    ossClient = client;
+    this.statistics = statistics;
+    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
+
+    partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
+        MULTIPART_UPLOAD_SIZE_DEFAULT);
+    partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
+        MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
+
+    if (conf.get(BUFFER_DIR_KEY) == null) {
+      conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
+    }
+    dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
+
+    tmpFile = dirAlloc.createTmpFileForWrite("output-",
+        LocalDirAllocator.SIZE_UNKNOWN, conf);
+    backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
+    closed = false;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    if (backupStream != null) {
+      backupStream.close();
+    }
+    long dataLen = tmpFile.length();
+    try {
+      if (dataLen <= partSizeThreshold) {
+        uploadObject();
+      } else {
+        multipartUploadObject();
+      }
+    } finally {
+      tmpFile.delete();
+    }
+  }
+
+  /**
+   * Upload temporary file as an OSS object, using single upload.
+ * + * @throws IOException + */ + private void uploadObject() throws IOException { + File object = tmpFile.getAbsoluteFile(); + FileInputStream fis = new FileInputStream(object); + ObjectMetadata meta = new ObjectMetadata(); + meta.setContentLength(object.length()); + if (!serverSideEncryptionAlgorithm.isEmpty()) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + try { + PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta); + LOG.debug(result.getETag()); + statistics.incrementWriteOps(1); + } finally { + fis.close(); + } + } + + /** + * Upload temporary file as an OSS object, using multipart upload. + * + * @throws IOException + */ + private void multipartUploadObject() throws IOException { + File object = tmpFile.getAbsoluteFile(); + long dataLen = object.length(); + InitiateMultipartUploadRequest initiateMultipartUploadRequest = + new InitiateMultipartUploadRequest(bucketName, key); + ObjectMetadata meta = new ObjectMetadata(); + // meta.setContentLength(dataLen); + if (!serverSideEncryptionAlgorithm.isEmpty()) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + initiateMultipartUploadRequest.setObjectMetadata(meta); + InitiateMultipartUploadResult initiateMultipartUploadResult = + ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); + int partNum = (int)(dataLen / partSize); + if (dataLen % partSize != 0) { + partNum += 1; + } + if (partNum > MULTIPART_UPLOAD_PART_NUM_LIMIT) { + throw new IOException("Number of parts " + partNum + " should not be " + + "bigger than limit " + MULTIPART_UPLOAD_PART_NUM_LIMIT); + } + List partETags = new ArrayList(); + String uploadId = initiateMultipartUploadResult.getUploadId(); + + try { + for (int i = 0; i < partNum; i++) { + // TODO: Optimize this, avoid opening the object multiple times + FileInputStream fis = new FileInputStream(object); + try { + long skipBytes = partSize * i; + fis.skip(skipBytes); + long size = (partSize < dataLen - skipBytes) ? 
+ partSize : dataLen - skipBytes; + UploadPartRequest uploadPartRequest = new UploadPartRequest(); + uploadPartRequest.setBucketName(bucketName); + uploadPartRequest.setKey(key); + uploadPartRequest.setUploadId(uploadId); + uploadPartRequest.setInputStream(fis); + uploadPartRequest.setPartSize(size); + uploadPartRequest.setPartNumber(i + 1); + UploadPartResult uploadPartResult = + ossClient.uploadPart(uploadPartRequest); + statistics.incrementWriteOps(1); + partETags.add(uploadPartResult.getPartETag()); + } finally { + fis.close(); + } + } + CompleteMultipartUploadRequest completeMultipartUploadRequest = + new CompleteMultipartUploadRequest(bucketName, key, + uploadId, partETags); + CompleteMultipartUploadResult completeMultipartUploadResult = + ossClient.completeMultipartUpload(completeMultipartUploadRequest); + LOG.debug(completeMultipartUploadResult.getETag()); + } catch (Exception e) { + AbortMultipartUploadRequest abortMultipartUploadRequest = + new AbortMultipartUploadRequest(bucketName, key, uploadId); + ossClient.abortMultipartUpload(abortMultipartUploadRequest); + } + } + + @Override + public synchronized void flush() throws IOException { + backupStream.flush(); + } + + @Override + public synchronized void write(int b) throws IOException { + backupStream.write(b); + statistics.incrementBytesWritten(1); + } + +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java new file mode 100644 index 0000000000..3f66a4fc5e --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URLDecoder; +import java.util.Objects; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +/** + * Utility methods for Aliyun OSS code. + */ +final public class AliyunOSSUtils { + private AliyunOSSUtils() { + } + + /** + * User information includes user name and password. + */ + static public class UserInfo { + private final String user; + private final String password; + + public static final UserInfo EMPTY = new UserInfo("", ""); + + public UserInfo(String user, String password) { + this.user = user; + this.password = password; + } + + /** + * Predicate to verify user information is set. + * @return true if the username is defined (not null, not empty). + */ + public boolean hasLogin() { + return StringUtils.isNotEmpty(user); + } + + /** + * Equality test matches user and password. 
+ * @param o other object + * @return true if the objects are considered equivalent. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UserInfo that = (UserInfo) o; + return Objects.equals(user, that.user) && + Objects.equals(password, that.password); + } + + @Override + public int hashCode() { + return Objects.hash(user, password); + } + + public String getUser() { + return user; + } + + public String getPassword() { + return password; + } + } + + /** + * Used to get password from configuration, if default value is not available. + * @param conf configuration that contains password information + * @param key the key of the password + * @param val the default value of the key + * @return the value for the key + * @throws IOException if failed to get password from configuration + */ + static public String getPassword(Configuration conf, String key, String val) + throws IOException { + if (StringUtils.isEmpty(val)) { + try { + final char[] pass = conf.getPassword(key); + if (pass != null) { + return (new String(pass)).trim(); + } else { + return ""; + } + } catch (IOException ioe) { + throw new IOException("Cannot find password option " + key, ioe); + } + } else { + return val; + } + } + + /** + * Extract the user information details from a URI. + * @param name URI of the filesystem + * @return a login tuple, possibly empty. + */ + public static UserInfo extractLoginDetails(URI name) { + try { + String authority = name.getAuthority(); + if (authority == null) { + return UserInfo.EMPTY; + } + int loginIndex = authority.indexOf('@'); + if (loginIndex < 0) { + // No user information + return UserInfo.EMPTY; + } + String login = authority.substring(0, loginIndex); + int loginSplit = login.indexOf(':'); + if (loginSplit > 0) { + String user = login.substring(0, loginSplit); + String password = URLDecoder.decode(login.substring(loginSplit + 1), + "UTF-8"); + return new UserInfo(user, password); + } else if (loginSplit == 0) { + // There is no user, just a password. + return UserInfo.EMPTY; + } else { + return new UserInfo(login, ""); + } + } catch (UnsupportedEncodingException e) { + // This should never happen; translate it if it does. + throw new RuntimeException(e); + } + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java new file mode 100644 index 0000000000..4ee4cd4f47 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+/**
+ * All configuration constants for the OSS filesystem.
+ */
+public final class Constants {
+
+  private Constants() {
+  }
+
+  // Class of credential provider
+  public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY =
+      "fs.oss.credentials.provider";
+
+  // OSS access verification
+  public static final String ACCESS_KEY = "fs.oss.access.key";
+  public static final String SECRET_KEY = "fs.oss.secret.key";
+
+  // Number of simultaneous connections to OSS
+  public static final String MAXIMUM_CONNECTIONS_KEY =
+      "fs.oss.connection.maximum";
+  public static final int MAXIMUM_CONNECTIONS_DEFAULT = 32;
+
+  // Connect to OSS over SSL
+  public static final String SECURE_CONNECTIONS_KEY =
+      "fs.oss.connection.secure.enabled";
+  public static final boolean SECURE_CONNECTIONS_DEFAULT = true;
+
+  // Use a custom endpoint
+  public static final String ENDPOINT_KEY = "fs.oss.endpoint";
+
+  // Connect to OSS through a proxy server
+  public static final String PROXY_HOST_KEY = "fs.oss.proxy.host";
+  public static final String PROXY_PORT_KEY = "fs.oss.proxy.port";
+  public static final String PROXY_USERNAME_KEY = "fs.oss.proxy.username";
+  public static final String PROXY_PASSWORD_KEY = "fs.oss.proxy.password";
+  public static final String PROXY_DOMAIN_KEY = "fs.oss.proxy.domain";
+  public static final String PROXY_WORKSTATION_KEY =
+      "fs.oss.proxy.workstation";
+
+  // Number of times we should retry errors
+  public static final String MAX_ERROR_RETRIES_KEY = "fs.oss.attempts.maximum";
+  public static final int MAX_ERROR_RETRIES_DEFAULT = 20;
+
+  // Time until we give up trying to establish a connection to OSS
+  public static final String ESTABLISH_TIMEOUT_KEY =
+      "fs.oss.connection.establish.timeout";
+  public static final int ESTABLISH_TIMEOUT_DEFAULT = 50000;
+
+  // Time until we give up on a connection to OSS
+  public static final String SOCKET_TIMEOUT_KEY = "fs.oss.connection.timeout";
+  public static final int SOCKET_TIMEOUT_DEFAULT = 200000;
+
+  // Number of records to get while paging through a directory listing
+  public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum";
+  public static final int MAX_PAGING_KEYS_DEFAULT = 500;
+
+  // Size of each multipart piece in bytes
+  public static final String MULTIPART_UPLOAD_SIZE_KEY =
+      "fs.oss.multipart.upload.size";
+
+  public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
+  public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 1000;
+
+  // Minimum size in bytes before we start a multipart upload or copy
+  public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY =
+      "fs.oss.multipart.upload.threshold";
+  public static final long MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT =
+      20 * 1024 * 1024;
+
+  public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
+      "fs.oss.multipart.download.size";
+
+  public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
+
+  // Comma-separated list of buffer directories
+  public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
+
+  // private | public-read | public-read-write | authenticated-read |
+  // log-delivery-write | bucket-owner-read | bucket-owner-full-control
+  public static final String CANNED_ACL_KEY = "fs.oss.acl.default";
+  public static final String CANNED_ACL_DEFAULT = "";
+
+  // OSS server-side encryption
+  public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY =
+      "fs.oss.server-side-encryption-algorithm";
+
+  public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size";
+ public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024; + public static final String FS_OSS = "oss"; + +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java new file mode 100644 index 0000000000..234567b2b0 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Aliyun OSS Filesystem. + */ +package org.apache.hadoop.fs.aliyun.oss; \ No newline at end of file diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/OSSTestUtils.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/OSSTestUtils.java new file mode 100644 index 0000000000..37ed831917 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/OSSTestUtils.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.junit.internal.AssumptionViolatedException; + +import java.io.IOException; +import java.net.URI; +import java.util.Date; +import java.util.Random; + +/** + * Utility class for OSS Tests. + */ +public final class OSSTestUtils { + + private OSSTestUtils() { + } + + /** + * Create the test filesystem. + * + * If the test.fs.oss.name property is not set, + * tests will fail. 
+ * + * @param conf configuration + * @return the FS + * @throws IOException + */ + public static AliyunOSSFileSystem createTestFileSystem(Configuration conf) + throws IOException { + String fsname = conf.getTrimmed( + TestOSSFileSystemContract.TEST_FS_OSS_NAME, ""); + + boolean liveTest = !StringUtils.isEmpty(fsname); + URI testURI = null; + if (liveTest) { + testURI = URI.create(fsname); + liveTest = testURI.getScheme().equals(Constants.FS_OSS); + } + + if (!liveTest) { + throw new AssumptionViolatedException("No test filesystem in " + + TestOSSFileSystemContract.TEST_FS_OSS_NAME); + } + AliyunOSSFileSystem ossfs = new AliyunOSSFileSystem(); + ossfs.initialize(testURI, conf); + return ossfs; + } + + /** + * Generate unique test path for multiple user tests. + * + * @return root test path + */ + public static String generateUniqueTestPath() { + Long time = new Date().getTime(); + Random rand = new Random(); + return "/test_" + Long.toString(time) + "_" + + Long.toString(Math.abs(rand.nextLong())); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java new file mode 100644 index 0000000000..de4e5a9315 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystemContractBaseTest; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; + +/** + * Tests a live OSS system. + * + * This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest from + * TestCase which uses the old Junit3 runner that doesn't ignore assumptions + * properly making it impossible to skip the tests if we don't have a valid + * bucket. 
+ */ +public class TestOSSFileSystemContract extends FileSystemContractBaseTest { + + protected static final Logger LOG = + LoggerFactory.getLogger(TestOSSFileSystemContract.class); + + public static final String TEST_FS_OSS_NAME = "test.fs.oss.name"; + private static String testRootPath = OSSTestUtils.generateUniqueTestPath(); + + @Override + public void setUp() throws Exception { + Configuration conf = new Configuration(); + fs = OSSTestUtils.createTestFileSystem(conf); + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + if (fs != null) { + fs.delete(super.path(testRootPath), true); + } + super.tearDown(); + } + + @Override + protected Path path(String path) { + if (path.startsWith("/")) { + return super.path(testRootPath + path); + } else { + return super.path(testRootPath + "/" + path); + } + } + + @Override + public void testMkdirsWithUmask() throws Exception { + // not supported + } + + /** + * Assert that the root directory always exists. + * + * @throws Exception on failures + */ + @Override + public void testRootDirAlwaysExists() throws Exception { + //this will throw an exception if the path is not found + fs.getFileStatus(super.path("/")); + //this catches overrides of the base exists() method that don't + //use getFileStatus() as an existence probe + assertTrue("FileSystem.exists() fails for root", + fs.exists(super.path("/"))); + } + + /** + * Assert that root directory renames are not allowed. + * + * @throws Exception on failures + */ + @Override + public void testRenameRootDirForbidden() throws Exception { + if (!renameSupported()) { + return; + } + rename(super.path("/"), + super.path("/test/newRootDir"), + false, true, false); + } + + public void testDeleteSubdir() throws IOException { + Path parentDir = this.path("/test/hadoop"); + Path file = this.path("/test/hadoop/file"); + Path subdir = this.path("/test/hadoop/subdir"); + this.createFile(file); + + assertTrue("Created subdir", this.fs.mkdirs(subdir)); + assertTrue("File exists", this.fs.exists(file)); + assertTrue("Parent dir exists", this.fs.exists(parentDir)); + assertTrue("Subdir exists", this.fs.exists(subdir)); + + assertTrue("Deleted subdir", this.fs.delete(subdir, true)); + assertTrue("Parent should exist", this.fs.exists(parentDir)); + + assertTrue("Deleted file", this.fs.delete(file, false)); + assertTrue("Parent should exist", this.fs.exists(parentDir)); + } + + @Override + protected boolean renameSupported() { + return true; + } + + @Override + public void testRenameNonExistentPath() throws Exception { + if (this.renameSupported()) { + Path src = this.path("/test/hadoop/path"); + Path dst = this.path("/test/new/newpath"); + try { + super.rename(src, dst, false, false, false); + fail("Should throw FileNotFoundException!"); + } catch (FileNotFoundException e) { + // expected + } + } + } + + @Override + public void testRenameFileMoveToNonExistentDirectory() throws Exception { + if (this.renameSupported()) { + Path src = this.path("/test/hadoop/file"); + this.createFile(src); + Path dst = this.path("/test/new/newfile"); + try { + super.rename(src, dst, false, true, false); + fail("Should throw FileNotFoundException!"); + } catch (FileNotFoundException e) { + // expected + } + } + } + + @Override + public void testRenameDirectoryMoveToNonExistentDirectory() throws Exception { + if (this.renameSupported()) { + Path src = this.path("/test/hadoop/dir"); + this.fs.mkdirs(src); + Path dst = this.path("/test/new/newdir"); + try { + super.rename(src, dst, false, true, false); + 
fail("Should throw FileNotFoundException!"); + } catch (FileNotFoundException e) { + // expected + } + } + } + + @Override + public void testRenameFileMoveToExistingDirectory() throws Exception { + super.testRenameFileMoveToExistingDirectory(); + } + + @Override + public void testRenameFileAsExistingFile() throws Exception { + if (this.renameSupported()) { + Path src = this.path("/test/hadoop/file"); + this.createFile(src); + Path dst = this.path("/test/new/newfile"); + this.createFile(dst); + try { + super.rename(src, dst, false, true, true); + fail("Should throw FileAlreadyExistsException"); + } catch (FileAlreadyExistsException e) { + // expected + } + } + } + + @Override + public void testRenameDirectoryAsExistingFile() throws Exception { + if (this.renameSupported()) { + Path src = this.path("/test/hadoop/dir"); + this.fs.mkdirs(src); + Path dst = this.path("/test/new/newfile"); + this.createFile(dst); + try { + super.rename(src, dst, false, true, true); + fail("Should throw FileAlreadyExistsException"); + } catch (FileAlreadyExistsException e) { + // expected + } + } + } + + public void testGetFileStatusFileAndDirectory() throws Exception { + Path filePath = this.path("/test/oss/file1"); + this.createFile(filePath); + assertTrue("Should be file", this.fs.getFileStatus(filePath).isFile()); + assertFalse("Should not be directory", + this.fs.getFileStatus(filePath).isDirectory()); + + Path dirPath = this.path("/test/oss/dir"); + this.fs.mkdirs(dirPath); + assertTrue("Should be directory", + this.fs.getFileStatus(dirPath).isDirectory()); + assertFalse("Should not be file", this.fs.getFileStatus(dirPath).isFile()); + } + + public void testMkdirsForExistingFile() throws Exception { + Path testFile = this.path("/test/hadoop/file"); + assertFalse(this.fs.exists(testFile)); + this.createFile(testFile); + assertTrue(this.fs.exists(testFile)); + try { + this.fs.mkdirs(testFile); + fail("Should throw FileAlreadyExistsException!"); + } catch (FileAlreadyExistsException e) { + // expected + } + } + + public void testWorkingDirectory() throws Exception { + Path workDir = super.path(this.getDefaultWorkingDirectory()); + assertEquals(workDir, this.fs.getWorkingDirectory()); + this.fs.setWorkingDirectory(super.path(".")); + assertEquals(workDir, this.fs.getWorkingDirectory()); + this.fs.setWorkingDirectory(super.path("..")); + assertEquals(workDir.getParent(), this.fs.getWorkingDirectory()); + Path relativeDir = super.path("hadoop"); + this.fs.setWorkingDirectory(relativeDir); + assertEquals(relativeDir, this.fs.getWorkingDirectory()); + Path absoluteDir = super.path("/test/hadoop"); + this.fs.setWorkingDirectory(absoluteDir); + assertEquals(absoluteDir, this.fs.getWorkingDirectory()); + } + +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSInputStream.java new file mode 100644 index 0000000000..411cd576f7 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSInputStream.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.io.IOUtils; +import org.junit.*; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Random; + +import static org.junit.Assert.assertTrue; + +/** + * Tests basic functionality for AliyunOSSInputStream, including seeking and + * reading files. + */ +public class TestOSSInputStream { + + private FileSystem fs; + + protected static final Logger LOG = + LoggerFactory.getLogger(TestOSSInputStream.class); + + private static String testRootPath = OSSTestUtils.generateUniqueTestPath(); + + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + fs = OSSTestUtils.createTestFileSystem(conf); + } + + @After + public void tearDown() throws Exception { + if (fs != null) { + fs.delete(new Path(testRootPath), true); + } + } + + private Path setPath(String path) { + if (path.startsWith("/")) { + return new Path(testRootPath + path); + } else { + return new Path(testRootPath + "/" + path); + } + } + + @Test + public void testSeekFile() throws Exception { + Path smallSeekFile = setPath("/test/smallSeekFile.txt"); + long size = 5 * 1024 * 1024; + + ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255); + LOG.info("5MB file created: smallSeekFile.txt"); + + FSDataInputStream instream = this.fs.open(smallSeekFile); + int seekTimes = 5; + LOG.info("multiple fold position seeking test...:"); + for (int i = 0; i < seekTimes; i++) { + long pos = size / (seekTimes - i) - 1; + LOG.info("begin seeking for pos: " + pos); + instream.seek(pos); + assertTrue("expected position at:" + pos + ", but got:" + + instream.getPos(), instream.getPos() == pos); + LOG.info("completed seeking at pos: " + instream.getPos()); + } + LOG.info("random position seeking test...:"); + Random rand = new Random(); + for (int i = 0; i < seekTimes; i++) { + long pos = Math.abs(rand.nextLong()) % size; + LOG.info("begin seeking for pos: " + pos); + instream.seek(pos); + assertTrue("expected position at:" + pos + ", but got:" + + instream.getPos(), instream.getPos() == pos); + LOG.info("completed seeking at pos: " + instream.getPos()); + } + IOUtils.closeStream(instream); + } + + @Test + public void testReadFile() throws Exception { + final int bufLen = 256; + final int sizeFlag = 5; + String filename = "readTestFile_" + sizeFlag + ".txt"; + Path readTestFile = setPath("/test/" + filename); + long size = sizeFlag * 1024 * 1024; + + ContractTestUtils.generateTestFile(this.fs, readTestFile, size, 256, 255); + LOG.info(sizeFlag + "MB file created: /test/" + filename); + + FSDataInputStream instream = this.fs.open(readTestFile); + byte[] buf = new byte[bufLen]; + long bytesRead = 0; + while (bytesRead < size) { + int bytes; + if (size - bytesRead < 
bufLen) { + int remaining = (int)(size - bytesRead); + bytes = instream.read(buf, 0, remaining); + } else { + bytes = instream.read(buf, 0, bufLen); + } + bytesRead += bytes; + + if (bytesRead % (1024 * 1024) == 0) { + int available = instream.available(); + int remaining = (int)(size - bytesRead); + assertTrue("expected remaining:" + remaining + ", but got:" + available, + remaining == available); + LOG.info("Bytes read: " + Math.round((double)bytesRead / (1024 * 1024)) + + " MB"); + } + } + assertTrue(instream.available() == 0); + IOUtils.closeStream(instream); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java new file mode 100644 index 0000000000..3951529dc6 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.io.IOException; + +/** + * Tests regular and multi-part upload functionality for AliyunOSSOutputStream. 
+ */ +public class TestOSSOutputStream { + private FileSystem fs; + private static String testRootPath = OSSTestUtils.generateUniqueTestPath(); + + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024); + conf.setInt(Constants.MULTIPART_UPLOAD_SIZE_KEY, 5 * 1024 * 1024); + fs = OSSTestUtils.createTestFileSystem(conf); + } + + @After + public void tearDown() throws Exception { + if (fs != null) { + fs.delete(new Path(testRootPath), true); + } + } + + protected Path getTestPath() { + return new Path(testRootPath + "/testoss"); + } + + @Test + public void testRegularUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024); + } + + @Test + public void testMultiPartUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/OSSContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/OSSContract.java new file mode 100644 index 0000000000..8214b9f6be --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/OSSContract.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.contract; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.aliyun.oss.OSSTestUtils; +import org.apache.hadoop.fs.contract.AbstractBondedFSContract; + +/** + * The contract of OSS: only enabled if the test bucket is provided. 
+ */ +public class OSSContract extends AbstractBondedFSContract { + + public static final String CONTRACT_XML = "contract/oss.xml"; + public static final String CONTRACT_TEST_OSS_FS_NAME = + "fs.contract.test.fs.oss"; + + private static String testPath = OSSTestUtils.generateUniqueTestPath(); + + public OSSContract(Configuration conf) { + super(conf); + //insert the base features + addConfResource(CONTRACT_XML); + } + + @Override + public String getScheme() { + return "oss"; + } + + @Override + public Path getTestPath() { + Path path = new Path(testPath); + return path; + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractCreate.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractCreate.java new file mode 100644 index 0000000000..cc5a2d1752 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractCreate.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractCreateTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; + +/** + * OSS contract creating tests. + */ +public class TestOSSContractCreate extends AbstractContractCreateTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new OSSContract(conf); + } + + @Override + public void testOverwriteEmptyDirectory() throws Throwable { + ContractTestUtils.skip( + "blobstores can't distinguish empty directories from files"); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractDelete.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractDelete.java new file mode 100644 index 0000000000..6a1eb40e3a --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractDelete.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractDeleteTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * OSS contract deleting tests. + */ +public class TestOSSContractDelete extends AbstractContractDeleteTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new OSSContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractMkdir.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractMkdir.java new file mode 100644 index 0000000000..1dcb7f030d --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractMkdir.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractMkdirTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * OSS contract directory tests. + */ +public class TestOSSContractMkdir extends AbstractContractMkdirTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new OSSContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractOpen.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractOpen.java new file mode 100644 index 0000000000..ee0c055dec --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractOpen.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractOpenTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * OSS contract opening file tests. + */ +public class TestOSSContractOpen extends AbstractContractOpenTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new OSSContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractRename.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractRename.java new file mode 100644 index 0000000000..634fcf12fa --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractRename.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractRenameTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * OSS contract renaming tests. + */ +public class TestOSSContractRename extends AbstractContractRenameTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new OSSContract(conf); + } + +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractSeek.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractSeek.java new file mode 100644 index 0000000000..40ea772c94 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractSeek.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractSeekTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * OSS contract seeking tests. + */ +public class TestOSSContractSeek extends AbstractContractSeekTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new OSSContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/contract/oss.xml b/hadoop-tools/hadoop-aliyun/src/test/resources/contract/oss.xml new file mode 100644 index 0000000000..2bc34b754b --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/resources/contract/oss.xml @@ -0,0 +1,105 @@ + + + + + + fs.contract.test.random-seek-count + 10 + + + + fs.contract.is-blobstore + true + + + + fs.contract.is-case-sensitive + true + + + + fs.contract.rename-returns-false-if-source-missing + false + + + + fs.contract.rename-remove-dest-if-empty-dir + false + + + + fs.contract.supports-append + false + + + + fs.contract.supports-atomic-directory-delete + false + + + + fs.contract.supports-atomic-rename + false + + + + fs.contract.supports-block-locality + false + + + + fs.contract.supports-concat + false + + + + fs.contract.supports-seek + true + + + + fs.contract.supports-seek-on-closed-file + true + + + + fs.contract.rejects-seek-past-eof + true + + + + fs.contract.supports-strict-exceptions + true + + + + fs.contract.supports-unix-permissions + false + + + + fs.contract.rename-overwrites-dest + true + + + + fs.oss.multipart.download.size + 102400 + + diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml b/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml new file mode 100644 index 0000000000..fa4118c216 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml @@ -0,0 +1,46 @@ + + + + + + + hadoop.tmp.dir + target/build/test + A base for other temporary directories. + true + + + + + hadoop.security.authentication + simple + + + + + + + + diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties new file mode 100644 index 0000000000..bb5cbe5ec3 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# log4j configuration used during build and unit tests + +log4j.rootLogger=INFO,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml index 899a9455d2..14fa9f05f7 100644 --- a/hadoop-tools/hadoop-tools-dist/pom.xml +++ b/hadoop-tools/hadoop-tools-dist/pom.xml @@ -100,6 +100,12 @@ compile ${project.version} + + org.apache.hadoop + hadoop-aliyun + compile + ${project.version} + org.apache.hadoop hadoop-sls diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml index db002f46fc..e7e876bc82 100644 --- a/hadoop-tools/pom.xml +++ b/hadoop-tools/pom.xml @@ -47,6 +47,7 @@ hadoop-aws hadoop-kafka hadoop-azure-datalake + hadoop-aliyun
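The patch above adds the module, its tests, and the build wiring, but never shows the filesystem being used directly. As a quick orientation for reviewers, the sketch below round-trips a small object through the new oss:// scheme via the generic Hadoop FileSystem API. It is an illustration only, not part of the patch: the class name OssQuickCheck and the bucket URI oss://example-bucket/ are invented for the example, and it assumes the endpoint and credential settings required by AliyunOSSFileSystem are already present in the Configuration.

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Illustrative sketch only (not part of this patch): write a small object
 * through the oss:// scheme and read it back. The bucket name and path are
 * placeholders; credentials and endpoint are assumed to be configured
 * elsewhere in the Configuration.
 */
public class OssQuickCheck {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // If the scheme is not picked up automatically, the implementation can be
    // bound explicitly through the standard fs.<scheme>.impl mechanism:
    // conf.set("fs.oss.impl",
    //     "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
    FileSystem fs = FileSystem.get(URI.create("oss://example-bucket/"), conf);

    Path path = new Path("/tmp/oss-quick-check.txt");
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write("hello oss".getBytes(StandardCharsets.UTF_8));
    }
    try (FSDataInputStream in = fs.open(path)) {
      byte[] buf = new byte[(int) fs.getFileStatus(path).getLen()];
      in.readFully(0, buf);               // positioned read from offset 0
      System.out.println(new String(buf, StandardCharsets.UTF_8));
    }
    fs.delete(path, false);               // clean up the test object
  }
}

In miniature, this is the same create/read/delete cycle that the contract tests and TestOSSOutputStream/TestOSSInputStream above exercise through ContractTestUtils against a live bucket.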