HDDS-879. MultipartUpload: Add InitiateMultipartUpload in ozone. Contributed by Bharat Viswanadham.

This commit is contained in:
Márton Elek 2018-12-10 10:05:38 +01:00
parent 1c09a10e96
commit 17a8708039
24 changed files with 758 additions and 4 deletions

View File

@ -47,6 +47,8 @@ public interface DBStore extends AutoCloseable {
* Gets an existing TableStore with implicit key/value conversion.
*
* @param name - Name of the TableStore to get
* @param keyType
* @param valueType
* @return - TableStore.
* @throws IOException on Failure
*/

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import java.io.IOException;
import java.util.Iterator;
@ -323,6 +324,35 @@ public void renameKey(String fromKeyName, String toKeyName)
proxy.renameKey(volumeName, name, fromKeyName, toKeyName);
}
/**
* Initiate multipart upload for a specified key.
* @param keyName
* @param type
* @param factor
* @return OmMultipartInfo
* @throws IOException
*/
public OmMultipartInfo initiateMultipartUpload(String keyName,
ReplicationType type,
ReplicationFactor factor)
throws IOException {
return proxy.initiateMultipartUpload(volumeName, name, keyName, type,
factor);
}
/**
* Initiate multipart upload for a specified key, with default replication
* type RATIS and with replication factor THREE.
* @param key Name of the key to be created.
* @return OmMultipartInfo.
* @throws IOException
*/
public OmMultipartInfo initiateMultipartUpload(String key)
throws IOException {
return initiateMultipartUpload(key, defaultReplicationType,
defaultReplication);
}
/**
* An Iterator to iterate over {@link OzoneKey} list.
*/

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import java.io.IOException;
import java.util.List;
@ -385,4 +386,20 @@ List<OzoneBucket> listS3Buckets(String userName, String bucketPrefix,
*/
void close() throws IOException;
/**
* Initiate Multipart upload.
* @param volumeName
* @param bucketName
* @param keyName
* @param type
* @param factor
* @return {@link OmMultipartInfo}
* @throws IOException
*/
OmMultipartInfo initiateMultipartUpload(String volumeName, String
bucketName, String keyName, ReplicationType type, ReplicationFactor
factor) throws IOException;
}

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
import org.apache.hadoop.ozone.web.response.ListBuckets;
@ -951,4 +952,15 @@ private void addQueryParamter(String param, String value,
builder.addParameter(param, value);
}
}
@Override
public OmMultipartInfo initiateMultipartUpload(String volumeName,
String bucketName,
String keyName,
ReplicationType type,
ReplicationFactor factor)
throws IOException {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
}

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@ -678,4 +679,26 @@ public void close() throws IOException {
IOUtils.cleanupWithLogger(LOG, ozoneManagerClient);
IOUtils.cleanupWithLogger(LOG, xceiverClientManager);
}
@Override
public OmMultipartInfo initiateMultipartUpload(String volumeName,
String bucketName,
String keyName,
ReplicationType type,
ReplicationFactor factor)
throws IOException {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
HddsClientUtils.checkNotNull(keyName, type, factor);
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setType(HddsProtos.ReplicationType.valueOf(type.toString()))
.setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
.build();
OmMultipartInfo multipartInfo = ozoneManagerClient
.initiateMultipartUpload(keyArgs);
return multipartInfo;
}
}

View File

@ -46,7 +46,8 @@ public enum OMAction implements AuditAction {
READ_VOLUME,
READ_BUCKET,
READ_KEY,
LIST_S3BUCKETS;
LIST_S3BUCKETS,
INITIATE_MULTIPART_UPLOAD;
@Override
public String getAction() {

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
import org.apache.hadoop.utils.db.DBStore;
@ -251,6 +252,27 @@ List<OmVolumeArgs> listVolumes(String userName, String prefix,
*/
Table<byte[], byte[]> getS3Table();
/**
* Returns the DB key name of a multipart upload key in OM metadata store.
*
* @param volume - volume name
* @param bucket - bucket name
* @param key - key name
* @param uploadId - the upload id for this key
* @return bytes of DB key.
*/
String getMultipartKey(String volume, String bucket, String key, String
uploadId);
/**
* Gets the multipart info table which holds the information about
* multipart upload information of the keys.
* @return Table
*/
Table<String, OmMultipartKeyInfo> getMultipartInfoTable();
/**
* Returns number of rows in a table. This should not be used for very
* large tables.

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.codec;
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.utils.db.Codec;
/**
* Codec Registry for OmMultipartKeyInfo.
*/
public class OmMultipartKeyInfoCodec implements Codec<OmMultipartKeyInfo> {
@Override
public byte[] toPersistedFormat(OmMultipartKeyInfo object) {
Preconditions.checkNotNull(object,
"Null object can't be converted to byte array.");
return object.getProto().toByteArray();
}
@Override
/**
* Construct {@link OmMultipartKeyInfo} from byte[]. If unable to convert
* return null.
*/
public OmMultipartKeyInfo fromPersistedFormat(byte[] rawData) {
Preconditions.checkNotNull(
"Null byte array can't converted to real object.");
try {
return OmMultipartKeyInfo.getFromProto(OzoneManagerProtocolProtos
.MultipartKeyInfo.parseFrom(rawData));
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(
"Can't encode the the raw data from the byte array", e);
}
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om.helpers;
/**
* Class which holds information about the response of initiate multipart
* upload request.
*/
public class OmMultipartInfo {
private String volumeName;
private String bucketName;
private String keyName;
private String uploadID;
/**
* Construct OmMultipartInfo object which holds information about the
* response from initiate multipart upload request.
* @param volume
* @param bucket
* @param key
* @param id
*/
public OmMultipartInfo(String volume, String bucket, String key, String id) {
this.volumeName = volume;
this.bucketName = bucket;
this.keyName = key;
this.uploadID = id;
}
/**
* Return volume name.
* @return volumeName
*/
public String getVolumeName() {
return volumeName;
}
/**
* Return bucket name.
* @return bucketName
*/
public String getBucketName() {
return bucketName;
}
/**
* Return key name.
* @return keyName
*/
public String getKeyName() {
return keyName;
}
/**
* Return uploadID.
* @return uploadID
*/
public String getUploadID() {
return uploadID;
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om.helpers;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.PartKeyInfo;
import java.util.HashMap;
import java.util.Map;
/**
* This class represents multipart upload information for a key, which holds
* upload part information of the key.
*/
public class OmMultipartKeyInfo {
private String uploadID;
private Map<Integer, PartKeyInfo> partKeyInfoList;
/**
* Construct OmMultipartKeyInfo object which holds multipart upload
* information for a key.
* @param id
* @param list upload parts of a key.
*/
public OmMultipartKeyInfo(String id, Map<Integer, PartKeyInfo> list) {
this.uploadID = id;
this.partKeyInfoList = new HashMap<>(list);
}
/**
* Returns the uploadID for this multi part upload of a key.
* @return uploadID
*/
public String getUploadID() {
return uploadID;
}
/**
* Construct OmMultipartInfo from MultipartKeyInfo proto object.
* @param multipartKeyInfo
* @return OmMultipartKeyInfo
*/
public static OmMultipartKeyInfo getFromProto(MultipartKeyInfo
multipartKeyInfo) {
Map<Integer, PartKeyInfo> list = new HashMap<>();
multipartKeyInfo.getPartKeyInfoListList().stream().forEach(partKeyInfo
-> list.put(partKeyInfo.getPartNumber(), partKeyInfo));
return new OmMultipartKeyInfo(multipartKeyInfo.getUploadID(), list);
}
/**
* Construct MultipartKeyInfo from this object.
* @return MultipartKeyInfo
*/
public MultipartKeyInfo getProto() {
MultipartKeyInfo.Builder builder = MultipartKeyInfo.newBuilder()
.setUploadID(uploadID);
partKeyInfoList.forEach((key, value) -> builder.addPartKeyInfoList(value));
return builder.build();
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
return other instanceof OmMultipartKeyInfo && uploadID.equals(
((OmMultipartKeyInfo)other).getUploadID());
}
@Override
public int hashCode() {
return uploadID.hashCode();
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.om.protocol;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@ -304,5 +305,13 @@ List<OmBucketInfo> listS3Buckets(String userName, String startBucketName,
String bucketPrefix, int maxNumOfBuckets)
throws IOException;
/**
* Initiate multipart upload for the specified key.
* @param keyArgs
* @return MultipartInfo
* @throws IOException
*/
OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
}

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@ -70,6 +71,10 @@
.OzoneManagerProtocolProtos.LocateKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.LocateKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateResponse;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.RenameKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
@ -884,4 +889,34 @@ public List<OmBucketInfo> listS3Buckets(String userName, String startKey,
public Object getUnderlyingProxyObject() {
return null;
}
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
IOException {
MultipartInfoInitiateRequest.Builder multipartInfoInitiateRequest =
MultipartInfoInitiateRequest.newBuilder();
KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
.setVolumeName(omKeyArgs.getVolumeName())
.setBucketName(omKeyArgs.getBucketName())
.setKeyName(omKeyArgs.getKeyName())
.setFactor(omKeyArgs.getFactor())
.setType(omKeyArgs.getType());
multipartInfoInitiateRequest.setKeyArgs(keyArgs.build());
MultipartInfoInitiateResponse resp;
try {
resp = rpcProxy.initiateMultiPartUpload(NULL_RPC_CONTROLLER,
multipartInfoInitiateRequest.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() != Status.OK) {
throw new IOException("Initiate Multipart upload failed, error:" + resp
.getStatus());
}
return new OmMultipartInfo(resp.getVolumeName(), resp.getBucketName(), resp
.getKeyName(), resp.getMultipartUploadID());
}
}

View File

@ -60,6 +60,7 @@ enum Status {
SCM_VERSION_MISMATCH_ERROR = 21;
S3_BUCKET_NOT_FOUND = 22;
S3_BUCKET_ALREADY_EXISTS = 23;
INITIATE_MULTIPART_UPLOAD_ERROR = 24;
}
@ -406,6 +407,31 @@ message S3ListBucketsResponse {
}
message MultipartInfoInitiateRequest {
required KeyArgs keyArgs = 1;
}
message MultipartInfoInitiateResponse {
required string volumeName = 1;
required string bucketName = 2;
required string keyName = 3;
required string multipartUploadID = 4;
required Status status = 5;
}
message MultipartKeyInfo {
required string uploadID = 4;
repeated PartKeyInfo partKeyInfoList = 5;
}
message PartKeyInfo {
required string partName = 1;
required uint32 partNumber = 2;
required KeyInfo partKeyInfo = 3;
}
/**
The OM service that takes care of Ozone namespace.
*/
@ -541,4 +567,7 @@ service OzoneManagerService {
rpc listS3Buckets(S3ListBucketsRequest)
returns(S3ListBucketsResponse);
rpc initiateMultiPartUpload(MultipartInfoInitiateRequest)
returns (MultipartInfoInitiateResponse);
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.codec;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.UUID;
/**
* This class tests OmMultipartKeyInfoCodec.
*/
public class TestOmMultipartKeyInfoCodec {
@Test
public void testOmMultipartKeyInfoCodec() {
OmMultipartKeyInfoCodec codec = new OmMultipartKeyInfoCodec();
OmMultipartKeyInfo omMultipartKeyInfo = new OmMultipartKeyInfo(UUID
.randomUUID().toString(), new HashMap<>());
byte[] data = codec.toPersistedFormat(omMultipartKeyInfo);
Assert.assertNotNull(data);
OmMultipartKeyInfo multipartKeyInfo = codec.fromPersistedFormat(data);
Assert.assertEquals(omMultipartKeyInfo, multipartKeyInfo);
// When random byte data passed returns null.
try {
codec.fromPersistedFormat("radom".getBytes());
} catch (IllegalArgumentException ex) {
GenericTestUtils.assertExceptionContains("Can't encode the the raw " +
"data from the byte array", ex);
}
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* <p>
* Utility classes to encode/decode DTO objects to/from byte array.
*/
/**
* Unit tests for codec's in OM.
*/
package org.apache.hadoop.ozone.om.codec;

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.protocolPB.
StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
@ -75,6 +76,8 @@
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.either;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
/**
@ -1259,6 +1262,73 @@ public void testListKeyOnEmptyBucket()
}
}
@Test
public void testInitiateMultipartUploadWithReplicationInformationSet() throws
IOException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
assertNotNull(multipartInfo);
String uploadID = multipartInfo.getUploadID();
Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
Assert.assertEquals(bucketName, multipartInfo.getBucketName());
Assert.assertEquals(keyName, multipartInfo.getKeyName());
assertNotNull(multipartInfo.getUploadID());
// Call initiate multipart upload for the same key again, this should
// generate a new uploadID.
multipartInfo = bucket.initiateMultipartUpload(keyName,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
assertNotNull(multipartInfo);
Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
Assert.assertEquals(bucketName, multipartInfo.getBucketName());
Assert.assertEquals(keyName, multipartInfo.getKeyName());
assertNotEquals(multipartInfo.getUploadID(), uploadID);
assertNotNull(multipartInfo.getUploadID());
}
@Test
public void testInitiateMultipartUploadWithDefaultReplication() throws
IOException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName);
assertNotNull(multipartInfo);
String uploadID = multipartInfo.getUploadID();
Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
Assert.assertEquals(bucketName, multipartInfo.getBucketName());
Assert.assertEquals(keyName, multipartInfo.getKeyName());
assertNotNull(multipartInfo.getUploadID());
// Call initiate multipart upload for the same key again, this should
// generate a new uploadID.
multipartInfo = bucket.initiateMultipartUpload(keyName);
assertNotNull(multipartInfo);
Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
Assert.assertEquals(bucketName, multipartInfo.getBucketName());
Assert.assertEquals(keyName, multipartInfo.getKeyName());
assertNotEquals(multipartInfo.getUploadID(), uploadID);
assertNotNull(multipartInfo.getUploadID());
}
/**
* Close OzoneClient and shutdown MiniOzoneCluster.
*/

View File

@ -261,18 +261,21 @@ public void testKeyOps() throws IOException {
Mockito.doReturn(null).when(mockKm).lookupKey(null);
Mockito.doReturn(null).when(mockKm).listKeys(null, null, null, null, 0);
Mockito.doNothing().when(mockKm).commitKey(any(OmKeyArgs.class), anyLong());
Mockito.doReturn(null).when(mockKm).initiateMultipartUpload(
any(OmKeyArgs.class));
HddsWhiteboxTestUtils.setInternalState(
ozoneManager, "keyManager", mockKm);
doKeyOps();
MetricsRecordBuilder omMetrics = getMetrics("OMMetrics");
assertCounter("NumKeyOps", 5L, omMetrics);
assertCounter("NumKeyOps", 6L, omMetrics);
assertCounter("NumKeyAllocate", 1L, omMetrics);
assertCounter("NumKeyLookup", 1L, omMetrics);
assertCounter("NumKeyDeletes", 1L, omMetrics);
assertCounter("NumKeyLists", 1L, omMetrics);
assertCounter("NumKeys", 0L, omMetrics);
assertCounter("NumInitiateMultipartUploads", 1L, omMetrics);
ozoneManager.openKey(null);
@ -295,22 +298,27 @@ public void testKeyOps() throws IOException {
null, null, null, null, 0);
Mockito.doThrow(exception).when(mockKm).commitKey(any(OmKeyArgs.class),
anyLong());
Mockito.doThrow(exception).when(mockKm).initiateMultipartUpload(
any(OmKeyArgs.class));
HddsWhiteboxTestUtils.setInternalState(
ozoneManager, "keyManager", mockKm);
doKeyOps();
omMetrics = getMetrics("OMMetrics");
assertCounter("NumKeyOps", 17L, omMetrics);
assertCounter("NumKeyOps", 19L, omMetrics);
assertCounter("NumKeyAllocate", 5L, omMetrics);
assertCounter("NumKeyLookup", 2L, omMetrics);
assertCounter("NumKeyDeletes", 3L, omMetrics);
assertCounter("NumKeyLists", 2L, omMetrics);
assertCounter("NumInitiateMultipartUploads", 2L, omMetrics);
assertCounter("NumKeyAllocateFails", 1L, omMetrics);
assertCounter("NumKeyLookupFails", 1L, omMetrics);
assertCounter("NumKeyDeleteFails", 1L, omMetrics);
assertCounter("NumKeyListFails", 1L, omMetrics);
assertCounter("NumInitiateMultipartUploadFails", 1L, omMetrics);
assertCounter("NumKeys", 2L, omMetrics);
@ -412,6 +420,12 @@ private void doKeyOps() {
ozoneManager.commitKey(createKeyArgs(), 0);
} catch (IOException ignored) {
}
try {
ozoneManager.initiateMultipartUpload(null);
} catch (IOException ignored) {
}
}
private OmKeyArgs createKeyArgs() {

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.utils.BackgroundService;
@ -180,4 +181,13 @@ List<OmKeyInfo> listKeys(String volumeName,
*/
BackgroundService getDeletingService();
/**
* Initiate multipart upload for the specified key.
* @param keyArgs
* @return MultipartInfo
* @throws IOException
*/
OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
}

View File

@ -20,6 +20,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.StorageUnit;
@ -37,7 +40,11 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.PartKeyInfo;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.db.BatchOperation;
@ -54,6 +61,7 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -532,4 +540,74 @@ public OMMetadataManager getMetadataManager() {
public BackgroundService getDeletingService() {
return keyDeletingService;
}
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws
IOException {
Preconditions.checkNotNull(omKeyArgs);
String volumeName = omKeyArgs.getVolumeName();
String bucketName = omKeyArgs.getBucketName();
String keyName = omKeyArgs.getKeyName();
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
long time = Time.monotonicNowNanos();
String uploadID = UUID.randomUUID().toString() + "-" + Long.toString(
time);
// We are adding uploadId to key, because if multiple users try to
// perform multipart upload on the same key, each will try to upload, who
// ever finally commit the key, we see that key in ozone. Suppose if we
// don't add id, and use the same key /volume/bucket/key, when multiple
// users try to upload the key, we update the parts of the key's from
// multiple users to same key, and the key output can be a mix of the
// parts from multiple users.
// So on same key if multiple time multipart upload is initiated we
// store multiple entries in the openKey Table.
// Checked AWS S3, when we try to run multipart upload, each time a
// new uploadId is returned.
String multipartKey = metadataManager.getMultipartKey(volumeName,
bucketName, keyName, uploadID);
// Not checking if there is an already key for this in the keyTable, as
// during final complete multipart upload we take care of this.
Map<Integer, PartKeyInfo> partKeyInfoMap = new HashMap<>();
OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(uploadID,
partKeyInfoMap);
List<OmKeyLocationInfo> locations = new ArrayList<>();
OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
.setVolumeName(omKeyArgs.getVolumeName())
.setBucketName(omKeyArgs.getBucketName())
.setKeyName(omKeyArgs.getKeyName())
.setCreationTime(Time.now())
.setModificationTime(Time.now())
.setReplicationType(omKeyArgs.getType())
.setReplicationFactor(omKeyArgs.getFactor())
.setOmKeyLocationInfos(Collections.singletonList(
new OmKeyLocationInfoGroup(0, locations)))
.build();
DBStore store = metadataManager.getStore();
try (BatchOperation batch = store.initBatchOperation()) {
// Create an entry in open key table and multipart info table for
// this key.
metadataManager.getMultipartInfoTable().putWithBatch(batch,
multipartKey, multipartKeyInfo);
metadataManager.getOpenKeyTable().putWithBatch(batch,
multipartKey, omKeyInfo);
store.commitBatchOperation(batch);
return new OmMultipartInfo(volumeName, bucketName, keyName, uploadID);
}
} catch (IOException ex) {
LOG.error("Initiate Multipart upload Failed for volume:{} bucket:{} " +
"key:{}", volumeName, bucketName, keyName, ex);
throw new OMException(ex.getMessage(),
OMException.ResultCodes.INITIATE_MULTIPART_UPLOAD_FAILED);
} finally {
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
}

View File

@ -61,6 +61,7 @@ public class OMMetrics {
private @Metric MutableCounterLong numAllocateBlockCalls;
private @Metric MutableCounterLong numGetServiceLists;
private @Metric MutableCounterLong numListS3Buckets;
private @Metric MutableCounterLong numInitiateMultipartUploads;
// Failure Metrics
@ -84,6 +85,7 @@ public class OMMetrics {
private @Metric MutableCounterLong numBlockAllocateCallFails;
private @Metric MutableCounterLong numGetServiceListFails;
private @Metric MutableCounterLong numListS3BucketsFails;
private @Metric MutableCounterLong numInitiateMultipartUploadFails;
// Metrics for total number of volumes, buckets and keys
@ -225,6 +227,15 @@ public void incNumListS3BucketsFails() {
numListS3BucketsFails.incr();
}
public void incNumInitiateMultipartUploads() {
numKeyOps.incr();
numInitiateMultipartUploads.incr();
}
public void incNumInitiateMultipartUploadFails() {
numInitiateMultipartUploadFails.incr();
}
public void incNumGetServiceLists() {
numGetServiceLists.incr();
}
@ -535,6 +546,14 @@ public long getNumListS3BucketsFails() {
return numListS3BucketsFails.value();
}
public long getNumInitiateMultipartUploads() {
return numInitiateMultipartUploads.value();
}
public long getNumInitiateMultipartUploadFails() {
return numInitiateMultipartUploadFails.value();
}
public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);

View File

@ -32,6 +32,8 @@
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.codec.OmBucketInfoCodec;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.codec.OmMultipartKeyInfoCodec;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.codec.OmKeyInfoCodec;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@ -97,6 +99,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
private static final String DELETED_TABLE = "deletedTable";
private static final String OPEN_KEY_TABLE = "openKeyTable";
private static final String S3_TABLE = "s3Table";
private static final String MULTIPARTINFO_TABLE = "multipartInfoTable";
private DBStore store;
@ -110,6 +113,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
private Table deletedTable;
private Table openKeyTable;
private Table s3Table;
private Table<String, OmMultipartKeyInfo> multipartInfoTable;
public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
this.lock = new OzoneManagerLock(conf);
@ -154,6 +158,11 @@ public Table<byte[], byte[]> getS3Table() {
return s3Table;
}
@Override
public Table<String, OmMultipartKeyInfo> getMultipartInfoTable() {
return multipartInfoTable;
}
private void checkTableStatus(Table table, String name) throws IOException {
String logMessage = "Unable to get a reference to %s table. Cannot " +
@ -186,10 +195,12 @@ public void start(OzoneConfiguration configuration) throws IOException {
.addTable(DELETED_TABLE)
.addTable(OPEN_KEY_TABLE)
.addTable(S3_TABLE)
.addTable(MULTIPARTINFO_TABLE)
.addCodec(OmKeyInfo.class, new OmKeyInfoCodec())
.addCodec(OmBucketInfo.class, new OmBucketInfoCodec())
.addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec())
.addCodec(VolumeList.class, new VolumeListCodec())
.addCodec(OmMultipartKeyInfo.class, new OmMultipartKeyInfoCodec())
.build();
userTable =
@ -219,6 +230,10 @@ public void start(OzoneConfiguration configuration) throws IOException {
s3Table = this.store.getTable(S3_TABLE);
checkTableStatus(s3Table, S3_TABLE);
multipartInfoTable = this.store.getTable(MULTIPARTINFO_TABLE,
String.class, OmMultipartKeyInfo.class);
checkTableStatus(multipartInfoTable, MULTIPARTINFO_TABLE);
}
}
@ -301,6 +316,15 @@ public String getOpenKey(String volume, String bucket,
return openKey;
}
@Override
public String getMultipartKey(String volume, String bucket, String key,
String
uploadId) {
String multipartKey = OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket +
OM_KEY_PREFIX + key + OM_KEY_PREFIX + uploadId;
return multipartKey;
}
/**
* Returns the OzoneManagerLock used on Metadata DB.
*

View File

@ -61,6 +61,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@ -1553,6 +1554,26 @@ public List<OmBucketInfo> listS3Buckets(String userName, String startKey,
}
}
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
IOException {
OmMultipartInfo multipartInfo;
metrics.incNumInitiateMultipartUploads();
try {
multipartInfo = keyManager.initiateMultipartUpload(keyArgs);
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
OMAction.INITIATE_MULTIPART_UPLOAD, (keyArgs == null) ? null :
keyArgs.toAuditMap()));
} catch (IOException ex) {
AUDIT.logWriteFailure(buildAuditMessageForFailure(
OMAction.INITIATE_MULTIPART_UPLOAD,
(keyArgs == null) ? null : keyArgs.toAuditMap(), ex));
metrics.incNumInitiateMultipartUploadFails();
throw ex;
}
return multipartInfo;
}

View File

@ -115,6 +115,7 @@ public enum ResultCodes {
SCM_VERSION_MISMATCH_ERROR,
SCM_IN_CHILL_MODE,
S3_BUCKET_ALREADY_EXISTS,
S3_BUCKET_NOT_FOUND
S3_BUCKET_NOT_FOUND,
INITIATE_MULTIPART_UPLOAD_FAILED;
}
}

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@ -85,6 +86,10 @@
.LocateKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.LocateKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.RenameKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -190,6 +195,8 @@ private Status exceptionToResponseStatus(IOException ex) {
return Status.S3_BUCKET_ALREADY_EXISTS;
case S3_BUCKET_NOT_FOUND:
return Status.S3_BUCKET_NOT_FOUND;
case INITIATE_MULTIPART_UPLOAD_FAILED:
return Status.INITIATE_MULTIPART_UPLOAD_ERROR;
default:
return Status.INTERNAL_ERROR;
}
@ -651,4 +658,30 @@ public S3ListBucketsResponse listS3Buckets(RpcController controller,
}
return resp.build();
}
@Override
public MultipartInfoInitiateResponse initiateMultiPartUpload(
RpcController controller, MultipartInfoInitiateRequest request) {
MultipartInfoInitiateResponse.Builder resp = MultipartInfoInitiateResponse
.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setType(keyArgs.getType())
.setFactor(keyArgs.getFactor())
.build();
OmMultipartInfo multipartInfo = impl.initiateMultipartUpload(omKeyArgs);
resp.setVolumeName(multipartInfo.getVolumeName());
resp.setBucketName(multipartInfo.getBucketName());
resp.setKeyName(multipartInfo.getKeyName());
resp.setMultipartUploadID(multipartInfo.getUploadID());
resp.setStatus(Status.OK);
} catch (IOException ex) {
resp.setStatus(exceptionToResponseStatus(ex));
}
return resp.build();
}
}