HDDS-956. MultipartUpload: List Parts for a Multipart upload key. Contributed by Bharat Viswanadham.

Márton Elek 2019-01-31 14:51:31 +01:00
parent 71c49fa60f
commit 033d97abac
21 changed files with 853 additions and 45 deletions

View File

@ -245,6 +245,10 @@ private OzoneConsts() {
public static final String REPLICATION_FACTOR = "replicationFactor";
public static final String KEY_LOCATION_INFO = "keyLocationInfo";
public static final String MULTIPART_LIST = "multipartList";
public static final String UPLOAD_ID = "uploadID";
public static final String PART_NUMBER_MARKER = "partNumberMarker";
public static final String MAX_PARTS = "maxParts";
// For OM metrics saving to a file

View File

@ -406,6 +406,28 @@ public void abortMultipartUpload(String keyName, String uploadID) throws
proxy.abortMultipartUpload(volumeName, name, keyName, uploadID);
}
/**
* Returns list of parts of a multipart upload key.
* @param keyName name of the multipart upload key
* @param uploadID upload ID of the multipart upload
* @param partNumberMarker only parts with a part number greater than this
*        marker are returned
* @param maxParts maximum number of parts returned in a single call
* @return OzoneMultipartUploadPartListParts
* @throws IOException
*/
public OzoneMultipartUploadPartListParts listParts(String keyName,
String uploadID, int partNumberMarker, int maxParts) throws IOException {
// A key can have at most 10000 parts, so an iterator is not used here;
// one can be added later if needed. Even if 10000 is requested as
// maxParts in a single RPC call, the response is only about 0.6 MB,
// assuming roughly 60 bytes per part entry (ignoring the replication
// type size in the calculation).
return proxy.listParts(volumeName, name, keyName, uploadID,
partNumberMarker, maxParts);
}
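For illustration only (not part of this change set): a client that already holds an OzoneBucket handle plus a key name and an upload ID from initiateMultipartUpload could call the new API roughly as follows; the variable names are hypothetical.
// Hypothetical usage sketch of OzoneBucket#listParts.
OzoneMultipartUploadPartListParts parts =
    bucket.listParts(keyName, uploadID, 0, 100);
for (OzoneMultipartUploadPartListParts.PartInfo part
    : parts.getPartInfoList()) {
  System.out.println(part.getPartNumber() + " -> " + part.getPartName()
      + " (" + part.getSize() + " bytes)");
}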
/**
* An Iterator to iterate over {@link OzoneKey} list.
*/

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client;
import org.apache.hadoop.hdds.client.ReplicationType;
import java.util.ArrayList;
import java.util.List;
/**
* Class that represents the list-parts response of a multipart upload key.
*/
public class OzoneMultipartUploadPartListParts {
private ReplicationType replicationType;
//When a list is truncated, this element specifies the last part in the list,
// as well as the value to use for the part-number-marker request parameter
// in a subsequent request.
private int nextPartNumberMarker;
// Indicates whether the returned list of parts is truncated. A true value
// indicates that the list was truncated.
// A list can be truncated if the number of parts exceeds the limit
// returned in the MaxParts element.
private boolean truncated;
private List<PartInfo> partInfoList = new ArrayList<>();
public OzoneMultipartUploadPartListParts(ReplicationType type,
int nextMarker, boolean truncate) {
this.replicationType = type;
this.nextPartNumberMarker = nextMarker;
this.truncated = truncate;
}
public void addPart(PartInfo partInfo) {
this.partInfoList.add(partInfo);
}
public ReplicationType getReplicationType() {
return replicationType;
}
public int getNextPartNumberMarker() {
return nextPartNumberMarker;
}
public boolean isTruncated() {
return truncated;
}
public List<PartInfo> getPartInfoList() {
return partInfoList;
}
/**
* Class that represents information about a single part of a multipart upload.
*/
public static class PartInfo {
private int partNumber;
private String partName;
private long modificationTime;
private long size;
public PartInfo(int number, String name, long time, long size) {
this.partNumber = number;
this.partName = name;
this.modificationTime = time;
this.size = size;
}
public int getPartNumber() {
return partNumber;
}
public String getPartName() {
return partName;
}
public long getModificationTime() {
return modificationTime;
}
public long getSize() {
return size;
}
}
}
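A minimal, hypothetical paging loop over this class (assuming an OzoneBucket named bucket plus a keyName and uploadID obtained elsewhere) shows how isTruncated() and getNextPartNumberMarker() are meant to be used together:
// Sketch: fetch parts in pages of maxParts until the listing is complete.
int partNumberMarker = 0;
final int maxParts = 1000;
OzoneMultipartUploadPartListParts listing;
do {
  listing = bucket.listParts(keyName, uploadID, partNumberMarker, maxParts);
  for (OzoneMultipartUploadPartListParts.PartInfo part
      : listing.getPartInfoList()) {
    // Process each part, e.g. record partNumber -> partName for the
    // final complete-multipart-upload call.
  }
  partNumberMarker = listing.getNextPartNumberMarker();
} while (listing.isTruncated());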

View File

@ -454,6 +454,22 @@ OmMultipartUploadCompleteInfo completeMultipartUpload(String volumeName,
void abortMultipartUpload(String volumeName,
String bucketName, String keyName, String uploadID) throws IOException;
/**
* Returns list of parts of a multipart upload key.
* @param volumeName volume the key belongs to
* @param bucketName bucket the key belongs to
* @param keyName name of the multipart upload key
* @param uploadID upload ID of the multipart upload
* @param partNumberMarker only parts with a part number greater than this
*        marker are returned
* @param maxParts maximum number of parts returned in a single call
* @return OzoneMultipartUploadPartListParts
* @throws IOException
*/
OzoneMultipartUploadPartListParts listParts(String volumeName,
String bucketName, String keyName, String uploadID, int partNumberMarker,
int maxParts) throws IOException;
/**
* Get a valid Delegation Token.
*

View File

@ -1045,4 +1045,12 @@ public void abortMultipartUpload(String volumeName,
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
@Override
public OzoneMultipartUploadPartListParts listParts(String volumeName,
String bucketName, String keyName, String uploadID, int partNumberMarker,
int maxParts) throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
}

View File

@ -52,6 +52,8 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@ -778,8 +780,8 @@ public OzoneOutputStream createMultipartKey(String volumeName,
throws IOException {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
HddsClientUtils.checkNotNull(keyName, uploadID);
Preconditions.checkArgument(partNumber > 0, "Part number should be " +
"greater than zero");
Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000, "Part " +
"number should be greater than zero and less than or equal to 10000");
Preconditions.checkArgument(size >=0, "size should be greater than or " +
"equal to zero");
String requestId = UUID.randomUUID().toString();
@ -858,4 +860,35 @@ public void abortMultipartUpload(String volumeName,
ozoneManagerClient.abortMultipartUpload(omKeyArgs);
}
@Override
public OzoneMultipartUploadPartListParts listParts(String volumeName,
String bucketName, String keyName, String uploadID, int partNumberMarker,
int maxParts) throws IOException {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
HddsClientUtils.checkNotNull(uploadID);
Preconditions.checkArgument(maxParts > 0, "Max Parts Should be greater " +
"than zero");
Preconditions.checkArgument(partNumberMarker >= 0, "Part Number Marker " +
"Should be greater than or equal to zero, as part numbers starts from" +
" 1 and ranges till 10000");
OmMultipartUploadListParts omMultipartUploadListParts =
ozoneManagerClient.listParts(volumeName, bucketName, keyName,
uploadID, partNumberMarker, maxParts);
OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
new OzoneMultipartUploadPartListParts(ReplicationType.valueOf(
omMultipartUploadListParts.getReplicationType().toString()),
omMultipartUploadListParts.getNextPartNumberMarker(),
omMultipartUploadListParts.isTruncated());
for (OmPartInfo omPartInfo : omMultipartUploadListParts.getPartInfoList()) {
ozoneMultipartUploadPartListParts.addPart(
new OzoneMultipartUploadPartListParts.PartInfo(
omPartInfo.getPartNumber(), omPartInfo.getPartName(),
omPartInfo.getModificationTime(), omPartInfo.getSize()));
}
return ozoneMultipartUploadPartListParts;
}
}

View File

@ -155,42 +155,43 @@ public static boolean isReadOnly(
OzoneManagerProtocolProtos.OMRequest omRequest) {
OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
switch (cmdType) {
case CheckVolumeAccess:
case InfoVolume:
case ListVolume:
case InfoBucket:
case ListBuckets:
case LookupKey:
case ListKeys:
case InfoS3Bucket:
case ListS3Buckets:
case ServiceList:
return true;
case CreateVolume:
case SetVolumeProperty:
case DeleteVolume:
case CreateBucket:
case SetBucketProperty:
case DeleteBucket:
case CreateKey:
case RenameKey:
case DeleteKey:
case CommitKey:
case AllocateBlock:
case CreateS3Bucket:
case DeleteS3Bucket:
case InitiateMultiPartUpload:
case CommitMultiPartUpload:
case CompleteMultiPartUpload:
case AbortMultiPartUpload:
case GetS3Secret:
case GetDelegationToken:
case RenewDelegationToken:
case CancelDelegationToken:
return false;
default:
LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
return false;
case CheckVolumeAccess:
case InfoVolume:
case ListVolume:
case InfoBucket:
case ListBuckets:
case LookupKey:
case ListKeys:
case InfoS3Bucket:
case ListS3Buckets:
case ServiceList:
case ListMultiPartUploadParts:
return true;
case CreateVolume:
case SetVolumeProperty:
case DeleteVolume:
case CreateBucket:
case SetBucketProperty:
case DeleteBucket:
case CreateKey:
case RenameKey:
case DeleteKey:
case CommitKey:
case AllocateBlock:
case CreateS3Bucket:
case DeleteS3Bucket:
case InitiateMultiPartUpload:
case CommitMultiPartUpload:
case CompleteMultiPartUpload:
case AbortMultiPartUpload:
case GetS3Secret:
case GetDelegationToken:
case RenewDelegationToken:
case CancelDelegationToken:
return false;
default:
LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
return false;
}
}

View File

@ -49,7 +49,8 @@ public enum OMAction implements AuditAction {
LIST_S3BUCKETS,
INITIATE_MULTIPART_UPLOAD,
COMMIT_MULTIPART_UPLOAD_PARTKEY,
COMPLETE_MULTIPART_UPLOAD;
COMPLETE_MULTIPART_UPLOAD,
LIST_MULTIPART_UPLOAD_PARTS;
@Override
public String getAction() {

View File

@ -130,6 +130,7 @@ public enum ResultCodes {
INVALID_TOKEN,
TOKEN_EXPIRED,
TOKEN_ERROR_OTHER,
UNKNOWN
UNKNOWN,
LIST_MULTIPART_UPLOAD_PARTS_FAILED;
}
}

View File

@ -52,7 +52,7 @@ public String getUploadID() {
return uploadID;
}
public TreeMap<Integer, PartKeyInfo> getPartKeyInfoList() {
public TreeMap<Integer, PartKeyInfo> getPartKeyInfoMap() {
return partKeyInfoList;
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.helpers;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.PartInfo;
import java.util.ArrayList;
import java.util.List;
/**
* Class that represents the response for listing parts of a multipart upload key.
*/
public class OmMultipartUploadListParts {
private HddsProtos.ReplicationType replicationType;
//When a list is truncated, this element specifies the last part in the list,
// as well as the value to use for the part-number-marker request parameter
// in a subsequent request.
private int nextPartNumberMarker;
// Indicates whether the returned list of parts is truncated. A true value
// indicates that the list was truncated.
// A list can be truncated if the number of parts exceeds the limit
// returned in the MaxParts element.
private boolean truncated;
private final List<OmPartInfo> partInfoList = new ArrayList<>();
public OmMultipartUploadListParts(HddsProtos.ReplicationType type,
int nextMarker, boolean truncate) {
this.replicationType = type;
this.nextPartNumberMarker = nextMarker;
this.truncated = truncate;
}
public void addPart(OmPartInfo partInfo) {
partInfoList.add(partInfo);
}
public HddsProtos.ReplicationType getReplicationType() {
return replicationType;
}
public int getNextPartNumberMarker() {
return nextPartNumberMarker;
}
public boolean isTruncated() {
return truncated;
}
public void setReplicationType(HddsProtos.ReplicationType replicationType) {
this.replicationType = replicationType;
}
public List<OmPartInfo> getPartInfoList() {
return partInfoList;
}
public void addPartList(List<OmPartInfo> partInfos) {
this.partInfoList.addAll(partInfos);
}
public void addProtoPartList(List<PartInfo> partInfos) {
partInfos.forEach(partInfo -> partInfoList.add(new OmPartInfo(
partInfo.getPartNumber(), partInfo.getPartName(),
partInfo.getModificationTime(), partInfo.getSize())));
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.helpers;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartInfo;
/**
* Class that defines information about each part of a multipart upload key.
*/
public class OmPartInfo {
private int partNumber;
private String partName;
private long modificationTime;
private long size;
public OmPartInfo(int number, String name, long time, long size) {
this.partNumber = number;
this.partName = name;
this.modificationTime = time;
this.size = size;
}
public int getPartNumber() {
return partNumber;
}
public String getPartName() {
return partName;
}
public long getModificationTime() {
return modificationTime;
}
public long getSize() {
return size;
}
public PartInfo getProto() {
return PartInfo.newBuilder().setPartNumber(partNumber).setPartName(partName)
.setModificationTime(modificationTime)
.setSize(size).build();
}
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@ -350,6 +351,20 @@ OmMultipartUploadCompleteInfo completeMultipartUpload(
*/
void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException;
/**
* Returns list of parts of a multipart upload key.
* @param volumeName volume the key belongs to
* @param bucketName bucket the key belongs to
* @param keyName name of the multipart upload key
* @param uploadID upload ID of the multipart upload
* @param partNumberMarker only parts with a part number greater than this
*        marker are returned
* @param maxParts maximum number of parts returned in a single call
* @return OmMultipartUploadListParts
* @throws IOException
*/
OmMultipartUploadListParts listParts(String volumeName, String bucketName,
String keyName, String uploadID, int partNumberMarker,
int maxParts) throws IOException;
/**
* Gets s3Secret for given kerberos user.
* @param kerberosID

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@ -102,6 +103,10 @@
.MultipartUploadCompleteRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadCompleteResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadListPartsRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadListPartsResponse;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.RenameKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
@ -1143,6 +1148,37 @@ public void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException {
}
@Override
public OmMultipartUploadListParts listParts(String volumeName,
String bucketName, String keyName, String uploadID,
int partNumberMarker, int maxParts) throws IOException {
MultipartUploadListPartsRequest.Builder multipartUploadListPartsRequest =
MultipartUploadListPartsRequest.newBuilder();
multipartUploadListPartsRequest.setVolume(volumeName)
.setBucket(bucketName).setKey(keyName).setUploadID(uploadID)
.setPartNumbermarker(partNumberMarker).setMaxParts(maxParts);
OMRequest omRequest = createOMRequest(Type.ListMultiPartUploadParts)
.setListMultipartUploadPartsRequest(
multipartUploadListPartsRequest.build()).build();
MultipartUploadListPartsResponse response =
submitRequest(omRequest).getListMultipartUploadPartsResponse();
if (response.getStatus() != Status.OK) {
throw new IOException("List Multipart upload parts failed, error: " +
response.getStatus());
}
OmMultipartUploadListParts omMultipartUploadListParts =
new OmMultipartUploadListParts(response.getType(),
response.getNextPartNumberMarker(), response.getIsTruncated());
omMultipartUploadListParts.addProtoPartList(response.getPartsListList());
return omMultipartUploadListParts;
}
public List<ServiceInfo> getServiceList() throws IOException {
ServiceListRequest req = ServiceListRequest.newBuilder().build();

View File

@ -69,6 +69,7 @@ enum Type {
CompleteMultiPartUpload = 47;
AbortMultiPartUpload = 48;
GetS3Secret = 49;
ListMultiPartUploadParts = 50;
ServiceList = 51;
@ -116,6 +117,7 @@ message OMRequest {
optional MultipartUploadCompleteRequest completeMultiPartUploadRequest = 47;
optional MultipartUploadAbortRequest abortMultiPartUploadRequest = 48;
optional GetS3SecretRequest getS3SecretRequest = 49;
optional MultipartUploadListPartsRequest listMultipartUploadPartsRequest = 50;
optional ServiceListRequest serviceListRequest = 51;
@ -165,6 +167,7 @@ message OMResponse {
optional MultipartUploadCompleteResponse completeMultiPartUploadResponse = 47;
optional MultipartUploadAbortResponse abortMultiPartUploadResponse = 48;
optional GetS3SecretResponse getS3SecretResponse = 49;
optional MultipartUploadListPartsResponse listMultipartUploadPartsResponse = 50;
optional ServiceListResponse ServiceListResponse = 51;
@ -213,6 +216,7 @@ enum Status {
INVALID_TOKEN = 34;
TOKEN_EXPIRED = 35;
TOKEN_ERROR_OTHER = 36;
LIST_MULTIPART_UPLOAD_PARTS_FAILED = 37;
}
@ -671,6 +675,30 @@ message MultipartUploadAbortRequest {
message MultipartUploadAbortResponse {
required Status status = 1;
}
message MultipartUploadListPartsRequest {
required string volume = 1;
required string bucket = 2;
required string key = 3;
required string uploadID = 4;
optional uint32 partNumbermarker = 5;
optional uint32 maxParts = 6;
}
message MultipartUploadListPartsResponse {
required Status status = 1;
optional hadoop.hdds.ReplicationType type = 2;
optional uint32 nextPartNumberMarker = 3;
optional bool isTruncated = 4;
repeated PartInfo partsList = 5;
}
message PartInfo {
required uint32 partNumber = 1;
required string partName = 2;
required uint64 modificationTime = 3;
required uint64 size = 4;
}
message GetDelegationTokenResponseProto{
required Status status = 1;

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneKeyLocation;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
@ -1758,6 +1759,214 @@ public void testAbortUploadSuccessWithParts() throws Exception {
}
}
@Test
public void testListMultipartUploadParts() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
Map<Integer, String> partsMap = new TreeMap<>();
String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
.STAND_ALONE, ReplicationFactor.ONE);
String partName1 = uploadPart(bucket, keyName, uploadID, 1,
generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, (byte)97));
partsMap.put(1, partName1);
String partName2 = uploadPart(bucket, keyName, uploadID, 2,
generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, (byte)97));
partsMap.put(2, partName2);
String partName3 = uploadPart(bucket, keyName, uploadID, 3,
generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, (byte)97));
partsMap.put(3, partName3);
OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
bucket.listParts(keyName, uploadID, 0, 3);
Assert.assertEquals(ReplicationType.STAND_ALONE,
ozoneMultipartUploadPartListParts.getReplicationType());
Assert.assertEquals(3,
ozoneMultipartUploadPartListParts.getPartInfoList().size());
Assert.assertEquals(partsMap.get(ozoneMultipartUploadPartListParts
.getPartInfoList().get(0).getPartNumber()),
ozoneMultipartUploadPartListParts.getPartInfoList().get(0)
.getPartName());
Assert.assertEquals(partsMap.get(ozoneMultipartUploadPartListParts
.getPartInfoList().get(1).getPartNumber()),
ozoneMultipartUploadPartListParts.getPartInfoList().get(1)
.getPartName());
Assert.assertEquals(partsMap.get(ozoneMultipartUploadPartListParts
.getPartInfoList().get(2).getPartNumber()),
ozoneMultipartUploadPartListParts.getPartInfoList().get(2)
.getPartName());
Assert.assertFalse(ozoneMultipartUploadPartListParts.isTruncated());
}
@Test
public void testListMultipartUploadPartsWithContinuation()
throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
Map<Integer, String> partsMap = new TreeMap<>();
String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
.STAND_ALONE, ReplicationFactor.ONE);
String partName1 = uploadPart(bucket, keyName, uploadID, 1,
generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, (byte)97));
partsMap.put(1, partName1);
String partName2 = uploadPart(bucket, keyName, uploadID, 2,
generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, (byte)97));
partsMap.put(2, partName2);
String partName3 = uploadPart(bucket, keyName, uploadID, 3,
generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, (byte)97));
partsMap.put(3, partName3);
OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
bucket.listParts(keyName, uploadID, 0, 2);
Assert.assertEquals(ReplicationType.STAND_ALONE,
ozoneMultipartUploadPartListParts.getReplicationType());
Assert.assertEquals(2,
ozoneMultipartUploadPartListParts.getPartInfoList().size());
Assert.assertEquals(partsMap.get(ozoneMultipartUploadPartListParts
.getPartInfoList().get(0).getPartNumber()),
ozoneMultipartUploadPartListParts.getPartInfoList().get(0)
.getPartName());
Assert.assertEquals(partsMap.get(ozoneMultipartUploadPartListParts
.getPartInfoList().get(1).getPartNumber()),
ozoneMultipartUploadPartListParts.getPartInfoList().get(1)
.getPartName());
// Get remaining
Assert.assertTrue(ozoneMultipartUploadPartListParts.isTruncated());
ozoneMultipartUploadPartListParts = bucket.listParts(keyName, uploadID,
ozoneMultipartUploadPartListParts.getNextPartNumberMarker(), 2);
Assert.assertEquals(1,
ozoneMultipartUploadPartListParts.getPartInfoList().size());
Assert.assertEquals(partsMap.get(ozoneMultipartUploadPartListParts
.getPartInfoList().get(0).getPartNumber()),
ozoneMultipartUploadPartListParts.getPartInfoList().get(0)
.getPartName());
// As there are no remaining parts, isTruncated() should now return false
Assert.assertFalse(ozoneMultipartUploadPartListParts.isTruncated());
}
@Test
public void testListPartsInvalidPartMarker() throws Exception {
try {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
bucket.listParts(keyName, "random", -1, 2);
} catch (IllegalArgumentException ex) {
GenericTestUtils.assertExceptionContains("Should be greater than or " +
"equal to zero", ex);
}
}
@Test
public void testListPartsInvalidMaxParts() throws Exception {
try {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
bucket.listParts(keyName, "random", 1, -1);
} catch (IllegalArgumentException ex) {
GenericTestUtils.assertExceptionContains("Max Parts Should be greater " +
"than zero", ex);
}
}
@Test
public void testListPartsWithPartMarkerGreaterThanPartCount()
throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
.STAND_ALONE, ReplicationFactor.ONE);
uploadPart(bucket, keyName, uploadID, 1,
generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, (byte)97));
OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
bucket.listParts(keyName, uploadID, 100, 2);
// Should return empty
Assert.assertEquals(0,
ozoneMultipartUploadPartListParts.getPartInfoList().size());
Assert.assertEquals(ReplicationType.STAND_ALONE,
ozoneMultipartUploadPartListParts.getReplicationType());
// As there are no parts with a part number greater than partNumberMarker,
// the list is not truncated and isTruncated() should return false here.
Assert.assertFalse(ozoneMultipartUploadPartListParts.isTruncated());
}
@Test
public void testListPartsWithInvalidUploadID() throws Exception {
try {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
bucket.listParts(keyName, "random", 100, 2);
} catch (IOException ex) {
GenericTestUtils.assertExceptionContains("NO_SUCH_MULTIPART_UPLOAD", ex);
}
}
private byte[] generateData(int size, byte val) {
byte[] chars = new byte[size];

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.utils.BackgroundService;
@ -220,4 +221,20 @@ OmMultipartUploadCompleteInfo completeMultipartUpload(OmKeyArgs omKeyArgs,
* @throws IOException
*/
void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException;
/**
* Returns list of parts of a multipart upload key.
* @param volumeName volume the key belongs to
* @param bucketName bucket the key belongs to
* @param keyName name of the multipart upload key
* @param uploadID upload ID of the multipart upload
* @param partNumberMarker only parts with a part number greater than this
*        marker are returned
* @param maxParts maximum number of parts returned in a single call
* @return OmMultipartUploadListParts
* @throws IOException
*/
OmMultipartUploadListParts listParts(String volumeName, String bucketName,
String keyName, String uploadID, int partNumberMarker,
int maxParts) throws IOException;
}

View File

@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
@ -31,6 +32,7 @@
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
@ -53,6 +55,8 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.PartKeyInfo;
@ -833,7 +837,7 @@ public OmMultipartUploadCompleteInfo completeMultipartUpload(
ResultCodes.NO_SUCH_MULTIPART_UPLOAD);
}
TreeMap<Integer, PartKeyInfo> partKeyInfoMap = multipartKeyInfo
.getPartKeyInfoList();
.getPartKeyInfoMap();
TreeMap<Integer, String> multipartMap = multipartUploadList
.getMultipartMap();
@ -991,7 +995,7 @@ public void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException {
} else {
// Move all the parts to delete table
TreeMap<Integer, PartKeyInfo> partKeyInfoMap = multipartKeyInfo
.getPartKeyInfoList();
.getPartKeyInfoMap();
DBStore store = metadataManager.getStore();
try (BatchOperation batch = store.initBatchOperation()) {
for (Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry : partKeyInfoMap
@ -1024,4 +1028,81 @@ public void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException {
}
@Override
public OmMultipartUploadListParts listParts(String volumeName,
String bucketName, String keyName, String uploadID,
int partNumberMarker, int maxParts) throws IOException {
Preconditions.checkNotNull(volumeName);
Preconditions.checkNotNull(bucketName);
Preconditions.checkNotNull(keyName);
Preconditions.checkNotNull(uploadID);
boolean isTruncated = false;
int nextPartNumberMarker = 0;
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
String multipartKey = metadataManager.getMultipartKey(volumeName,
bucketName, keyName, uploadID);
OmMultipartKeyInfo multipartKeyInfo =
metadataManager.getMultipartInfoTable().get(multipartKey);
if (multipartKeyInfo == null) {
throw new OMException("No Such Multipart upload exists for this key.",
ResultCodes.NO_SUCH_MULTIPART_UPLOAD);
} else {
TreeMap<Integer, PartKeyInfo> partKeyInfoMap =
multipartKeyInfo.getPartKeyInfoMap();
Iterator<Map.Entry<Integer, PartKeyInfo>> partKeyInfoMapIterator =
partKeyInfoMap.entrySet().iterator();
HddsProtos.ReplicationType replicationType =
partKeyInfoMap.firstEntry().getValue().getPartKeyInfo().getType();
int count = 0;
List<OmPartInfo> omPartInfoList = new ArrayList<>();
while (count < maxParts && partKeyInfoMapIterator.hasNext()) {
Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry =
partKeyInfoMapIterator.next();
nextPartNumberMarker = partKeyInfoEntry.getKey();
// Return only parts with a part number greater than the
// part number marker.
if (partKeyInfoEntry.getKey() > partNumberMarker) {
PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue();
OmPartInfo omPartInfo = new OmPartInfo(partKeyInfo.getPartNumber(),
partKeyInfo.getPartName(),
partKeyInfo.getPartKeyInfo().getModificationTime(),
partKeyInfo.getPartKeyInfo().getDataSize());
omPartInfoList.add(omPartInfo);
replicationType = partKeyInfo.getPartKeyInfo().getType();
count++;
}
}
if (partKeyInfoMapIterator.hasNext()) {
Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry =
partKeyInfoMapIterator.next();
isTruncated = true;
} else {
isTruncated = false;
nextPartNumberMarker = 0;
}
OmMultipartUploadListParts omMultipartUploadListParts =
new OmMultipartUploadListParts(replicationType,
nextPartNumberMarker, isTruncated);
omMultipartUploadListParts.addPartList(omPartInfoList);
return omMultipartUploadListParts;
}
} catch (OMException ex) {
throw ex;
} catch (IOException ex) {
LOG.error("List Multipart Upload Parts Failed: volume: " + volumeName +
", bucket: " + bucketName + ", key: " + keyName, ex);
throw new OMException(ex.getMessage(), ResultCodes
.LIST_MULTIPART_UPLOAD_PARTS_FAILED);
} finally {
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
}
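To make the marker and truncation bookkeeping in listParts above easier to follow, here is a small self-contained sketch that mirrors the same selection logic over a plain TreeMap (a stand-in for the OM part table); the class and variable names are invented for illustration:
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Stand-alone sketch of the part selection logic used by listParts. */
public final class ListPartsSketch {

  public static void main(String[] args) {
    // Part number -> part size in bytes, in place of the real PartKeyInfo map.
    TreeMap<Integer, Long> parts = new TreeMap<>();
    parts.put(1, 5L * 1024 * 1024);
    parts.put(2, 5L * 1024 * 1024);
    parts.put(3, 1L * 1024 * 1024);

    int partNumberMarker = 0; // only parts with a number > marker are returned
    int maxParts = 2;         // page size requested by the caller
    int nextPartNumberMarker = 0;
    List<Integer> returned = new ArrayList<>();

    Iterator<Map.Entry<Integer, Long>> it = parts.entrySet().iterator();
    int count = 0;
    while (count < maxParts && it.hasNext()) {
      Map.Entry<Integer, Long> entry = it.next();
      nextPartNumberMarker = entry.getKey();
      if (entry.getKey() > partNumberMarker) {
        returned.add(entry.getKey());
        count++;
      }
    }
    // Remaining entries mean the caller has to page again starting from
    // nextPartNumberMarker; otherwise the marker is reset to 0.
    boolean truncated = it.hasNext();
    if (!truncated) {
      nextPartNumberMarker = 0;
    }

    System.out.println("returned parts       = " + returned);             // [1, 2]
    System.out.println("truncated            = " + truncated);            // true
    System.out.println("nextPartNumberMarker = " + nextPartNumberMarker); // 2
  }
}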

View File

@ -92,6 +92,8 @@ public class OMMetrics {
private @Metric MutableCounterLong numCompleteMultipartUploadFails;
private @Metric MutableCounterLong numAbortMultipartUploads;
private @Metric MutableCounterLong numAbortMultipartUploadFails;
private @Metric MutableCounterLong numListMultipartUploadParts;
private @Metric MutableCounterLong numListMultipartUploadPartFails;
// Metrics for total number of volumes, buckets and keys
@ -269,6 +271,15 @@ public void incNumAbortMultipartUploadFails() {
numAbortMultipartUploadFails.incr();
}
public void incNumListMultipartUploadParts() {
numKeyOps.incr();
numListMultipartUploadParts.incr();
}
public void incNumListMultipartUploadPartFails() {
numListMultipartUploadPartFails.incr();
}
public void incNumGetServiceLists() {
numGetServiceLists.incr();
}

View File

@ -76,6 +76,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@ -2104,8 +2105,33 @@ public void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException {
}
@Override
public OmMultipartUploadListParts listParts(String volumeName,
String bucketName, String keyName, String uploadID, int partNumberMarker,
int maxParts) throws IOException {
Map<String, String> auditMap = new HashMap<>();
auditMap.put(OzoneConsts.VOLUME, volumeName);
auditMap.put(OzoneConsts.BUCKET, bucketName);
auditMap.put(OzoneConsts.KEY, keyName);
auditMap.put(OzoneConsts.UPLOAD_ID, uploadID);
auditMap.put(OzoneConsts.PART_NUMBER_MARKER,
Integer.toString(partNumberMarker));
auditMap.put(OzoneConsts.MAX_PARTS, Integer.toString(maxParts));
metrics.incNumListMultipartUploadParts();
try {
OmMultipartUploadListParts omMultipartUploadListParts =
keyManager.listParts(volumeName, bucketName, keyName, uploadID,
partNumberMarker, maxParts);
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction
.LIST_MULTIPART_UPLOAD_PARTS, auditMap));
return omMultipartUploadListParts;
} catch (IOException ex) {
metrics.incNumListMultipartUploadPartFails();
AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction
.LIST_MULTIPART_UPLOAD_PARTS, auditMap, ex));
throw ex;
}
}
/**
* Startup options.

View File

@ -19,6 +19,7 @@
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
@ -35,6 +36,8 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@ -118,6 +121,10 @@
.MultipartUploadCompleteRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadCompleteResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadListPartsRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadListPartsResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -325,6 +332,11 @@ public OMResponse handle(OMRequest request) {
responseBuilder.setAbortMultiPartUploadResponse(
abortMultiPartAbortResponse);
break;
case ListMultiPartUploadParts:
MultipartUploadListPartsResponse listPartsResponse =
listParts(request.getListMultipartUploadPartsRequest());
responseBuilder.setListMultipartUploadPartsResponse(listPartsResponse);
break;
case ServiceList:
ServiceListResponse serviceListResponse = getServiceList(
request.getServiceListRequest());
@ -417,6 +429,8 @@ private Status exceptionToResponseStatus(IOException ex) {
return Status.ENTITY_TOO_SMALL;
case ABORT_MULTIPART_UPLOAD_FAILED:
return Status.ABORT_MULTIPART_UPLOAD_FAILED;
case LIST_MULTIPART_UPLOAD_PARTS_FAILED:
return Status.LIST_MULTIPART_UPLOAD_PARTS_FAILED;
case INVALID_AUTH_METHOD:
return Status.INVALID_AUTH_METHOD;
case INVALID_TOKEN:
@ -957,6 +971,44 @@ private MultipartUploadAbortResponse abortMultipartUpload(
return response.build();
}
private MultipartUploadListPartsResponse listParts(
MultipartUploadListPartsRequest multipartUploadListPartsRequest) {
MultipartUploadListPartsResponse.Builder response =
MultipartUploadListPartsResponse.newBuilder();
try {
OmMultipartUploadListParts omMultipartUploadListParts =
impl.listParts(multipartUploadListPartsRequest.getVolume(),
multipartUploadListPartsRequest.getBucket(),
multipartUploadListPartsRequest.getKey(),
multipartUploadListPartsRequest.getUploadID(),
multipartUploadListPartsRequest.getPartNumbermarker(),
multipartUploadListPartsRequest.getMaxParts());
List<OmPartInfo> omPartInfoList =
omMultipartUploadListParts.getPartInfoList();
List<OzoneManagerProtocolProtos.PartInfo> partInfoList =
new ArrayList<>();
omPartInfoList.forEach(partInfo -> partInfoList.add(partInfo.getProto()));
response.setType(omMultipartUploadListParts.getReplicationType());
response.setNextPartNumberMarker(
omMultipartUploadListParts.getNextPartNumberMarker());
response.setIsTruncated(omMultipartUploadListParts.isTruncated());
response.setStatus(Status.OK);
return response.addAllPartsList(partInfoList).build();
} catch (IOException ex) {
response.setStatus(exceptionToResponseStatus(ex));
}
return response.build();
}
private GetDelegationTokenResponseProto getDelegationToken(
GetDelegationTokenRequestProto request){
GetDelegationTokenResponseProto.Builder rb =