diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java index 280461a1c4..be879d8388 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java @@ -210,6 +210,8 @@ public static boolean isReadOnly( case GetDelegationToken: case RenewDelegationToken: case CancelDelegationToken: + case ApplyCreateKey: + case ApplyInitiateMultiPartUpload: return false; default: LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java index 3863a52fc6..0cbab08799 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java @@ -26,6 +26,7 @@ public enum OMAction implements AuditAction { ALLOCATE_BLOCK, ADD_ALLOCATE_BLOCK, ALLOCATE_KEY, + APPLY_ALLOCATE_KEY, COMMIT_KEY, CREATE_VOLUME, CREATE_BUCKET, diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java index 34980f66c1..b2f805acd3 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java @@ -23,6 +23,8 @@ * Exception thrown by Ozone Manager. 
*/ public class OMException extends IOException { + + public static final String STATUS_CODE = "STATUS_CODE="; private final OMException.ResultCodes result; /** diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java index 7390fe2cb6..8357df22e0 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java @@ -20,6 +20,11 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .KeyLocation; @@ -52,4 +57,29 @@ OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, KeyLocation keyLocation) throws IOException; + /** + * Add the openKey entry with given keyInfo and clientID in to openKeyTable. + * This will be called only from applyTransaction, once after calling + * applyKey in startTransaction. + * + * @param omKeyArgs + * @param keyInfo + * @param clientID + * @throws IOException + */ + void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID) + throws IOException; + + /** + * Initiate multipart upload for the specified key. + * + * This will be called only from applyTransaction. 
+ * @param omKeyArgs + * @param multipartUploadID + * @return OmMultipartInfo + * @throws IOException + */ + OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs, + String multipartUploadID) throws IOException; + } diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 674edbb7ce..eac8d2a3bf 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -60,6 +60,7 @@ enum Type { ListKeys = 35; CommitKey = 36; AllocateBlock = 37; + ApplyCreateKey = 38; CreateS3Bucket = 41; DeleteS3Bucket = 42; @@ -74,6 +75,8 @@ enum Type { ServiceList = 51; + ApplyInitiateMultiPartUpload = 52; + GetDelegationToken = 61; RenewDelegationToken = 62; CancelDelegationToken = 63; @@ -110,6 +113,8 @@ message OMRequest { optional ListKeysRequest listKeysRequest = 35; optional CommitKeyRequest commitKeyRequest = 36; optional AllocateBlockRequest allocateBlockRequest = 37; + optional ApplyCreateKeyRequest applyCreateKeyRequest = 38; + optional S3CreateBucketRequest createS3BucketRequest = 41; optional S3DeleteBucketRequest deleteS3BucketRequest = 42; @@ -123,6 +128,7 @@ message OMRequest { optional MultipartUploadListPartsRequest listMultipartUploadPartsRequest = 50; optional ServiceListRequest serviceListRequest = 51; + optional MultipartInfoApplyInitiateRequest initiateMultiPartUploadApplyRequest = 52; optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61; optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62; @@ -555,6 +561,11 @@ message CreateKeyResponse { optional uint64 openVersion = 4; } +message ApplyCreateKeyRequest { + required CreateKeyRequest createKeyRequest = 1; + required CreateKeyResponse createKeyResponse = 2; +} + message LookupKeyRequest { required KeyArgs keyArgs = 1; } @@ -722,6 +733,11 @@ message MultipartInfoInitiateRequest 
{ required KeyArgs keyArgs = 1; } +message MultipartInfoApplyInitiateRequest { + required KeyArgs keyArgs = 1; + required string multipartUploadID = 2; +} + message MultipartInfoInitiateResponse { required string volumeName = 1; required string bucketName = 2; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index 86a83b78c8..f565ad0961 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -18,14 +18,21 @@ import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdfs.LogVerificationAppender; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; +import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.OzoneVolume; @@ -42,7 +49,9 @@ import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import static 
org.apache.hadoop.ozone.MiniOzoneHAClusterImpl @@ -120,6 +129,7 @@ public void shutdown() { @Test public void testAllOMNodesRunning() throws Exception { createVolumeTest(true); + createKeyTest(true); } /** @@ -131,6 +141,8 @@ public void testOneOMNodeDown() throws Exception { Thread.sleep(NODE_FAILURE_TIMEOUT * 2); createVolumeTest(true); + + createKeyTest(true); } /** @@ -143,8 +155,181 @@ public void testTwoOMNodesDown() throws Exception { Thread.sleep(NODE_FAILURE_TIMEOUT * 2); createVolumeTest(false); + + createKeyTest(false); + } + private OzoneBucket setupBucket() throws Exception { + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() + .setOwner(userName) + .setAdmin(adminName) + .build(); + + objectStore.createVolume(volumeName, createVolumeArgs); + OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); + + Assert.assertTrue(retVolumeinfo.getName().equals(volumeName)); + Assert.assertTrue(retVolumeinfo.getOwner().equals(userName)); + Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName)); + + String bucketName = UUID.randomUUID().toString(); + retVolumeinfo.createBucket(bucketName); + + OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName); + + Assert.assertTrue(ozoneBucket.getName().equals(bucketName)); + Assert.assertTrue(ozoneBucket.getVolumeName().equals(volumeName)); + + return ozoneBucket; + } + + @Test + public void testMultipartUpload() throws Exception { + + // Happy scenario when all OM's are up. 
+ OzoneBucket ozoneBucket = setupBucket(); + + String keyName = UUID.randomUUID().toString(); + String uploadID = initiateMultipartUpload(ozoneBucket, keyName); + + createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID); + + } + + @Test + public void testMultipartUploadWithOneOmNodeDown() throws Exception { + + OzoneBucket ozoneBucket = setupBucket(); + + String keyName = UUID.randomUUID().toString(); + String uploadID = initiateMultipartUpload(ozoneBucket, keyName); + + // After initiate multipartupload, shutdown leader OM. + // Stop leader OM, to see when the OM leader changes + // multipart upload is happening successfully or not. + + OMFailoverProxyProvider omFailoverProxyProvider = + objectStore.getClientProxy().getOMProxyProvider(); + + // The OMFailoverProxyProvider will point to the current leader OM node. + String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); + + // Stop one of the ozone manager, to see when the OM leader changes + // multipart upload is happening successfully or not. 
+ cluster.stopOzoneManager(leaderOMNodeId); + + + createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID); + + String newLeaderOMNodeId = + omFailoverProxyProvider.getCurrentProxyOMNodeId(); + + Assert.assertTrue(leaderOMNodeId != newLeaderOMNodeId); + } + + + private String initiateMultipartUpload(OzoneBucket ozoneBucket, + String keyName) throws Exception { + + OmMultipartInfo omMultipartInfo = + ozoneBucket.initiateMultipartUpload(keyName, + ReplicationType.RATIS, + ReplicationFactor.ONE); + + String uploadID = omMultipartInfo.getUploadID(); + Assert.assertTrue(uploadID != null); + return uploadID; + } + + private void createMultipartKeyAndReadKey(OzoneBucket ozoneBucket, + String keyName, String uploadID) throws Exception { + + String value = "random data"; + OzoneOutputStream ozoneOutputStream = ozoneBucket.createMultipartKey( + keyName, value.length(), 1, uploadID); + ozoneOutputStream.write(value.getBytes(), 0, value.length()); + ozoneOutputStream.close(); + + + Map partsMap = new HashMap<>(); + partsMap.put(1, ozoneOutputStream.getCommitUploadPartInfo().getPartName()); + OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = + ozoneBucket.completeMultipartUpload(keyName, uploadID, partsMap); + + Assert.assertTrue(omMultipartUploadCompleteInfo != null); + Assert.assertTrue(omMultipartUploadCompleteInfo.getHash() != null); + + + OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName); + + byte[] fileContent = new byte[value.getBytes().length]; + ozoneInputStream.read(fileContent); + Assert.assertEquals(value, new String(fileContent)); + } + + + private void createKeyTest(boolean checkSuccess) throws Exception { + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() + .setOwner(userName) + .setAdmin(adminName) + .build(); + + try { + 
objectStore.createVolume(volumeName, createVolumeArgs); + + OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); + + Assert.assertTrue(retVolumeinfo.getName().equals(volumeName)); + Assert.assertTrue(retVolumeinfo.getOwner().equals(userName)); + Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName)); + + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + retVolumeinfo.createBucket(bucketName); + + OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName); + + Assert.assertTrue(ozoneBucket.getName().equals(bucketName)); + Assert.assertTrue(ozoneBucket.getVolumeName().equals(volumeName)); + + String value = "random data"; + OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName, + value.length(), ReplicationType.STAND_ALONE, + ReplicationFactor.ONE, new HashMap<>()); + ozoneOutputStream.write(value.getBytes(), 0, value.length()); + ozoneOutputStream.close(); + + OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName); + + byte[] fileContent = new byte[value.getBytes().length]; + ozoneInputStream.read(fileContent); + Assert.assertEquals(value, new String(fileContent)); + + } catch (ConnectException | RemoteException e) { + if (!checkSuccess) { + // If the last OM to be tried by the RetryProxy is down, we would get + // ConnectException. Otherwise, we would get a RemoteException from the + // last running OM as it would fail to get a quorum. + if (e instanceof RemoteException) { + GenericTestUtils.assertExceptionContains( + "RaftRetryFailureException", e); + } + } else { + throw e; + } + } + + + } /** * Create a volume and test its attribute. */ @@ -186,6 +371,8 @@ private void createVolumeTest(boolean checkSuccess) throws Exception { } } + + /** * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the * cluster. 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index e5cf20061d..0006e93fa9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -29,7 +29,12 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.fs.OzoneManagerFS; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyLocation; import org.apache.hadoop.utils.BackgroundService; import java.io.IOException; @@ -89,7 +94,7 @@ OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID, * @throws IOException */ OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, - OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException; + KeyLocation keyLocation) throws IOException; /** * Given the args of a key to put, write an open key entry to meta data. @@ -104,6 +109,19 @@ OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, */ OpenKeySession openKey(OmKeyArgs args) throws IOException; + /** + * Add the openKey entry with given keyInfo and clientID in to openKeyTable. + * This will be called only from applyTransaction, once after calling + * applyKey in startTransaction. + * + * @param omKeyArgs + * @param keyInfo + * @param clientID + * @throws IOException + */ + void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID) + throws IOException; + /** * Look up an existing key. 
Return the info of the key to client side, which * DistributedStorageHandler will use to access the data on datanode. @@ -213,6 +231,17 @@ List listKeys(String volumeName, */ OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException; + /** + * Initiate multipart upload for the specified key. + * + * @param keyArgs + * @param multipartUploadID + * @return MultipartInfo + * @throws IOException + */ + OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs, + String multipartUploadID) throws IOException; + /** * Commit Multipart upload part file. * @param omKeyArgs diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 591bb00de2..83a60c0289 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -65,7 +65,12 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts; import org.apache.hadoop.ozone.om.helpers.OmPartInfo; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyLocation; import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -233,7 +238,7 @@ private void validateS3Bucket(String volumeName, String bucketName) @Override public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, - OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException { + KeyLocation keyLocation) 
throws IOException { Preconditions.checkNotNull(args); Preconditions.checkNotNull(keyLocation); @@ -518,10 +523,49 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { allocateBlock(keyInfo, new ExcludeList(), args.getDataSize()); keyInfo.appendNewBlocks(locationInfos); } - metadataManager.getOpenKeyTable().put(openKey, keyInfo); + + // When OM is not managed via ratis we should write in to Om db in + // openKey call. + if (!isRatisEnabled) { + metadataManager.getOpenKeyTable().put(openKey, keyInfo); + } return new OpenKeySession(currentTime, keyInfo, openVersion); } + public void applyOpenKey(KeyArgs omKeyArgs, + KeyInfo keyInfo, long clientID) throws IOException { + Preconditions.checkNotNull(omKeyArgs); + String volumeName = omKeyArgs.getVolumeName(); + String bucketName = omKeyArgs.getBucketName(); + + // Do we need to call again validateBucket, as this is just called after + // start Transaction from applyTransaction. Can we remove this double + // check? + validateBucket(volumeName, bucketName); + + metadataManager.getLock().acquireBucketLock(volumeName, bucketName); + String keyName = omKeyArgs.getKeyName(); + + // TODO: here if on OM machines clocks are skewed and there is a chance + // for override of the openKey entries. + try { + String openKey = metadataManager.getOpenKey( + volumeName, bucketName, keyName, clientID); + + OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo); + + metadataManager.getOpenKeyTable().put(openKey, + omKeyInfo); + } catch (IOException ex) { + LOG.error("Apply Open Key failed for volume:{} bucket:{} key:{}", + volumeName, bucketName, keyName, ex); + throw new OMException(ex.getMessage(), + ResultCodes.KEY_ALLOCATION_ERROR); + } finally { + metadataManager.getLock().releaseBucketLock(volumeName, bucketName); + } + } + /** * Create OmKeyInfo object. 
* @param keyArgs @@ -826,17 +870,22 @@ public BackgroundService getDeletingService() { @Override public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws IOException { - Preconditions.checkNotNull(omKeyArgs); - String volumeName = omKeyArgs.getVolumeName(); - String bucketName = omKeyArgs.getBucketName(); - String keyName = omKeyArgs.getKeyName(); + long time = Time.monotonicNowNanos(); + String uploadID = UUID.randomUUID().toString() + "-" + time; + return applyInitiateMultipartUpload(omKeyArgs, uploadID); + } + + public OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs, + String multipartUploadID) throws IOException { + Preconditions.checkNotNull(keyArgs); + Preconditions.checkNotNull(multipartUploadID); + String volumeName = keyArgs.getVolumeName(); + String bucketName = keyArgs.getBucketName(); + String keyName = keyArgs.getKeyName(); metadataManager.getLock().acquireBucketLock(volumeName, bucketName); validateS3Bucket(volumeName, bucketName); try { - long time = Time.monotonicNowNanos(); - String uploadID = UUID.randomUUID().toString() + "-" + Long.toString( - time); // We are adding uploadId to key, because if multiple users try to // perform multipart upload on the same key, each will try to upload, who @@ -852,24 +901,24 @@ public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws // new uploadId is returned. String multipartKey = metadataManager.getMultipartKey(volumeName, - bucketName, keyName, uploadID); + bucketName, keyName, multipartUploadID); // Not checking if there is an already key for this in the keyTable, as // during final complete multipart upload we take care of this. 
Map partKeyInfoMap = new HashMap<>(); - OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(uploadID, - partKeyInfoMap); + OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo( + multipartUploadID, partKeyInfoMap); List locations = new ArrayList<>(); OmKeyInfo omKeyInfo = new OmKeyInfo.Builder() - .setVolumeName(omKeyArgs.getVolumeName()) - .setBucketName(omKeyArgs.getBucketName()) - .setKeyName(omKeyArgs.getKeyName()) + .setVolumeName(keyArgs.getVolumeName()) + .setBucketName(keyArgs.getBucketName()) + .setKeyName(keyArgs.getKeyName()) .setCreationTime(Time.now()) .setModificationTime(Time.now()) - .setReplicationType(omKeyArgs.getType()) - .setReplicationFactor(omKeyArgs.getFactor()) + .setReplicationType(keyArgs.getType()) + .setReplicationFactor(keyArgs.getFactor()) .setOmKeyLocationInfos(Collections.singletonList( new OmKeyLocationInfoGroup(0, locations))) .build(); @@ -882,11 +931,12 @@ public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws metadataManager.getOpenKeyTable().putWithBatch(batch, multipartKey, omKeyInfo); store.commitBatchOperation(batch); - return new OmMultipartInfo(volumeName, bucketName, keyName, uploadID); + return new OmMultipartInfo(volumeName, bucketName, keyName, + multipartUploadID); } } catch (IOException ex) { LOG.error("Initiate Multipart upload Failed for volume:{} bucket:{} " + - "key:{}", volumeName, bucketName, keyName, ex); + "key:{}", volumeName, bucketName, keyName, ex); throw new OMException(ex.getMessage(), ResultCodes.INITIATE_MULTIPART_UPLOAD_ERROR); } finally { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 8f0781f8a7..9fa297d1f9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -72,7 +72,12 @@ 
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .KeyLocation; import org.apache.hadoop.ozone.security.OzoneSecurityException; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -1985,6 +1990,51 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { } } + @Override + public void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID) + throws IOException { + // Do we need to check again Acl's for apply OpenKey call? + if(isAclEnabled) { + checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ, + omKeyArgs.getVolumeName(), omKeyArgs.getBucketName(), + omKeyArgs.getKeyName()); + } + boolean auditSuccess = true; + try { + keyManager.applyOpenKey(omKeyArgs, keyInfo, clientID); + } catch (Exception ex) { + metrics.incNumKeyAllocateFails(); + auditSuccess = false; + AUDIT.logWriteFailure(buildAuditMessageForFailure( + OMAction.APPLY_ALLOCATE_KEY, + (omKeyArgs == null) ? null : toAuditMap(omKeyArgs), ex)); + throw ex; + } finally { + if(auditSuccess){ + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + OMAction.APPLY_ALLOCATE_KEY, (omKeyArgs == null) ? 
null : + toAuditMap(omKeyArgs))); + } + } + } + + private Map toAuditMap(KeyArgs omKeyArgs) { + Map auditMap = new LinkedHashMap<>(); + auditMap.put(OzoneConsts.VOLUME, omKeyArgs.getVolumeName()); + auditMap.put(OzoneConsts.BUCKET, omKeyArgs.getBucketName()); + auditMap.put(OzoneConsts.KEY, omKeyArgs.getKeyName()); + auditMap.put(OzoneConsts.DATA_SIZE, + String.valueOf(omKeyArgs.getDataSize())); + auditMap.put(OzoneConsts.REPLICATION_TYPE, + omKeyArgs.hasType() ? omKeyArgs.getType().name() : null); + auditMap.put(OzoneConsts.REPLICATION_FACTOR, + omKeyArgs.hasFactor() ? omKeyArgs.getFactor().name() : null); + auditMap.put(OzoneConsts.KEY_LOCATION_INFO, + (omKeyArgs.getKeyLocationsList() != null) ? + omKeyArgs.getKeyLocationsList().toString() : null); + return auditMap; + } + @Override public void commitKey(OmKeyArgs args, long clientID) throws IOException { @@ -2474,6 +2524,28 @@ public List listS3Buckets(String userName, String startKey, } } + + @Override + public OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs keyArgs, + String multipartUploadID) throws IOException { + OmMultipartInfo multipartInfo; + metrics.incNumInitiateMultipartUploads(); + try { + multipartInfo = keyManager.applyInitiateMultipartUpload(keyArgs, + multipartUploadID); + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + OMAction.INITIATE_MULTIPART_UPLOAD, (keyArgs == null) ? null : + keyArgs.toAuditMap())); + } catch (IOException ex) { + AUDIT.logWriteFailure(buildAuditMessageForFailure( + OMAction.INITIATE_MULTIPART_UPLOAD, + (keyArgs == null) ? 
null : keyArgs.toAuditMap(), ex)); + metrics.incNumInitiateMultipartUploadFails(); + throw ex; + } + return multipartInfo; + } + @Override public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java index c9c48a4422..cd99cd1fab 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java @@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -40,6 +41,7 @@ import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftRetryFailureException; +import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.retry.RetryPolicies; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.RpcType; @@ -49,6 +51,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE; + /** * OM Ratis client to interact with OM Ratis server endpoint. 
*/ @@ -128,10 +132,31 @@ public OMResponse sendCommand(OMRequest request) throws ServiceException { CompletableFuture reply = sendCommandAsync(request); return reply.get(); } catch (ExecutionException | InterruptedException e) { + if (e.getCause() instanceof StateMachineException) { + OMResponse.Builder omResponse = OMResponse.newBuilder(); + omResponse.setCmdType(request.getCmdType()); + omResponse.setSuccess(false); + omResponse.setMessage(e.getCause().getMessage()); + omResponse.setStatus(parseErrorStatus(e.getCause().getMessage())); + return omResponse.build(); + } throw new ServiceException(e); } } + private OzoneManagerProtocolProtos.Status parseErrorStatus(String message) { + if (message.contains(STATUS_CODE)) { + String errorCode = message.substring(message.indexOf(STATUS_CODE) + + STATUS_CODE.length()); + LOG.debug("Parsing error message for error code " + + errorCode); + return OzoneManagerProtocolProtos.Status.valueOf(errorCode.trim()); + } else { + return OzoneManagerProtocolProtos.Status.INTERNAL_ERROR; + } + + } + /** * Sends a given command to server gets a waitable future back. 
* diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 590b359d16..420ffb5a79 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -22,6 +22,7 @@ import com.google.protobuf.ServiceException; import java.io.IOException; import java.util.Collection; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.ozone.container.common.transport.server.ratis .ContainerStateMachine; @@ -29,12 +30,15 @@ import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartInfoApplyInitiateRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMResponse; import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler; import org.apache.hadoop.ozone.protocolPB.RequestHandler; +import org.apache.hadoop.util.Time; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; @@ -48,6 +52,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE; + /** * The OM StateMachine is the state machine for OM Ratis server. 
It is * responsible for applying ratis committed transactions to @@ -108,11 +114,67 @@ public TransactionContext startTransaction( ctxt.setException(ioe); return ctxt; } + return handleStartTransactionRequests(raftClientRequest, omRequest); - if (omRequest.getCmdType() == - OzoneManagerProtocolProtos.Type.AllocateBlock) { + } + + /** + * Handle the RaftClientRequest and return TransactionContext object. + * @param raftClientRequest + * @param omRequest + * @return TransactionContext + */ + private TransactionContext handleStartTransactionRequests( + RaftClientRequest raftClientRequest, OMRequest omRequest) { + + switch (omRequest.getCmdType()) { + case AllocateBlock: return handleAllocateBlock(raftClientRequest, omRequest); + case CreateKey: + return handleCreateKeyRequest(raftClientRequest, omRequest); + case InitiateMultiPartUpload: + return handleInitiateMultipartUpload(raftClientRequest, omRequest); + default: + return TransactionContext.newBuilder() + .setClientRequest(raftClientRequest) + .setStateMachine(this) + .setServerRole(RaftProtos.RaftPeerRole.LEADER) + .setLogData(raftClientRequest.getMessage().getContent()) + .build(); } + + } + + + private TransactionContext handleInitiateMultipartUpload( + RaftClientRequest raftClientRequest, OMRequest omRequest) { + + // Generate a multipart uploadID, and create a new request. + // When applyTransaction happen's all OM's use the same multipartUploadID + // for the key. 
+ + long time = Time.monotonicNowNanos(); + String multipartUploadID = UUID.randomUUID().toString() + "-" + time; + + MultipartInfoApplyInitiateRequest multipartInfoApplyInitiateRequest = + MultipartInfoApplyInitiateRequest.newBuilder() + .setKeyArgs(omRequest.getInitiateMultiPartUploadRequest() + .getKeyArgs()).setMultipartUploadID(multipartUploadID).build(); + + OMRequest.Builder newOmRequest = + OMRequest.newBuilder().setCmdType( + OzoneManagerProtocolProtos.Type.ApplyInitiateMultiPartUpload) + .setInitiateMultiPartUploadApplyRequest( + multipartInfoApplyInitiateRequest) + .setClientId(omRequest.getClientId()); + + if (omRequest.hasTraceID()) { + newOmRequest.setTraceID(omRequest.getTraceID()); + } + + ByteString messageContent = + ByteString.copyFrom(newOmRequest.build().toByteArray()); + return TransactionContext.newBuilder() .setClientRequest(raftClientRequest) .setStateMachine(this) @@ -121,6 +183,61 @@ public TransactionContext startTransaction( .build(); } + /** + * Handle createKey Request, which needs special handling. This request + * needs to be executed on the leader, and from the response received from + * this request we need to create an ApplyCreateKeyRequest and a + * TransactionContext object. + */ + private TransactionContext handleCreateKeyRequest( + RaftClientRequest raftClientRequest, OMRequest omRequest) { + OMResponse omResponse = handler.handle(omRequest); + + // TODO: if not success should we retry depending on the error if it is + // retriable?
+ if (!omResponse.getSuccess()) { + TransactionContext transactionContext = TransactionContext.newBuilder() + .setClientRequest(raftClientRequest) + .setStateMachine(this) + .setServerRole(RaftProtos.RaftPeerRole.LEADER) + .build(); + transactionContext.setException( + constructExceptionForFailedRequest(omResponse)); + return transactionContext; + } + + // Get original request + OzoneManagerProtocolProtos.CreateKeyRequest createKeyRequest = + omRequest.getCreateKeyRequest(); + + // Create ApplyCreateKey Request. + OzoneManagerProtocolProtos.ApplyCreateKeyRequest applyCreateKeyRequest = + OzoneManagerProtocolProtos.ApplyCreateKeyRequest.newBuilder() + .setCreateKeyRequest(createKeyRequest) + .setCreateKeyResponse(omResponse.getCreateKeyResponse()).build(); + + OMRequest.Builder newOmRequest = + OMRequest.newBuilder().setCmdType( + OzoneManagerProtocolProtos.Type.ApplyCreateKey) + .setApplyCreateKeyRequest(applyCreateKeyRequest) + .setClientId(omRequest.getClientId()); + + if (omRequest.hasTraceID()) { + newOmRequest.setTraceID(omRequest.getTraceID()); + } + + ByteString messageContent = + ByteString.copyFrom(newOmRequest.build().toByteArray()); + + return TransactionContext.newBuilder() + .setClientRequest(raftClientRequest) + .setStateMachine(this) + .setServerRole(RaftProtos.RaftPeerRole.LEADER) + .setLogData(messageContent) + .build(); + } + + + /** + * Handle AllocateBlock Request, which needs special handling.
This * request needs to be executed on the leader, where it connects to SCM and @@ -148,9 +265,8 @@ private TransactionContext handleAllocateBlock( .setStateMachine(this) .setServerRole(RaftProtos.RaftPeerRole.LEADER) .build(); - IOException ioe = new IOException(omResponse.getMessage() + - " Status code " + omResponse.getStatus()); - transactionContext.setException(ioe); + transactionContext.setException( + constructExceptionForFailedRequest(omResponse)); return transactionContext; } @@ -181,6 +297,17 @@ private TransactionContext handleAllocateBlock( } + /** + * Construct IOException message for failed requests in StartTransaction. + * @param omResponse + * @return + */ + private IOException constructExceptionForFailedRequest( + OMResponse omResponse) { + return new IOException(omResponse.getMessage() + " " + + STATUS_CODE + omResponse.getStatus()); + } + /* * Apply a committed log entry to the state machine. */ diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java index 3fccf1b686..7660ed1346 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java @@ -47,6 +47,10 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .ApplyCreateKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .MultipartInfoApplyInitiateRequest; import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelDelegationTokenResponseProto; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessResponse; @@ -205,6 +209,11 @@ public OMResponse handle(OMRequest request) { request.getCreateKeyRequest()); responseBuilder.setCreateKeyResponse(createKeyResponse); break; + case ApplyCreateKey: + CreateKeyResponse applyKeyResponse = + applyCreateKey(request.getApplyCreateKeyRequest()); + responseBuilder.setCreateKeyResponse(applyKeyResponse); + break; case LookupKey: LookupKeyResponse lookupKeyResponse = lookupKey( request.getLookupKeyRequest()); @@ -262,6 +271,13 @@ public OMResponse handle(OMRequest request) { responseBuilder.setInitiateMultiPartUploadResponse( multipartInfoInitiateResponse); break; + case ApplyInitiateMultiPartUpload: + MultipartInfoInitiateResponse response = + applyInitiateMultiPartUpload( + request.getInitiateMultiPartUploadApplyRequest()); + responseBuilder.setInitiateMultiPartUploadResponse( + response); + break; case CommitMultiPartUpload: MultipartCommitUploadPartResponse commitUploadPartResponse = commitMultipartUploadPart( @@ -498,6 +514,20 @@ private CreateKeyResponse createKey(CreateKeyRequest request) return resp.build(); } + private CreateKeyResponse applyCreateKey(ApplyCreateKeyRequest request) + throws IOException { + + CreateKeyRequest createKeyRequest = request.getCreateKeyRequest(); + CreateKeyResponse createKeyResponse = request.getCreateKeyResponse(); + + impl.applyOpenKey(createKeyRequest.getKeyArgs(), + createKeyResponse.getKeyInfo(), createKeyResponse.getID()); + + // If applying to om DB successful just return createKeyResponse. 
+ return createKeyResponse; + + } + private LookupKeyResponse lookupKey(LookupKeyRequest request) throws IOException { LookupKeyResponse.Builder resp = @@ -731,6 +761,30 @@ private MultipartInfoInitiateResponse initiateMultiPartUpload( return resp.build(); } + private MultipartInfoInitiateResponse applyInitiateMultiPartUpload( + MultipartInfoApplyInitiateRequest request) throws IOException { + MultipartInfoInitiateResponse.Builder resp = MultipartInfoInitiateResponse + .newBuilder(); + + KeyArgs keyArgs = request.getKeyArgs(); + OmKeyArgs omKeyArgs = new OmKeyArgs.Builder() + .setVolumeName(keyArgs.getVolumeName()) + .setBucketName(keyArgs.getBucketName()) + .setKeyName(keyArgs.getKeyName()) + .setType(keyArgs.getType()) + .setFactor(keyArgs.getFactor()) + .build(); + OmMultipartInfo multipartInfo = + impl.applyInitiateMultipartUpload(omKeyArgs, + request.getMultipartUploadID()); + resp.setVolumeName(multipartInfo.getVolumeName()); + resp.setBucketName(multipartInfo.getBucketName()); + resp.setKeyName(multipartInfo.getKeyName()); + resp.setMultipartUploadID(multipartInfo.getUploadID()); + + return resp.build(); + } + private MultipartCommitUploadPartResponse commitMultipartUploadPart( MultipartCommitUploadPartRequest request) throws IOException { MultipartCommitUploadPartResponse.Builder resp = diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java index 4406af6722..9613582fb7 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java @@ -251,7 +251,7 @@ public void testAllocateBlockWithFailure() throws Exception{ // As the request failed, check for keyLocation and the transaction // context error message 
Assert.assertFalse(newOmRequest.getAllocateBlockRequest().hasKeyLocation()); - Assert.assertEquals("Scm in Chill mode Status code " + Assert.assertEquals("Scm in Chill mode " + OMException.STATUS_CODE + OMException.ResultCodes.SCM_IN_CHILL_MODE, transactionContext.getException().getMessage()); Assert.assertTrue(transactionContext.getException() instanceof IOException);