HADOOP-13591. Unit test failure in TestOSSContractGetFileStatus and TestOSSContractRootDir. Contributed by Genmao Yu

This commit is contained in:
Kai Zheng 2016-09-20 15:12:02 +08:00
parent 9cd4760257
commit 08b37603d9
5 changed files with 89 additions and 71 deletions

View File

@ -24,6 +24,8 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
@ -124,9 +126,6 @@ private boolean innerDelete(FileStatus status, boolean recursive)
Path f = status.getPath(); Path f = status.getPath();
String key = pathToKey(f); String key = pathToKey(f);
if (status.isDirectory()) { if (status.isDirectory()) {
if (!key.endsWith("/")) {
key += "/";
}
if (!recursive) { if (!recursive) {
FileStatus[] statuses = listStatus(status.getPath()); FileStatus[] statuses = listStatus(status.getPath());
// Check whether it is an empty directory or not // Check whether it is an empty directory or not
@ -135,6 +134,7 @@ private boolean innerDelete(FileStatus status, boolean recursive)
": It is not empty!"); ": It is not empty!");
} else { } else {
// Delete empty directory without '-r' // Delete empty directory without '-r'
key = AliyunOSSUtils.maybeAddTrailingSlash(key);
store.deleteObject(key); store.deleteObject(key);
} }
} else { } else {
@ -149,15 +149,9 @@ private boolean innerDelete(FileStatus status, boolean recursive)
} }
private void createFakeDirectoryIfNecessary(Path f) throws IOException { private void createFakeDirectoryIfNecessary(Path f) throws IOException {
try { String key = pathToKey(f);
Path pPath = f.getParent(); if (StringUtils.isNotEmpty(key) && !exists(f)) {
FileStatus pStatus = getFileStatus(pPath); LOG.debug("Creating new fake directory at {}", f);
if (pStatus.isFile()) {
throw new IOException("Path " + pPath +
" is assumed to be a directory!");
}
} catch (FileNotFoundException fnfe) {
// Make sure the parent directory exists
mkdir(pathToKey(f.getParent())); mkdir(pathToKey(f.getParent()));
} }
} }
@ -175,14 +169,14 @@ public FileStatus getFileStatus(Path path) throws IOException {
ObjectMetadata meta = store.getObjectMetadata(key); ObjectMetadata meta = store.getObjectMetadata(key);
// If key not found and key does not end with "/" // If key not found and key does not end with "/"
if (meta == null && !key.endsWith("/")) { if (meta == null && !key.endsWith("/")) {
// Case: dir + "/" // In case of 'dir + "/"'
key += "/"; key += "/";
meta = store.getObjectMetadata(key); meta = store.getObjectMetadata(key);
} }
if (meta == null) { if (meta == null) {
ObjectListing listing = store.listObjects(key, 1, "/", null); ObjectListing listing = store.listObjects(key, 1, null, false);
if (!listing.getObjectSummaries().isEmpty() || if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
!listing.getCommonPrefixes().isEmpty()) { CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
return new FileStatus(0, true, 1, 0, 0, qualifiedPath); return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
} else { } else {
throw new FileNotFoundException(path + ": No such file or directory!"); throw new FileNotFoundException(path + ": No such file or directory!");
@ -251,7 +245,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
*/ */
private boolean objectRepresentsDirectory(final String name, private boolean objectRepresentsDirectory(final String name,
final long size) { final long size) {
return !name.isEmpty() && name.endsWith("/") && size == 0L; return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
} }
/** /**
@ -265,10 +259,6 @@ private String pathToKey(Path path) {
path = new Path(workingDir, path); path = new Path(workingDir, path);
} }
if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
return "";
}
return path.toUri().getPath().substring(1); return path.toUri().getPath().substring(1);
} }
@ -287,26 +277,23 @@ public FileStatus[] listStatus(Path path) throws IOException {
final FileStatus fileStatus = getFileStatus(path); final FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) { if (fileStatus.isDirectory()) {
if (!key.endsWith("/")) {
key = key + "/";
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: doing listObjects for directory " + key); LOG.debug("listStatus: doing listObjects for directory " + key);
} }
ObjectListing objects = store.listObjects(key, maxKeys, "/", null); ObjectListing objects = store.listObjects(key, maxKeys, null, false);
while (true) { while (true) {
statistics.incrementReadOps(1); statistics.incrementReadOps(1);
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
Path keyPath = keyToPath(objectSummary.getKey()) String objKey = objectSummary.getKey();
.makeQualified(uri, workingDir); if (objKey.equals(key + "/")) {
if (keyPath.equals(path)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring: " + keyPath); LOG.debug("Ignoring: " + objKey);
} }
continue; continue;
} else { } else {
Path keyPath = keyToPath(objectSummary.getKey())
.makeQualified(uri, workingDir);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Adding: fi: " + keyPath); LOG.debug("Adding: fi: " + keyPath);
} }
@ -317,10 +304,13 @@ public FileStatus[] listStatus(Path path) throws IOException {
} }
for (String prefix : objects.getCommonPrefixes()) { for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); if (prefix.equals(key + "/")) {
if (keyPath.equals(path)) { if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring: " + prefix);
}
continue; continue;
} else { } else {
Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Adding: rd: " + keyPath); LOG.debug("Adding: rd: " + keyPath);
} }
@ -332,8 +322,8 @@ public FileStatus[] listStatus(Path path) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: list truncated - getting next batch"); LOG.debug("listStatus: list truncated - getting next batch");
} }
objects = store.listObjects(key, maxKeys, "/", String nextMarker = objects.getNextMarker();
objects.getNextMarker()); objects = store.listObjects(key, maxKeys, nextMarker, false);
statistics.incrementReadOps(1); statistics.incrementReadOps(1);
} else { } else {
break; break;
@ -358,10 +348,12 @@ public FileStatus[] listStatus(Path path) throws IOException {
*/ */
private boolean mkdir(final String key) throws IOException { private boolean mkdir(final String key) throws IOException {
String dirName = key; String dirName = key;
if (StringUtils.isNotEmpty(key)) {
if (!key.endsWith("/")) { if (!key.endsWith("/")) {
dirName += "/"; dirName += "/";
} }
store.storeEmptyFile(dirName); store.storeEmptyFile(dirName);
}
return true; return true;
} }
@ -506,16 +498,11 @@ private boolean copyFile(Path srcPath, Path dstPath) {
* @param dstPath destination path. * @param dstPath destination path.
* @return true if directory is successfully copied. * @return true if directory is successfully copied.
*/ */
private boolean copyDirectory(Path srcPath, Path dstPath) { private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
String srcKey = pathToKey(srcPath); String srcKey = AliyunOSSUtils
String dstKey = pathToKey(dstPath); .maybeAddTrailingSlash(pathToKey(srcPath));
String dstKey = AliyunOSSUtils
if (!srcKey.endsWith("/")) { .maybeAddTrailingSlash(pathToKey(dstPath));
srcKey = srcKey + "/";
}
if (!dstKey.endsWith("/")) {
dstKey = dstKey + "/";
}
if (dstKey.startsWith(srcKey)) { if (dstKey.startsWith(srcKey)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -524,7 +511,8 @@ private boolean copyDirectory(Path srcPath, Path dstPath) {
return false; return false;
} }
ObjectListing objects = store.listObjects(srcKey, maxKeys, null, null); store.storeEmptyFile(dstKey);
ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
statistics.incrementReadOps(1); statistics.incrementReadOps(1);
// Copy files from src folder to dst // Copy files from src folder to dst
while (true) { while (true) {
@ -534,8 +522,8 @@ private boolean copyDirectory(Path srcPath, Path dstPath) {
store.copyFile(objectSummary.getKey(), newKey); store.copyFile(objectSummary.getKey(), newKey);
} }
if (objects.isTruncated()) { if (objects.isTruncated()) {
objects = store.listObjects(srcKey, maxKeys, null, String nextMarker = objects.getNextMarker();
objects.getNextMarker()); objects = store.listObjects(srcKey, maxKeys, nextMarker, true);
statistics.incrementReadOps(1); statistics.incrementReadOps(1);
} else { } else {
break; break;

View File

@ -42,6 +42,8 @@
import com.aliyun.oss.model.UploadPartCopyResult; import com.aliyun.oss.model.UploadPartCopyResult;
import com.aliyun.oss.model.UploadPartRequest; import com.aliyun.oss.model.UploadPartRequest;
import com.aliyun.oss.model.UploadPartResult; import com.aliyun.oss.model.UploadPartResult;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -89,7 +91,7 @@ public void initialize(URI uri, Configuration conf,
String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, ""); String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, "");
int proxyPort = conf.getInt(PROXY_PORT_KEY, -1); int proxyPort = conf.getInt(PROXY_PORT_KEY, -1);
if (!proxyHost.isEmpty()) { if (StringUtils.isNotEmpty(proxyHost)) {
clientConf.setProxyHost(proxyHost); clientConf.setProxyHost(proxyHost);
if (proxyPort >= 0) { if (proxyPort >= 0) {
clientConf.setProxyPort(proxyPort); clientConf.setProxyPort(proxyPort);
@ -123,7 +125,7 @@ public void initialize(URI uri, Configuration conf,
String endPoint = conf.getTrimmed(ENDPOINT_KEY, ""); String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
CredentialsProvider provider = CredentialsProvider provider =
AliyunOSSUtils.getCredentialsProvider(uri, conf); AliyunOSSUtils.getCredentialsProvider(conf);
ossClient = new OSSClient(endPoint, provider, clientConf); ossClient = new OSSClient(endPoint, provider, clientConf);
uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
MULTIPART_UPLOAD_SIZE_DEFAULT); MULTIPART_UPLOAD_SIZE_DEFAULT);
@ -153,7 +155,7 @@ public void initialize(URI uri, Configuration conf,
} }
String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT); String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
if (!cannedACLName.isEmpty()) { if (StringUtils.isNotEmpty(cannedACLName)) {
CannedAccessControlList cannedACL = CannedAccessControlList cannedACL =
CannedAccessControlList.valueOf(cannedACLName); CannedAccessControlList.valueOf(cannedACLName);
ossClient.setBucketAcl(bucketName, cannedACL); ossClient.setBucketAcl(bucketName, cannedACL);
@ -179,12 +181,14 @@ public void deleteObject(String key) {
* @param keysToDelete collection of keys to delete. * @param keysToDelete collection of keys to delete.
*/ */
public void deleteObjects(List<String> keysToDelete) { public void deleteObjects(List<String> keysToDelete) {
if (CollectionUtils.isNotEmpty(keysToDelete)) {
DeleteObjectsRequest deleteRequest = DeleteObjectsRequest deleteRequest =
new DeleteObjectsRequest(bucketName); new DeleteObjectsRequest(bucketName);
deleteRequest.setKeys(keysToDelete); deleteRequest.setKeys(keysToDelete);
ossClient.deleteObjects(deleteRequest); ossClient.deleteObjects(deleteRequest);
statistics.incrementWriteOps(keysToDelete.size()); statistics.incrementWriteOps(keysToDelete.size());
} }
}
/** /**
* Delete a directory from Aliyun OSS. * Delete a directory from Aliyun OSS.
@ -192,8 +196,10 @@ public void deleteObjects(List<String> keysToDelete) {
* @param key directory key to delete. * @param key directory key to delete.
*/ */
public void deleteDirs(String key) { public void deleteDirs(String key) {
key = AliyunOSSUtils.maybeAddTrailingSlash(key);
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(key); listRequest.setPrefix(key);
listRequest.setDelimiter(null);
listRequest.setMaxKeys(maxKeys); listRequest.setMaxKeys(maxKeys);
while (true) { while (true) {
@ -299,7 +305,7 @@ private boolean multipartCopy(String srcKey, long contentLength,
InitiateMultipartUploadRequest initiateMultipartUploadRequest = InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, dstKey); new InitiateMultipartUploadRequest(bucketName, dstKey);
ObjectMetadata meta = new ObjectMetadata(); ObjectMetadata meta = new ObjectMetadata();
if (!serverSideEncryptionAlgorithm.isEmpty()) { if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm); meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
} }
initiateMultipartUploadRequest.setObjectMetadata(meta); initiateMultipartUploadRequest.setObjectMetadata(meta);
@ -353,7 +359,7 @@ public void uploadObject(String key, File file) throws IOException {
FileInputStream fis = new FileInputStream(object); FileInputStream fis = new FileInputStream(object);
ObjectMetadata meta = new ObjectMetadata(); ObjectMetadata meta = new ObjectMetadata();
meta.setContentLength(object.length()); meta.setContentLength(object.length());
if (!serverSideEncryptionAlgorithm.isEmpty()) { if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm); meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
} }
try { try {
@ -384,7 +390,7 @@ public void multipartUploadObject(String key, File file) throws IOException {
InitiateMultipartUploadRequest initiateMultipartUploadRequest = InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, key); new InitiateMultipartUploadRequest(bucketName, key);
ObjectMetadata meta = new ObjectMetadata(); ObjectMetadata meta = new ObjectMetadata();
if (!serverSideEncryptionAlgorithm.isEmpty()) { if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm); meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
} }
initiateMultipartUploadRequest.setObjectMetadata(meta); initiateMultipartUploadRequest.setObjectMetadata(meta);
@ -435,12 +441,14 @@ public void multipartUploadObject(String key, File file) throws IOException {
* *
* @param prefix prefix. * @param prefix prefix.
* @param maxListingLength max no. of entries * @param maxListingLength max no. of entries
* @param delimiter delimiter.
* @param marker last key in any previous search. * @param marker last key in any previous search.
* @param recursive whether to list directory recursively.
* @return a list of matches. * @return a list of matches.
*/ */
public ObjectListing listObjects(String prefix, int maxListingLength, public ObjectListing listObjects(String prefix, int maxListingLength,
String delimiter, String marker) { String marker, boolean recursive) {
String delimiter = recursive ? null : "/";
prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(prefix); listRequest.setPrefix(prefix);
listRequest.setDelimiter(delimiter); listRequest.setDelimiter(delimiter);
@ -488,7 +496,7 @@ public void close() {
public void purge(String prefix) { public void purge(String prefix) {
String key; String key;
try { try {
ObjectListing objects = listObjects(prefix, maxKeys, null, null); ObjectListing objects = listObjects(prefix, maxKeys, null, true);
for (OSSObjectSummary object : objects.getObjectSummaries()) { for (OSSObjectSummary object : objects.getObjectSummaries()) {
key = object.getKey(); key = object.getKey();
ossClient.deleteObject(bucketName, key); ossClient.deleteObject(bucketName, key);

View File

@ -20,7 +20,6 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI;
import com.aliyun.oss.common.auth.CredentialsProvider; import com.aliyun.oss.common.auth.CredentialsProvider;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -106,16 +105,13 @@ public static long calculatePartSize(long contentLength, long minPartSize) {
* Create credential provider specified by configuration, or create default * Create credential provider specified by configuration, or create default
* credential provider if not specified. * credential provider if not specified.
* *
* @param name the uri of the file system
* @param conf configuration * @param conf configuration
* @return a credential provider * @return a credential provider
* @throws IOException on any problem. Class construction issues may be * @throws IOException on any problem. Class construction issues may be
* nested inside the IOE. * nested inside the IOE.
*/ */
public static CredentialsProvider getCredentialsProvider(URI name, public static CredentialsProvider getCredentialsProvider(Configuration conf)
Configuration conf) throws IOException { throws IOException {
URI uri = java.net.URI.create(
name.getScheme() + "://" + name.getAuthority());
CredentialsProvider credentials; CredentialsProvider credentials;
String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY); String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY);
@ -152,4 +148,20 @@ public static CredentialsProvider getCredentialsProvider(URI name,
return credentials; return credentials;
} }
/**
* Turns a path (relative or otherwise) into an OSS key, adding a trailing
* "/" if the path is not the root <i>and</i> does not already have a "/"
* at the end.
*
* @param key OSS key or ""
* @return the with a trailing "/", or, if it is the root key, "".
*/
public static String maybeAddTrailingSlash(String key) {
if (StringUtils.isNotEmpty(key) && !key.endsWith("/")) {
return key + '/';
} else {
return key;
}
}
} }

View File

@ -48,7 +48,7 @@ public static AliyunOSSFileSystem createTestFileSystem(Configuration conf)
String fsname = conf.getTrimmed( String fsname = conf.getTrimmed(
TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME, ""); TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME, "");
boolean liveTest = !StringUtils.isEmpty(fsname); boolean liveTest = StringUtils.isNotEmpty(fsname);
URI testURI = null; URI testURI = null;
if (liveTest) { if (liveTest) {
testURI = URI.create(fsname); testURI = URI.create(fsname);

View File

@ -98,6 +98,16 @@
<value>true</value> <value>true</value>
</property> </property>
<property>
<name>fs.contract.test.root-tests-enabled</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-getfilestatus</name>
<value>true</value>
</property>
<property> <property>
<name>fs.oss.multipart.download.size</name> <name>fs.oss.multipart.download.size</name>
<value>102400</value> <value>102400</value>