HADOOP-17374. support listObjectV2 (#3587)
This commit is contained in:
parent
18b2cca74c
commit
a9c51ea57d
@ -107,6 +107,12 @@
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
|
@ -49,7 +49,6 @@
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import com.aliyun.oss.model.OSSObjectSummary;
|
||||
import com.aliyun.oss.model.ObjectListing;
|
||||
import com.aliyun.oss.model.ObjectMetadata;
|
||||
|
||||
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
|
||||
@ -271,14 +270,15 @@ public FileStatus getFileStatus(Path path) throws IOException {
|
||||
meta = store.getObjectMetadata(key);
|
||||
}
|
||||
if (meta == null) {
|
||||
ObjectListing listing = store.listObjects(key, 1, null, false);
|
||||
OSSListRequest listRequest = store.createListObjectsRequest(key,
|
||||
maxKeys, null, null, false);
|
||||
OSSListResult listing = store.listObjects(listRequest);
|
||||
do {
|
||||
if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
|
||||
CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
|
||||
return new OSSFileStatus(0, true, 1, 0, 0, qualifiedPath, username);
|
||||
} else if (listing.isTruncated()) {
|
||||
listing = store.listObjects(key, 1000, listing.getNextMarker(),
|
||||
false);
|
||||
listing = store.continueListObjects(listRequest, listing);
|
||||
} else {
|
||||
throw new FileNotFoundException(
|
||||
path + ": No such file or directory!");
|
||||
@ -416,7 +416,9 @@ public FileStatus[] listStatus(Path path) throws IOException {
|
||||
LOG.debug("listStatus: doing listObjects for directory " + key);
|
||||
}
|
||||
|
||||
ObjectListing objects = store.listObjects(key, maxKeys, null, false);
|
||||
OSSListRequest listRequest = store.createListObjectsRequest(key,
|
||||
maxKeys, null, null, false);
|
||||
OSSListResult objects = store.listObjects(listRequest);
|
||||
while (true) {
|
||||
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
|
||||
String objKey = objectSummary.getKey();
|
||||
@ -456,8 +458,7 @@ public FileStatus[] listStatus(Path path) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("listStatus: list truncated - getting next batch");
|
||||
}
|
||||
String nextMarker = objects.getNextMarker();
|
||||
objects = store.listObjects(key, maxKeys, nextMarker, false);
|
||||
objects = store.continueListObjects(listRequest, objects);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
@ -520,7 +521,7 @@ private RemoteIterator<LocatedFileStatus> innerList(final Path f,
|
||||
locations);
|
||||
} else {
|
||||
return store.createLocatedFileStatusIterator(key, maxKeys, this, filter,
|
||||
acceptor, recursive ? null : "/");
|
||||
acceptor, recursive);
|
||||
}
|
||||
}
|
||||
|
||||
@ -707,7 +708,9 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
|
||||
ExecutorService executorService = MoreExecutors.listeningDecorator(
|
||||
new SemaphoredDelegatingExecutor(boundedCopyThreadPool,
|
||||
maxConcurrentCopyTasksPerDir, true));
|
||||
ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
|
||||
OSSListRequest listRequest = store.createListObjectsRequest(srcKey,
|
||||
maxKeys, null, null, true);
|
||||
OSSListResult objects = store.listObjects(listRequest);
|
||||
// Copy files from src folder to dst
|
||||
int copiesToFinish = 0;
|
||||
while (true) {
|
||||
@ -729,8 +732,7 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
|
||||
}
|
||||
}
|
||||
if (objects.isTruncated()) {
|
||||
String nextMarker = objects.getNextMarker();
|
||||
objects = store.listObjects(srcKey, maxKeys, nextMarker, true);
|
||||
objects = store.continueListObjects(listRequest, objects);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
@ -36,8 +36,8 @@
|
||||
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
|
||||
import com.aliyun.oss.model.InitiateMultipartUploadResult;
|
||||
import com.aliyun.oss.model.ListObjectsRequest;
|
||||
import com.aliyun.oss.model.ListObjectsV2Request;
|
||||
import com.aliyun.oss.model.ObjectMetadata;
|
||||
import com.aliyun.oss.model.ObjectListing;
|
||||
import com.aliyun.oss.model.OSSObjectSummary;
|
||||
import com.aliyun.oss.model.PartETag;
|
||||
import com.aliyun.oss.model.PutObjectResult;
|
||||
@ -90,6 +90,7 @@ public class AliyunOSSFileSystemStore {
|
||||
private long uploadPartSize;
|
||||
private int maxKeys;
|
||||
private String serverSideEncryptionAlgorithm;
|
||||
private boolean useListV1;
|
||||
|
||||
public void initialize(URI uri, Configuration conf, String user,
|
||||
FileSystem.Statistics stat) throws IOException {
|
||||
@ -170,6 +171,12 @@ public void initialize(URI uri, Configuration conf, String user,
|
||||
}
|
||||
|
||||
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
|
||||
int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
|
||||
if (listVersion < 1 || listVersion > 2) {
|
||||
LOG.warn("Configured fs.oss.list.version {} is invalid, forcing " +
|
||||
"version 2", listVersion);
|
||||
}
|
||||
useListV1 = (listVersion == 1);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -231,14 +238,10 @@ public void deleteObjects(List<String> keysToDelete) throws IOException {
|
||||
* @throws IOException if failed to delete directory.
|
||||
*/
|
||||
public void deleteDirs(String key) throws IOException {
|
||||
key = AliyunOSSUtils.maybeAddTrailingSlash(key);
|
||||
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
|
||||
listRequest.setPrefix(key);
|
||||
listRequest.setDelimiter(null);
|
||||
listRequest.setMaxKeys(maxKeys);
|
||||
|
||||
OSSListRequest listRequest = createListObjectsRequest(key,
|
||||
maxKeys, null, null, true);
|
||||
while (true) {
|
||||
ObjectListing objects = ossClient.listObjects(listRequest);
|
||||
OSSListResult objects = listObjects(listRequest);
|
||||
statistics.incrementReadOps(1);
|
||||
List<String> keysToDelete = new ArrayList<String>();
|
||||
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
|
||||
@ -246,7 +249,12 @@ public void deleteDirs(String key) throws IOException {
|
||||
}
|
||||
deleteObjects(keysToDelete);
|
||||
if (objects.isTruncated()) {
|
||||
listRequest.setMarker(objects.getNextMarker());
|
||||
if (objects.isV1()) {
|
||||
listRequest.getV1().setMarker(objects.getV1().getNextMarker());
|
||||
} else {
|
||||
listRequest.getV2().setContinuationToken(
|
||||
objects.getV2().getNextContinuationToken());
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
@ -418,25 +426,76 @@ public void uploadObject(String key, File file) throws IOException {
|
||||
/**
|
||||
* list objects.
|
||||
*
|
||||
* @param listRequest list request.
|
||||
* @return a list of matches.
|
||||
*/
|
||||
public OSSListResult listObjects(OSSListRequest listRequest) {
|
||||
OSSListResult listResult;
|
||||
if (listRequest.isV1()) {
|
||||
listResult = OSSListResult.v1(
|
||||
ossClient.listObjects(listRequest.getV1()));
|
||||
} else {
|
||||
listResult = OSSListResult.v2(
|
||||
ossClient.listObjectsV2(listRequest.getV2()));
|
||||
}
|
||||
statistics.incrementReadOps(1);
|
||||
return listResult;
|
||||
}
|
||||
|
||||
/**
|
||||
* continue to list objects depends on previous list result.
|
||||
*
|
||||
* @param listRequest list request.
|
||||
* @param preListResult previous list result.
|
||||
* @return a list of matches.
|
||||
*/
|
||||
public OSSListResult continueListObjects(OSSListRequest listRequest,
|
||||
OSSListResult preListResult) {
|
||||
OSSListResult listResult;
|
||||
if (listRequest.isV1()) {
|
||||
listRequest.getV1().setMarker(preListResult.getV1().getNextMarker());
|
||||
listResult = OSSListResult.v1(
|
||||
ossClient.listObjects(listRequest.getV1()));
|
||||
} else {
|
||||
listRequest.getV2().setContinuationToken(
|
||||
preListResult.getV2().getNextContinuationToken());
|
||||
listResult = OSSListResult.v2(
|
||||
ossClient.listObjectsV2(listRequest.getV2()));
|
||||
}
|
||||
statistics.incrementReadOps(1);
|
||||
return listResult;
|
||||
}
|
||||
|
||||
/**
|
||||
* create list objects request.
|
||||
*
|
||||
* @param prefix prefix.
|
||||
* @param maxListingLength max no. of entries
|
||||
* @param marker last key in any previous search.
|
||||
* @param continuationToken list from a specific point.
|
||||
* @param recursive whether to list directory recursively.
|
||||
* @return a list of matches.
|
||||
*/
|
||||
public ObjectListing listObjects(String prefix, int maxListingLength,
|
||||
String marker, boolean recursive) {
|
||||
protected OSSListRequest createListObjectsRequest(String prefix,
|
||||
int maxListingLength, String marker,
|
||||
String continuationToken, boolean recursive) {
|
||||
String delimiter = recursive ? null : "/";
|
||||
prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
|
||||
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
|
||||
listRequest.setPrefix(prefix);
|
||||
listRequest.setDelimiter(delimiter);
|
||||
listRequest.setMaxKeys(maxListingLength);
|
||||
listRequest.setMarker(marker);
|
||||
|
||||
ObjectListing listing = ossClient.listObjects(listRequest);
|
||||
statistics.incrementReadOps(1);
|
||||
return listing;
|
||||
if (useListV1) {
|
||||
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
|
||||
listRequest.setPrefix(prefix);
|
||||
listRequest.setDelimiter(delimiter);
|
||||
listRequest.setMaxKeys(maxListingLength);
|
||||
listRequest.setMarker(marker);
|
||||
return OSSListRequest.v1(listRequest);
|
||||
} else {
|
||||
ListObjectsV2Request listV2Request = new ListObjectsV2Request(bucketName);
|
||||
listV2Request.setPrefix(prefix);
|
||||
listV2Request.setDelimiter(delimiter);
|
||||
listV2Request.setMaxKeys(maxListingLength);
|
||||
listV2Request.setContinuationToken(continuationToken);
|
||||
return OSSListRequest.v2(listV2Request);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -478,21 +537,7 @@ public void close() {
|
||||
* @throws IOException if failed to clean up objects.
|
||||
*/
|
||||
public void purge(String prefix) throws IOException {
|
||||
String key;
|
||||
try {
|
||||
ObjectListing objects = listObjects(prefix, maxKeys, null, true);
|
||||
for (OSSObjectSummary object : objects.getObjectSummaries()) {
|
||||
key = object.getKey();
|
||||
ossClient.deleteObject(bucketName, key);
|
||||
statistics.incrementWriteOps(1);
|
||||
}
|
||||
|
||||
for (String dir: objects.getCommonPrefixes()) {
|
||||
deleteDirs(dir);
|
||||
}
|
||||
} catch (OSSException | ClientException e) {
|
||||
LOG.error("Failed to purge " + prefix);
|
||||
}
|
||||
deleteDirs(prefix);
|
||||
}
|
||||
|
||||
public RemoteIterator<LocatedFileStatus> singleStatusRemoteIterator(
|
||||
@ -520,12 +565,12 @@ public LocatedFileStatus next() throws IOException {
|
||||
|
||||
public RemoteIterator<LocatedFileStatus> createLocatedFileStatusIterator(
|
||||
final String prefix, final int maxListingLength, FileSystem fs,
|
||||
PathFilter filter, FileStatusAcceptor acceptor, String delimiter) {
|
||||
PathFilter filter, FileStatusAcceptor acceptor, boolean recursive) {
|
||||
return new RemoteIterator<LocatedFileStatus>() {
|
||||
private String nextMarker = null;
|
||||
private boolean firstListing = true;
|
||||
private boolean meetEnd = false;
|
||||
private ListIterator<FileStatus> batchIterator;
|
||||
private OSSListRequest listRequest = null;
|
||||
|
||||
@Override
|
||||
public boolean hasNext() throws IOException {
|
||||
@ -550,15 +595,24 @@ public LocatedFileStatus next() throws IOException {
|
||||
}
|
||||
|
||||
private boolean requestNextBatch() {
|
||||
while (!meetEnd) {
|
||||
if (continueListStatus()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean continueListStatus() {
|
||||
if (meetEnd) {
|
||||
return false;
|
||||
}
|
||||
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
|
||||
listRequest.setPrefix(AliyunOSSUtils.maybeAddTrailingSlash(prefix));
|
||||
listRequest.setMaxKeys(maxListingLength);
|
||||
listRequest.setMarker(nextMarker);
|
||||
listRequest.setDelimiter(delimiter);
|
||||
ObjectListing listing = ossClient.listObjects(listRequest);
|
||||
if (listRequest == null) {
|
||||
listRequest = createListObjectsRequest(prefix,
|
||||
maxListingLength, null, null, recursive);
|
||||
}
|
||||
OSSListResult listing = listObjects(listRequest);
|
||||
List<FileStatus> stats = new ArrayList<>(
|
||||
listing.getObjectSummaries().size() +
|
||||
listing.getCommonPrefixes().size());
|
||||
@ -584,7 +638,12 @@ private boolean requestNextBatch() {
|
||||
|
||||
batchIterator = stats.listIterator();
|
||||
if (listing.isTruncated()) {
|
||||
nextMarker = listing.getNextMarker();
|
||||
if (listing.isV1()) {
|
||||
listRequest.getV1().setMarker(listing.getV1().getNextMarker());
|
||||
} else {
|
||||
listRequest.getV2().setContinuationToken(
|
||||
listing.getV2().getNextContinuationToken());
|
||||
}
|
||||
} else {
|
||||
meetEnd = true;
|
||||
}
|
||||
|
@ -154,4 +154,8 @@ private Constants() {
|
||||
public static final String UPLOAD_ACTIVE_BLOCKS_KEY =
|
||||
"fs.oss.upload.active.blocks";
|
||||
public static final int UPLOAD_ACTIVE_BLOCKS_DEFAULT = 4;
|
||||
|
||||
public static final String LIST_VERSION = "fs.oss.list.version";
|
||||
|
||||
public static final int DEFAULT_LIST_VERSION = 2;
|
||||
}
|
||||
|
@ -0,0 +1,88 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.aliyun.oss;
|
||||
|
||||
import com.aliyun.oss.model.ListObjectsRequest;
|
||||
import com.aliyun.oss.model.ListObjectsV2Request;
|
||||
|
||||
/**
|
||||
* API version-independent container for OSS List requests.
|
||||
*/
|
||||
public class OSSListRequest {
|
||||
/**
|
||||
* Format for the toString() method: {@value}.
|
||||
*/
|
||||
private static final String DESCRIPTION
|
||||
= "List %s:/%s delimiter=%s keys=%d";
|
||||
|
||||
private final ListObjectsRequest v1Request;
|
||||
private final ListObjectsV2Request v2Request;
|
||||
|
||||
private OSSListRequest(ListObjectsRequest v1, ListObjectsV2Request v2) {
|
||||
v1Request = v1;
|
||||
v2Request = v2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Restricted constructors to ensure v1 or v2, not both.
|
||||
* @param request v1 request
|
||||
* @return new list request container
|
||||
*/
|
||||
public static OSSListRequest v1(ListObjectsRequest request) {
|
||||
return new OSSListRequest(request, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Restricted constructors to ensure v1 or v2, not both.
|
||||
* @param request v2 request
|
||||
* @return new list request container
|
||||
*/
|
||||
public static OSSListRequest v2(ListObjectsV2Request request) {
|
||||
return new OSSListRequest(null, request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this a v1 API request or v2?
|
||||
* @return true if v1, false if v2
|
||||
*/
|
||||
public boolean isV1() {
|
||||
return v1Request != null;
|
||||
}
|
||||
|
||||
public ListObjectsRequest getV1() {
|
||||
return v1Request;
|
||||
}
|
||||
|
||||
public ListObjectsV2Request getV2() {
|
||||
return v2Request;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (isV1()) {
|
||||
return String.format(DESCRIPTION,
|
||||
v1Request.getBucketName(), v1Request.getPrefix(),
|
||||
v1Request.getDelimiter(), v1Request.getMaxKeys());
|
||||
} else {
|
||||
return String.format(DESCRIPTION,
|
||||
v2Request.getBucketName(), v2Request.getPrefix(),
|
||||
v2Request.getDelimiter(), v2Request.getMaxKeys());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,115 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.aliyun.oss;
|
||||
|
||||
import com.aliyun.oss.model.ListObjectsV2Result;
|
||||
import com.aliyun.oss.model.OSSObjectSummary;
|
||||
import com.aliyun.oss.model.ObjectListing;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* API version-independent container for OSS List responses.
|
||||
*/
|
||||
public final class OSSListResult {
|
||||
private ObjectListing v1Result;
|
||||
private ListObjectsV2Result v2Result;
|
||||
|
||||
protected OSSListResult(ObjectListing v1, ListObjectsV2Result v2) {
|
||||
v1Result = v1;
|
||||
v2Result = v2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Restricted constructors to ensure v1 or v2, not both.
|
||||
* @param result v1 result
|
||||
* @return new list result container
|
||||
*/
|
||||
public static OSSListResult v1(ObjectListing result) {
|
||||
return new OSSListResult(result, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Restricted constructors to ensure v1 or v2, not both.
|
||||
* @param result v2 result
|
||||
* @return new list result container
|
||||
*/
|
||||
public static OSSListResult v2(ListObjectsV2Result result) {
|
||||
return new OSSListResult(null, result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this a v1 API result or v2?
|
||||
* @return true if v1, false if v2
|
||||
*/
|
||||
public boolean isV1() {
|
||||
return v1Result != null;
|
||||
}
|
||||
|
||||
public ObjectListing getV1() {
|
||||
return v1Result;
|
||||
}
|
||||
|
||||
public ListObjectsV2Result getV2() {
|
||||
return v2Result;
|
||||
}
|
||||
|
||||
public List<OSSObjectSummary> getObjectSummaries() {
|
||||
if (isV1()) {
|
||||
return v1Result.getObjectSummaries();
|
||||
} else {
|
||||
return v2Result.getObjectSummaries();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isTruncated() {
|
||||
if (isV1()) {
|
||||
return v1Result.isTruncated();
|
||||
} else {
|
||||
return v2Result.isTruncated();
|
||||
}
|
||||
}
|
||||
|
||||
public List<String> getCommonPrefixes() {
|
||||
if (isV1()) {
|
||||
return v1Result.getCommonPrefixes();
|
||||
} else {
|
||||
return v2Result.getCommonPrefixes();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dump the result at debug level.
|
||||
* @param log log to use
|
||||
*/
|
||||
public void logAtDebug(Logger log) {
|
||||
Collection<String> prefixes = getCommonPrefixes();
|
||||
Collection<OSSObjectSummary> summaries = getObjectSummaries();
|
||||
log.debug("Prefix count = {}; object count={}",
|
||||
prefixes.size(), summaries.size());
|
||||
for (OSSObjectSummary summary : summaries) {
|
||||
log.debug("Summary: {} {}", summary.getKey(), summary.getSize());
|
||||
}
|
||||
for (String prefix : prefixes) {
|
||||
log.debug("Prefix: {}", prefix);
|
||||
}
|
||||
}
|
||||
}
|
@ -243,6 +243,14 @@ please raise your issues with them.
|
||||
<description>Size in bytes in each request from ALiyun OSS.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.oss.list.version</name>
|
||||
<value>2</value>
|
||||
<description>Select which version of the OSS SDK's List Objects API to use.
|
||||
Currently support 2(default) and 1(older API).
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.oss.buffer.dir</name>
|
||||
<description>Comma separated list of directories to buffer OSS data before uploading to Aliyun OSS</description>
|
||||
|
@ -83,4 +83,14 @@ public static String generateUniqueTestPath() {
|
||||
return testUniqueForkId == null ? "/test" :
|
||||
"/" + testUniqueForkId + "/test";
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn off FS Caching: use if a filesystem with different options from
|
||||
* the default is required.
|
||||
* @param conf configuration to patch
|
||||
*/
|
||||
public static void disableFilesystemCaching(Configuration conf) {
|
||||
conf.setBoolean(TestAliyunOSSFileSystemContract.FS_OSS_IMPL_DISABLE_CACHE,
|
||||
true);
|
||||
}
|
||||
}
|
||||
|
@ -43,6 +43,7 @@
|
||||
*/
|
||||
public class TestAliyunOSSBlockOutputStream {
|
||||
private FileSystem fs;
|
||||
private static final int PART_SIZE = 1024 * 1024;
|
||||
private static String testRootPath =
|
||||
AliyunOSSTestUtils.generateUniqueTestPath();
|
||||
|
||||
@ -52,7 +53,7 @@ public class TestAliyunOSSBlockOutputStream {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
|
||||
conf.setInt(MULTIPART_UPLOAD_PART_SIZE_KEY, PART_SIZE);
|
||||
conf.setInt(IO_CHUNK_BUFFER_SIZE,
|
||||
conf.getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
|
||||
conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
|
||||
@ -155,10 +156,8 @@ public void testMultiPartUploadConcurrent() throws IOException {
|
||||
|
||||
@Test
|
||||
public void testHugeUpload() throws IOException {
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
|
||||
MULTIPART_UPLOAD_PART_SIZE_DEFAULT - 1);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
|
||||
MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), PART_SIZE - 1);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), PART_SIZE);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
|
||||
MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
|
||||
bufferDirShouldEmpty();
|
||||
|
@ -47,6 +47,8 @@
|
||||
public class TestAliyunOSSFileSystemContract
|
||||
extends FileSystemContractBaseTest {
|
||||
public static final String TEST_FS_OSS_NAME = "test.fs.oss.name";
|
||||
public static final String FS_OSS_IMPL_DISABLE_CACHE
|
||||
= "fs.oss.impl.disable.cache";
|
||||
private static Path testRootPath =
|
||||
new Path(AliyunOSSTestUtils.generateUniqueTestPath());
|
||||
|
||||
@ -413,7 +415,7 @@ private void testRenameDir(boolean changing, boolean result, boolean empty)
|
||||
Thread thread = new Thread(task);
|
||||
thread.start();
|
||||
while (!task.isRunning()) {
|
||||
Thread.sleep(1000);
|
||||
Thread.sleep(1);
|
||||
}
|
||||
|
||||
if (changing) {
|
||||
@ -421,7 +423,11 @@ private void testRenameDir(boolean changing, boolean result, boolean empty)
|
||||
}
|
||||
|
||||
thread.join();
|
||||
assertEquals(result, task.isSucceed());
|
||||
if (changing) {
|
||||
assertTrue(task.isSucceed() || fs.exists(this.path("a")));
|
||||
} else {
|
||||
assertEquals(result, task.isSucceed());
|
||||
}
|
||||
}
|
||||
|
||||
class TestRenameTask implements Runnable {
|
||||
@ -451,6 +457,8 @@ public void run() {
|
||||
running = true;
|
||||
result = fs.rename(srcPath, dstPath);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
this.result = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.fs.aliyun.oss;
|
||||
|
||||
import com.aliyun.oss.model.ObjectMetadata;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.After;
|
||||
@ -89,13 +90,17 @@ protected void writeRenameReadCompare(Path path, long len)
|
||||
|
||||
assertTrue("Exists", fs.exists(path));
|
||||
|
||||
ObjectMetadata srcMeta = fs.getStore().getObjectMetadata(
|
||||
path.toUri().getPath().substring(1));
|
||||
|
||||
Path copyPath = path.suffix(".copy");
|
||||
long start = System.currentTimeMillis();
|
||||
fs.rename(path, copyPath);
|
||||
|
||||
assertTrue("Copy exists", fs.exists(copyPath));
|
||||
// should less than 1 second
|
||||
assertTrue(System.currentTimeMillis() - start < 1000);
|
||||
// file type should not change
|
||||
ObjectMetadata dstMeta = fs.getStore().getObjectMetadata(
|
||||
copyPath.toUri().getPath().substring(1));
|
||||
assertEquals(srcMeta.getObjectType(), dstMeta.getObjectType());
|
||||
// Download file from Aliyun OSS and compare the digest against the original
|
||||
MessageDigest digest2 = MessageDigest.getInstance("MD5");
|
||||
InputStream in = new BufferedInputStream(
|
||||
@ -120,6 +125,7 @@ public void testSmallUpload() throws IOException, NoSuchAlgorithmException {
|
||||
public void testLargeUpload()
|
||||
throws IOException, NoSuchAlgorithmException {
|
||||
// Multipart upload, shallow copy
|
||||
writeRenameReadCompare(new Path("/test/xlarge"), 2147483648L); // 2GB
|
||||
writeRenameReadCompare(new Path("/test/xlarge"),
|
||||
Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,53 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.aliyun.oss.contract;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
|
||||
import org.apache.hadoop.fs.aliyun.oss.Constants;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
|
||||
/**
|
||||
* Test getFileStatus and related listing operations,
|
||||
* using the v1 List Objects API.
|
||||
*/
|
||||
public class TestAliyunOSSContractGetFileStatusV1List
|
||||
extends AbstractContractGetFileStatusTest {
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new AliyunOSSContract(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void teardown() throws Exception {
|
||||
getLogger().info("FS details {}", getFileSystem());
|
||||
super.teardown();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
Configuration conf = super.createConfiguration();
|
||||
AliyunOSSTestUtils.disableFilesystemCaching(conf);
|
||||
conf.setInt(Constants.MAX_PAGING_KEYS_KEY, 2);
|
||||
// Use v1 List Objects API
|
||||
conf.setInt(Constants.LIST_VERSION, 1);
|
||||
return conf;
|
||||
}
|
||||
}
|
@ -95,7 +95,7 @@
|
||||
|
||||
<property>
|
||||
<name>fs.contract.rename-overwrites-dest</name>
|
||||
<value>true</value>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
|
Loading…
Reference in New Issue
Block a user