HDDS-1379. Convert all OM Volume related operations to HA model. (#689)

This commit is contained in:
Bharat Viswanadham 2019-04-04 21:09:50 -07:00 committed by GitHub
parent 67020f0950
commit 77fe51e136
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 925 additions and 120 deletions

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.helpers;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.VolumeList;
/**
* OM response for delete volume request for a ozone volume.
*/
public class OmDeleteVolumeResponse {
private String volume;
private String owner;
private VolumeList updatedVolumeList;
public OmDeleteVolumeResponse(String volume, String owner,
VolumeList updatedVolumeList) {
this.volume = volume;
this.owner = owner;
this.updatedVolumeList = updatedVolumeList;
}
public String getVolume() {
return volume;
}
public String getOwner() {
return owner;
}
public VolumeList getUpdatedVolumeList() {
return updatedVolumeList;
}
}

View File

@ -36,10 +36,10 @@
*/ */
public final class OmVolumeArgs extends WithMetadata implements Auditable { public final class OmVolumeArgs extends WithMetadata implements Auditable {
private final String adminName; private final String adminName;
private final String ownerName; private String ownerName;
private final String volume; private final String volume;
private final long creationTime; private long creationTime;
private final long quotaInBytes; private long quotaInBytes;
private final OmOzoneAclMap aclMap; private final OmOzoneAclMap aclMap;
/** /**
@ -64,6 +64,19 @@ private OmVolumeArgs(String adminName, String ownerName, String volume,
this.creationTime = creationTime; this.creationTime = creationTime;
} }
public void setOwnerName(String newOwner) {
this.ownerName = newOwner;
}
public void setQuotaInBytes(long quotaInBytes) {
this.quotaInBytes = quotaInBytes;
}
public void setCreationTime(long time) {
this.creationTime = time;
}
/** /**
* Returns the Admin Name. * Returns the Admin Name.
* @return String. * @return String.

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om.helpers;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.VolumeList;
/**
* OM response for owner change request for a ozone volume.
*/
public class OmVolumeOwnerChangeResponse {
private VolumeList originalOwnerVolumeList;
private VolumeList newOwnerVolumeList;
private OmVolumeArgs newOwnerVolumeArgs;
private String originalOwner;
public OmVolumeOwnerChangeResponse(VolumeList originalOwnerVolumeList,
VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs,
String originalOwner) {
this.originalOwnerVolumeList = originalOwnerVolumeList;
this.newOwnerVolumeList = newOwnerVolumeList;
this.newOwnerVolumeArgs = newOwnerVolumeArgs;
this.originalOwner = originalOwner;
}
public String getOriginalOwner() {
return originalOwner;
}
public VolumeList getOriginalOwnerVolumeList() {
return originalOwnerVolumeList;
}
public VolumeList getNewOwnerVolumeList() {
return newOwnerVolumeList;
}
public OmVolumeArgs getNewOwnerVolumeArgs() {
return newOwnerVolumeArgs;
}
}

View File

@ -18,15 +18,20 @@
package org.apache.hadoop.ozone.om.protocol; package org.apache.hadoop.ozone.om.protocol;
import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs; .KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyInfo; .KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation; .KeyLocation;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.VolumeList;
import java.io.IOException; import java.io.IOException;
@ -88,4 +93,77 @@ void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
*/ */
OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs, OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs,
String multipartUploadID) throws IOException; String multipartUploadID) throws IOException;
/**
* Start Create Volume Transaction.
* @param omVolumeArgs
* @return VolumeList
* @throws IOException
*/
VolumeList startCreateVolume(OmVolumeArgs omVolumeArgs) throws IOException;
/**
* Apply Create Volume changes to OM DB.
* @param omVolumeArgs
* @param volumeList
* @throws IOException
*/
void applyCreateVolume(OmVolumeArgs omVolumeArgs,
VolumeList volumeList) throws IOException;
/**
* Start setOwner Transaction.
* @param volume
* @param owner
* @return OmVolumeOwnerChangeResponse
* @throws IOException
*/
OmVolumeOwnerChangeResponse startSetOwner(String volume,
String owner) throws IOException;
/**
* Apply Set Quota changes to OM DB.
* @param oldOwner
* @param oldOwnerVolumeList
* @param newOwnerVolumeList
* @param newOwnerVolumeArgs
* @throws IOException
*/
void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
throws IOException;
/**
* Start Set Quota Transaction.
* @param volume
* @param quota
* @return OmVolumeArgs
* @throws IOException
*/
OmVolumeArgs startSetQuota(String volume, long quota) throws IOException;
/**
* Apply Set Quota Changes to OM DB.
* @param omVolumeArgs
* @throws IOException
*/
void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException;
/**
* Start Delete Volume Transaction.
* @param volume
* @return OmDeleteVolumeResponse
* @throws IOException
*/
OmDeleteVolumeResponse startDeleteVolume(String volume) throws IOException;
/**
* Apply Delete Volume changes to OM DB.
* @param volume
* @param owner
* @param newVolumeList
* @throws IOException
*/
void applyDeleteVolume(String volume, String owner,
VolumeList newVolumeList) throws IOException;
} }

View File

@ -273,6 +273,7 @@ message VolumeInfo {
*/ */
message CreateVolumeRequest { message CreateVolumeRequest {
required VolumeInfo volumeInfo = 1; required VolumeInfo volumeInfo = 1;
optional VolumeList volumeList = 2;
} }
message CreateVolumeResponse { message CreateVolumeResponse {
@ -290,6 +291,10 @@ message SetVolumePropertyRequest {
required string volumeName = 1; required string volumeName = 1;
optional string ownerName = 2; optional string ownerName = 2;
optional uint64 quotaInBytes = 3; optional uint64 quotaInBytes = 3;
optional string originalOwner = 4;
optional VolumeList oldOwnerVolumeList = 5;
optional VolumeList newOwnerVolumeList = 6;
optional VolumeInfo volumeInfo = 7;
} }
message SetVolumePropertyResponse { message SetVolumePropertyResponse {
@ -326,6 +331,8 @@ message InfoVolumeResponse {
*/ */
message DeleteVolumeRequest { message DeleteVolumeRequest {
required string volumeName = 1; required string volumeName = 1;
optional string owner = 2;
optional VolumeList volumeList = 3;
} }
message DeleteVolumeResponse { message DeleteVolumeResponse {

View File

@ -87,11 +87,11 @@ public void testVolumeOps() throws IOException {
ozoneManager, "volumeManager"); ozoneManager, "volumeManager");
VolumeManager mockVm = Mockito.spy(volumeManager); VolumeManager mockVm = Mockito.spy(volumeManager);
Mockito.doNothing().when(mockVm).createVolume(null); Mockito.doReturn(null).when(mockVm).createVolume(null);
Mockito.doNothing().when(mockVm).deleteVolume(null); Mockito.doReturn(null).when(mockVm).deleteVolume(null);
Mockito.doReturn(null).when(mockVm).getVolumeInfo(null); Mockito.doReturn(null).when(mockVm).getVolumeInfo(null);
Mockito.doReturn(true).when(mockVm).checkVolumeAccess(null, null); Mockito.doReturn(true).when(mockVm).checkVolumeAccess(null, null);
Mockito.doNothing().when(mockVm).setOwner(null, null); Mockito.doReturn(null).when(mockVm).setOwner(null, null);
Mockito.doReturn(null).when(mockVm).listVolumes(null, null, null, 0); Mockito.doReturn(null).when(mockVm).listVolumes(null, null, null, 0);
HddsWhiteboxTestUtils.setInternalState( HddsWhiteboxTestUtils.setInternalState(

View File

@ -24,11 +24,13 @@
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.OzoneTestUtils;
import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@ -123,6 +125,33 @@ public void shutdown() {
} }
} }
@Test
public void testAllVolumeOperations() throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner(userName)
.setAdmin(adminName)
.build();
objectStore.createVolume(volumeName, createVolumeArgs);
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
objectStore.deleteVolume(volumeName);
OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND,
() -> objectStore.getVolume(volumeName));
OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND,
() -> objectStore.deleteVolume(volumeName));
}
/** /**
* Test a client request when all OM nodes are running. The request should * Test a client request when all OM nodes are running. The request should
* succeed. * succeed.

View File

@ -71,6 +71,8 @@
import org.apache.hadoop.ozone.OzoneIllegalArgumentException; import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@ -79,6 +81,8 @@
.KeyInfo; .KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyLocation; .KeyLocation;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.VolumeList;
import org.apache.hadoop.ozone.security.OzoneSecurityException; import org.apache.hadoop.ozone.security.OzoneSecurityException;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -1630,6 +1634,79 @@ public void createVolume(OmVolumeArgs args) throws IOException {
} }
} }
@Override
public VolumeList startCreateVolume(OmVolumeArgs args) throws IOException {
// TODO: Need to add metrics and Audit log for HA requests
if(isAclEnabled) {
checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.CREATE,
args.getVolume(), null, null);
}
VolumeList volumeList = volumeManager.createVolume(args);
return volumeList;
}
public void applyCreateVolume(OmVolumeArgs omVolumeArgs,
VolumeList volumeList) throws IOException {
// TODO: Need to add metrics and Audit log for HA requests
volumeManager.applyCreateVolume(omVolumeArgs, volumeList);
}
@Override
public OmVolumeOwnerChangeResponse startSetOwner(String volume,
String owner) throws IOException {
// TODO: Need to add metrics and Audit log for HA requests
if (isAclEnabled) {
checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.WRITE_ACL, volume,
null, null);
}
return volumeManager.setOwner(volume, owner);
}
@Override
public void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
throws IOException {
// TODO: Need to add metrics and Audit log for HA requests
volumeManager.applySetOwner(oldOwner, oldOwnerVolumeList,
newOwnerVolumeList, newOwnerVolumeArgs);
}
@Override
public OmVolumeArgs startSetQuota(String volume, long quota)
throws IOException {
// TODO: Need to add metrics and Audit log for HA requests
if (isAclEnabled) {
checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.WRITE_ACL, volume,
null, null);
}
return volumeManager.setQuota(volume, quota);
}
@Override
public void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException {
// TODO: Need to add metrics and Audit log for HA requests
volumeManager.applySetQuota(omVolumeArgs);
}
@Override
public OmDeleteVolumeResponse startDeleteVolume(String volume)
throws IOException {
if(isAclEnabled) {
checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.DELETE, volume,
null, null);
}
// TODO: Need to add metrics and Audit log for HA requests
return volumeManager.deleteVolume(volume);
}
@Override
public void applyDeleteVolume(String volume, String owner,
VolumeList newVolumeList) throws IOException {
// TODO: Need to add metrics and Audit log for HA requests
volumeManager.applyDeleteVolume(volume, owner, newVolumeList);
}
/** /**
* Checks if current caller has acl permissions. * Checks if current caller has acl permissions.
* *

View File

@ -50,6 +50,7 @@ public class S3BucketManagerImpl implements S3BucketManager {
private final OMMetadataManager omMetadataManager; private final OMMetadataManager omMetadataManager;
private final VolumeManager volumeManager; private final VolumeManager volumeManager;
private final BucketManager bucketManager; private final BucketManager bucketManager;
private final boolean isRatisEnabled;
/** /**
* Construct an S3 Bucket Manager Object. * Construct an S3 Bucket Manager Object.
@ -66,6 +67,9 @@ public S3BucketManagerImpl(
this.omMetadataManager = omMetadataManager; this.omMetadataManager = omMetadataManager;
this.volumeManager = volumeManager; this.volumeManager = volumeManager;
this.bucketManager = bucketManager; this.bucketManager = bucketManager;
isRatisEnabled = configuration.getBoolean(
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
} }
@Override @Override
@ -166,7 +170,12 @@ public boolean createOzoneVolumeIfNeeded(String userName)
.setVolume(ozoneVolumeName) .setVolume(ozoneVolumeName)
.setQuotaInBytes(OzoneConsts.MAX_QUOTA_IN_BYTES) .setQuotaInBytes(OzoneConsts.MAX_QUOTA_IN_BYTES)
.build(); .build();
volumeManager.createVolume(args); if (isRatisEnabled) {
// When ratis is enabled we need to call apply also.
volumeManager.applyCreateVolume(args, volumeManager.createVolume(args));
} else {
volumeManager.createVolume(args);
}
} catch (OMException exp) { } catch (OMException exp) {
newVolumeCreate = false; newVolumeCreate = false;
if (exp.getResult().compareTo(VOLUME_ALREADY_EXISTS) == 0) { if (exp.getResult().compareTo(VOLUME_ALREADY_EXISTS) == 0) {

View File

@ -16,9 +16,13 @@
*/ */
package org.apache.hadoop.ozone.om; package org.apache.hadoop.ozone.om;
import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.OzoneAclInfo; .OzoneManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.VolumeList;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
@ -32,7 +36,17 @@ public interface VolumeManager {
* Create a new volume. * Create a new volume.
* @param args - Volume args to create a volume * @param args - Volume args to create a volume
*/ */
void createVolume(OmVolumeArgs args) throws IOException; VolumeList createVolume(OmVolumeArgs args)
throws IOException;
/**
* Apply Create Volume changes to OM DB.
* @param omVolumeArgs
* @param volumeList
* @throws IOException
*/
void applyCreateVolume(OmVolumeArgs omVolumeArgs,
VolumeList volumeList) throws IOException;
/** /**
* Changes the owner of a volume. * Changes the owner of a volume.
@ -41,7 +55,20 @@ public interface VolumeManager {
* @param owner - Name of the owner. * @param owner - Name of the owner.
* @throws IOException * @throws IOException
*/ */
void setOwner(String volume, String owner) throws IOException; OmVolumeOwnerChangeResponse setOwner(String volume, String owner)
throws IOException;
/**
* Apply Set Owner changes to OM DB.
* @param oldOwner
* @param oldOwnerVolumeList
* @param newOwnerVolumeList
* @param newOwnerVolumeArgs
* @throws IOException
*/
void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
throws IOException;
/** /**
* Changes the Quota on a volume. * Changes the Quota on a volume.
@ -50,7 +77,14 @@ public interface VolumeManager {
* @param quota - Quota in bytes. * @param quota - Quota in bytes.
* @throws IOException * @throws IOException
*/ */
void setQuota(String volume, long quota) throws IOException; OmVolumeArgs setQuota(String volume, long quota) throws IOException;
/**
* Apply Set Quota changes to OM DB.
* @param omVolumeArgs
* @throws IOException
*/
void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException;
/** /**
* Gets the volume information. * Gets the volume information.
@ -66,7 +100,17 @@ public interface VolumeManager {
* @param volume - Name of the volume. * @param volume - Name of the volume.
* @throws IOException * @throws IOException
*/ */
void deleteVolume(String volume) throws IOException; OmDeleteVolumeResponse deleteVolume(String volume) throws IOException;
/**
* Apply Delete Volume changes to OM DB.
* @param volume
* @param owner
* @param newVolumeList
* @throws IOException
*/
void applyDeleteVolume(String volume, String owner,
VolumeList newVolumeList) throws IOException;
/** /**
* Checks if the specified user with a role can access this volume. * Checks if the specified user with a role can access this volume.

View File

@ -22,17 +22,18 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
import org.apache.hadoop.utils.RocksDBStore;
import org.apache.hadoop.utils.db.BatchOperation; import org.apache.hadoop.utils.db.BatchOperation;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -45,6 +46,7 @@ public class VolumeManagerImpl implements VolumeManager {
private final OMMetadataManager metadataManager; private final OMMetadataManager metadataManager;
private final int maxUserVolumeCount; private final int maxUserVolumeCount;
private final boolean isRatisEnabled;
/** /**
* Constructor. * Constructor.
@ -52,15 +54,18 @@ public class VolumeManagerImpl implements VolumeManager {
* @throws IOException * @throws IOException
*/ */
public VolumeManagerImpl(OMMetadataManager metadataManager, public VolumeManagerImpl(OMMetadataManager metadataManager,
OzoneConfiguration conf) throws IOException { OzoneConfiguration conf) {
this.metadataManager = metadataManager; this.metadataManager = metadataManager;
this.maxUserVolumeCount = conf.getInt(OZONE_OM_USER_MAX_VOLUME, this.maxUserVolumeCount = conf.getInt(OZONE_OM_USER_MAX_VOLUME,
OZONE_OM_USER_MAX_VOLUME_DEFAULT); OZONE_OM_USER_MAX_VOLUME_DEFAULT);
isRatisEnabled = conf.getBoolean(
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
} }
// Helpers to add and delete volume from user list // Helpers to add and delete volume from user list
private void addVolumeToOwnerList(String volume, String owner, private VolumeList addVolumeToOwnerList(String volume, String owner)
BatchOperation batchOperation) throws IOException { throws IOException {
// Get the volume list // Get the volume list
String dbUserKey = metadataManager.getUserKey(owner); String dbUserKey = metadataManager.getUserKey(owner);
VolumeList volumeList = metadataManager.getUserTable().get(dbUserKey); VolumeList volumeList = metadataManager.getUserTable().get(dbUserKey);
@ -72,22 +77,22 @@ private void addVolumeToOwnerList(String volume, String owner,
// Check the volume count // Check the volume count
if (prevVolList.size() >= maxUserVolumeCount) { if (prevVolList.size() >= maxUserVolumeCount) {
LOG.debug("Too many volumes for user:{}", owner); LOG.debug("Too many volumes for user:{}", owner);
throw new OMException(ResultCodes.USER_TOO_MANY_VOLUMES); throw new OMException("Too many volumes for user:" + owner,
ResultCodes.USER_TOO_MANY_VOLUMES);
} }
// Add the new volume to the list // Add the new volume to the list
prevVolList.add(volume); prevVolList.add(volume);
VolumeList newVolList = VolumeList.newBuilder() VolumeList newVolList = VolumeList.newBuilder()
.addAllVolumeNames(prevVolList).build(); .addAllVolumeNames(prevVolList).build();
metadataManager.getUserTable().putWithBatch(batchOperation,
dbUserKey, newVolList); return newVolList;
} }
private void delVolumeFromOwnerList(String volume, String owner, private VolumeList delVolumeFromOwnerList(String volume, String owner)
BatchOperation batch) throws RocksDBException, IOException { throws IOException {
// Get the volume list // Get the volume list
String dbUserKey = metadataManager.getUserKey(owner); VolumeList volumeList = metadataManager.getUserTable().get(owner);
VolumeList volumeList = metadataManager.getUserTable().get(dbUserKey);
List<String> prevVolList = new ArrayList<>(); List<String> prevVolList = new ArrayList<>();
if (volumeList != null) { if (volumeList != null) {
prevVolList.addAll(volumeList.getVolumeNamesList()); prevVolList.addAll(volumeList.getVolumeNamesList());
@ -98,58 +103,90 @@ private void delVolumeFromOwnerList(String volume, String owner,
// Remove the volume from the list // Remove the volume from the list
prevVolList.remove(volume); prevVolList.remove(volume);
if (prevVolList.size() == 0) { VolumeList newVolList = VolumeList.newBuilder()
metadataManager.getUserTable().deleteWithBatch(batch, dbUserKey); .addAllVolumeNames(prevVolList).build();
} else { return newVolList;
VolumeList newVolList = VolumeList.newBuilder()
.addAllVolumeNames(prevVolList).build();
metadataManager.getUserTable().putWithBatch(batch,
dbUserKey, newVolList);
}
} }
/** /**
* Creates a volume. * Creates a volume.
* @param args - OmVolumeArgs. * @param omVolumeArgs - OmVolumeArgs.
* @return VolumeList
*/ */
@Override @Override
public void createVolume(OmVolumeArgs args) throws IOException { public VolumeList createVolume(OmVolumeArgs omVolumeArgs) throws IOException {
Preconditions.checkNotNull(args); Preconditions.checkNotNull(omVolumeArgs);
metadataManager.getLock().acquireUserLock(args.getOwnerName()); metadataManager.getLock().acquireUserLock(omVolumeArgs.getOwnerName());
metadataManager.getLock().acquireVolumeLock(args.getVolume()); metadataManager.getLock().acquireVolumeLock(omVolumeArgs.getVolume());
try { try {
String dbVolumeKey = metadataManager.getVolumeKey(args.getVolume()); String dbVolumeKey = metadataManager.getVolumeKey(
omVolumeArgs.getVolume());
String dbUserKey = metadataManager.getUserKey(
omVolumeArgs.getOwnerName());
OmVolumeArgs volumeInfo = OmVolumeArgs volumeInfo =
metadataManager.getVolumeTable().get(dbVolumeKey); metadataManager.getVolumeTable().get(dbVolumeKey);
// Check of the volume already exists // Check of the volume already exists
if (volumeInfo != null) { if (volumeInfo != null) {
LOG.debug("volume:{} already exists", args.getVolume()); LOG.debug("volume:{} already exists", omVolumeArgs.getVolume());
throw new OMException(ResultCodes.VOLUME_ALREADY_EXISTS); throw new OMException(ResultCodes.VOLUME_ALREADY_EXISTS);
} }
try (BatchOperation batch = metadataManager.getStore() VolumeList volumeList = addVolumeToOwnerList(omVolumeArgs.getVolume(),
.initBatchOperation()) { omVolumeArgs.getOwnerName());
// Write the vol info
metadataManager.getVolumeTable().putWithBatch(batch,
dbVolumeKey, args);
// Add volume to user list // Set creation time
addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch); omVolumeArgs.setCreationTime(System.currentTimeMillis());
metadataManager.getStore().commitBatchOperation(batch);
if (!isRatisEnabled) {
createVolumeCommitToDB(omVolumeArgs, volumeList, dbVolumeKey,
dbUserKey);
} }
LOG.debug("created volume:{} user:{}", args.getVolume(), LOG.debug("created volume:{} user:{}", omVolumeArgs.getVolume(),
args.getOwnerName()); omVolumeArgs.getOwnerName());
return volumeList;
} catch (IOException ex) { } catch (IOException ex) {
if (!(ex instanceof OMException)) { if (!(ex instanceof OMException)) {
LOG.error("Volume creation failed for user:{} volume:{}", LOG.error("Volume creation failed for user:{} volume:{}",
args.getOwnerName(), args.getVolume(), ex); omVolumeArgs.getOwnerName(), omVolumeArgs.getVolume(), ex);
} else {
throw (IOException) ex;
} }
throw ex;
} finally { } finally {
metadataManager.getLock().releaseVolumeLock(args.getVolume()); metadataManager.getLock().releaseVolumeLock(omVolumeArgs.getVolume());
metadataManager.getLock().releaseUserLock(args.getOwnerName()); metadataManager.getLock().releaseUserLock(omVolumeArgs.getOwnerName());
}
}
@Override
public void applyCreateVolume(OmVolumeArgs omVolumeArgs,
VolumeList volumeList) throws IOException {
// Do we need to hold lock in apply Transactions requests?
String dbVolumeKey = metadataManager.getVolumeKey(omVolumeArgs.getVolume());
String dbUserKey = metadataManager.getUserKey(omVolumeArgs.getOwnerName());
try {
createVolumeCommitToDB(omVolumeArgs, volumeList, dbVolumeKey, dbUserKey);
} catch (IOException ex) {
LOG.error("Volume creation failed for user:{} volume:{}",
omVolumeArgs.getOwnerName(), omVolumeArgs.getVolume(), ex);
throw ex;
}
}
private void createVolumeCommitToDB(OmVolumeArgs omVolumeArgs,
VolumeList volumeList, String dbVolumeKey, String dbUserKey)
throws IOException {
try (BatchOperation batch = metadataManager.getStore()
.initBatchOperation()) {
// Write the vol info
metadataManager.getVolumeTable().putWithBatch(batch, dbVolumeKey,
omVolumeArgs);
metadataManager.getUserTable().putWithBatch(batch, dbUserKey,
volumeList);
// Add volume to user list
metadataManager.getStore().commitBatchOperation(batch);
} catch (IOException ex) {
throw ex;
} }
} }
@ -161,7 +198,8 @@ public void createVolume(OmVolumeArgs args) throws IOException {
* @throws IOException * @throws IOException
*/ */
@Override @Override
public void setOwner(String volume, String owner) throws IOException { public OmVolumeOwnerChangeResponse setOwner(String volume, String owner)
throws IOException {
Preconditions.checkNotNull(volume); Preconditions.checkNotNull(volume);
Preconditions.checkNotNull(owner); Preconditions.checkNotNull(owner);
metadataManager.getLock().acquireUserLock(owner); metadataManager.getLock().acquireUserLock(owner);
@ -179,49 +217,84 @@ public void setOwner(String volume, String owner) throws IOException {
Preconditions.checkState(volume.equals(volumeArgs.getVolume())); Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
try (BatchOperation batch = metadataManager.getStore() String originalOwner =
.initBatchOperation()) { metadataManager.getUserKey(volumeArgs.getOwnerName());
delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch); VolumeList oldOwnerVolumeList = delVolumeFromOwnerList(volume,
addVolumeToOwnerList(volume, owner, batch); originalOwner);
OmVolumeArgs newVolumeArgs = String newOwner = metadataManager.getUserKey(owner);
OmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume()) VolumeList newOwnerVolumeList = addVolumeToOwnerList(volume, newOwner);
.setAdminName(volumeArgs.getAdminName())
.setOwnerName(owner)
.setQuotaInBytes(volumeArgs.getQuotaInBytes())
.setCreationTime(volumeArgs.getCreationTime())
.build();
metadataManager.getVolumeTable().putWithBatch(batch, volumeArgs.setOwnerName(owner);
dbVolumeKey, newVolumeArgs); if (!isRatisEnabled) {
metadataManager.getStore().commitBatchOperation(batch); setOwnerCommitToDB(oldOwnerVolumeList, newOwnerVolumeList,
volumeArgs, owner);
} }
} catch (RocksDBException | IOException ex) { return new OmVolumeOwnerChangeResponse(oldOwnerVolumeList,
newOwnerVolumeList, volumeArgs, originalOwner);
} catch (IOException ex) {
if (!(ex instanceof OMException)) { if (!(ex instanceof OMException)) {
LOG.error("Changing volume ownership failed for user:{} volume:{}", LOG.error("Changing volume ownership failed for user:{} volume:{}",
owner, volume, ex); owner, volume, ex);
} }
if(ex instanceof RocksDBException) { throw ex;
throw RocksDBStore.toIOException("Volume creation failed.",
(RocksDBException) ex);
} else {
throw (IOException) ex;
}
} finally { } finally {
metadataManager.getLock().releaseVolumeLock(volume); metadataManager.getLock().releaseVolumeLock(volume);
metadataManager.getLock().releaseUserLock(owner); metadataManager.getLock().releaseUserLock(owner);
} }
} }
@Override
public void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs)
throws IOException {
try {
setOwnerCommitToDB(oldOwnerVolumeList, newOwnerVolumeList,
newOwnerVolumeArgs, oldOwner);
} catch (IOException ex) {
LOG.error("Changing volume ownership failed for user:{} volume:{}",
newOwnerVolumeArgs.getOwnerName(), newOwnerVolumeArgs.getVolume(),
ex);
throw ex;
}
}
private void setOwnerCommitToDB(VolumeList oldOwnerVolumeList,
VolumeList newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs,
String oldOwner) throws IOException {
try (BatchOperation batch = metadataManager.getStore()
.initBatchOperation()) {
if (oldOwnerVolumeList.getVolumeNamesList().size() == 0) {
metadataManager.getUserTable().deleteWithBatch(batch, oldOwner);
} else {
metadataManager.getUserTable().putWithBatch(batch, oldOwner,
oldOwnerVolumeList);
}
metadataManager.getUserTable().putWithBatch(batch,
newOwnerVolumeArgs.getOwnerName(),
newOwnerVolumeList);
String dbVolumeKey =
metadataManager.getVolumeKey(newOwnerVolumeArgs.getVolume());
metadataManager.getVolumeTable().putWithBatch(batch,
dbVolumeKey, newOwnerVolumeArgs);
metadataManager.getStore().commitBatchOperation(batch);
}
}
/** /**
* Changes the Quota on a volume. * Changes the Quota on a volume.
* *
* @param volume - Name of the volume. * @param volume - Name of the volume.
* @param quota - Quota in bytes. * @param quota - Quota in bytes.
*
* @return OmVolumeArgs
* @throws IOException * @throws IOException
*/ */
@Override @Override
public void setQuota(String volume, long quota) throws IOException { public OmVolumeArgs setQuota(String volume, long quota) throws IOException {
Preconditions.checkNotNull(volume); Preconditions.checkNotNull(volume);
metadataManager.getLock().acquireVolumeLock(volume); metadataManager.getLock().acquireVolumeLock(volume);
try { try {
@ -235,15 +308,13 @@ public void setQuota(String volume, long quota) throws IOException {
Preconditions.checkState(volume.equals(volumeArgs.getVolume())); Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
OmVolumeArgs newVolumeArgs =
OmVolumeArgs.newBuilder()
.setVolume(volumeArgs.getVolume())
.setAdminName(volumeArgs.getAdminName())
.setOwnerName(volumeArgs.getOwnerName())
.setQuotaInBytes(quota)
.setCreationTime(volumeArgs.getCreationTime()).build();
metadataManager.getVolumeTable().put(dbVolumeKey, newVolumeArgs); volumeArgs.setQuotaInBytes(quota);
if (!isRatisEnabled) {
metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
}
return volumeArgs;
} catch (IOException ex) { } catch (IOException ex) {
if (!(ex instanceof OMException)) { if (!(ex instanceof OMException)) {
LOG.error("Changing volume quota failed for volume:{} quota:{}", volume, LOG.error("Changing volume quota failed for volume:{} quota:{}", volume,
@ -255,6 +326,19 @@ public void setQuota(String volume, long quota) throws IOException {
} }
} }
@Override
public void applySetQuota(OmVolumeArgs omVolumeArgs) throws IOException {
try {
String dbVolumeKey = metadataManager.getVolumeKey(
omVolumeArgs.getVolume());
metadataManager.getVolumeTable().put(dbVolumeKey, omVolumeArgs);
} catch (IOException ex) {
LOG.error("Changing volume quota failed for volume:{} quota:{}",
omVolumeArgs.getVolume(), omVolumeArgs.getQuotaInBytes(), ex);
throw ex;
}
}
/** /**
* Gets the volume information. * Gets the volume information.
* @param volume - Volume name. * @param volume - Volume name.
@ -290,10 +374,12 @@ public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
* Deletes an existing empty volume. * Deletes an existing empty volume.
* *
* @param volume - Name of the volume. * @param volume - Name of the volume.
*
* @return OmDeleteVolumeResponse
* @throws IOException * @throws IOException
*/ */
@Override @Override
public void deleteVolume(String volume) throws IOException { public OmDeleteVolumeResponse deleteVolume(String volume) throws IOException {
Preconditions.checkNotNull(volume); Preconditions.checkNotNull(volume);
String owner; String owner;
metadataManager.getLock().acquireVolumeLock(volume); metadataManager.getLock().acquireVolumeLock(volume);
@ -305,7 +391,6 @@ public void deleteVolume(String volume) throws IOException {
metadataManager.getLock().acquireUserLock(owner); metadataManager.getLock().acquireUserLock(owner);
metadataManager.getLock().acquireVolumeLock(volume); metadataManager.getLock().acquireVolumeLock(volume);
try { try {
String dbVolumeKey = metadataManager.getVolumeKey(volume); String dbVolumeKey = metadataManager.getVolumeKey(volume);
OmVolumeArgs volumeArgs = OmVolumeArgs volumeArgs =
metadataManager.getVolumeTable().get(dbVolumeKey); metadataManager.getVolumeTable().get(dbVolumeKey);
@ -322,28 +407,54 @@ public void deleteVolume(String volume) throws IOException {
Preconditions.checkState(volume.equals(volumeArgs.getVolume())); Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
// delete the volume from the owner list // delete the volume from the owner list
// as well as delete the volume entry // as well as delete the volume entry
try (BatchOperation batch = metadataManager.getStore() VolumeList newVolumeList = delVolumeFromOwnerList(volume,
.initBatchOperation()) { volumeArgs.getOwnerName());
delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
metadataManager.getVolumeTable().deleteWithBatch(batch, dbVolumeKey); if (!isRatisEnabled) {
metadataManager.getStore().commitBatchOperation(batch); deleteVolumeCommitToDB(newVolumeList,
volume, owner);
} }
} catch (RocksDBException| IOException ex) { return new OmDeleteVolumeResponse(volume, owner, newVolumeList);
} catch (IOException ex) {
if (!(ex instanceof OMException)) { if (!(ex instanceof OMException)) {
LOG.error("Delete volume failed for volume:{}", volume, ex); LOG.error("Delete volume failed for volume:{}", volume, ex);
} }
if(ex instanceof RocksDBException) { throw ex;
throw RocksDBStore.toIOException("Volume creation failed.",
(RocksDBException) ex);
} else {
throw (IOException) ex;
}
} finally { } finally {
metadataManager.getLock().releaseVolumeLock(volume); metadataManager.getLock().releaseVolumeLock(volume);
metadataManager.getLock().releaseUserLock(owner); metadataManager.getLock().releaseUserLock(owner);
} }
} }
@Override
public void applyDeleteVolume(String volume, String owner,
VolumeList newVolumeList) throws IOException {
try {
deleteVolumeCommitToDB(newVolumeList, volume, owner);
} catch (IOException ex) {
LOG.error("Delete volume failed for volume:{}", volume,
ex);
throw ex;
}
}
private void deleteVolumeCommitToDB(VolumeList newVolumeList,
String volume, String owner) throws IOException {
try (BatchOperation batch = metadataManager.getStore()
.initBatchOperation()) {
String dbUserKey = metadataManager.getUserKey(owner);
if (newVolumeList.getVolumeNamesList().size() == 0) {
metadataManager.getUserTable().deleteWithBatch(batch, dbUserKey);
} else {
metadataManager.getUserTable().putWithBatch(batch, dbUserKey,
newVolumeList);
}
metadataManager.getVolumeTable().deleteWithBatch(batch,
metadataManager.getVolumeKey(volume));
metadataManager.getStore().commitBatchOperation(batch);
}
}
/** /**
* Checks if the specified user with a role can access this volume. * Checks if the specified user with a role can access this volume.
* *

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.ozone.container.common.transport.server.ratis import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine; .ContainerStateMachine;
import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@ -36,8 +37,8 @@
.OMRequest; .OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse; .OMResponse;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler; import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
import org.apache.hadoop.ozone.protocolPB.RequestHandler; import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.Message;
@ -67,14 +68,14 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
new SimpleStateMachineStorage(); new SimpleStateMachineStorage();
private final OzoneManagerRatisServer omRatisServer; private final OzoneManagerRatisServer omRatisServer;
private final OzoneManagerServerProtocol ozoneManager; private final OzoneManagerServerProtocol ozoneManager;
private RequestHandler handler; private OzoneManagerHARequestHandler handler;
private RaftGroupId raftGroupId; private RaftGroupId raftGroupId;
private long lastAppliedIndex = 0; private long lastAppliedIndex = 0;
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) { public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer; this.omRatisServer = ratisServer;
this.ozoneManager = omRatisServer.getOzoneManager(); this.ozoneManager = omRatisServer.getOzoneManager();
this.handler = new OzoneManagerRequestHandler(ozoneManager); this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager);
} }
/** /**
@ -185,21 +186,53 @@ public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
private TransactionContext handleStartTransactionRequests( private TransactionContext handleStartTransactionRequests(
RaftClientRequest raftClientRequest, OMRequest omRequest) { RaftClientRequest raftClientRequest, OMRequest omRequest) {
switch (omRequest.getCmdType()) { OMRequest newOmRequest = null;
case AllocateBlock: try {
return handleAllocateBlock(raftClientRequest, omRequest); switch (omRequest.getCmdType()) {
case CreateKey: case CreateVolume:
return handleCreateKeyRequest(raftClientRequest, omRequest); case SetVolumeProperty:
case InitiateMultiPartUpload: case DeleteVolume:
return handleInitiateMultipartUpload(raftClientRequest, omRequest); newOmRequest = handler.handleStartTransaction(omRequest);
default: break;
return TransactionContext.newBuilder() case AllocateBlock:
return handleAllocateBlock(raftClientRequest, omRequest);
case CreateKey:
return handleCreateKeyRequest(raftClientRequest, omRequest);
case InitiateMultiPartUpload:
return handleInitiateMultipartUpload(raftClientRequest, omRequest);
default:
return TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(raftClientRequest.getMessage().getContent())
.build();
}
} catch (IOException ex) {
TransactionContext transactionContext = TransactionContext.newBuilder()
.setClientRequest(raftClientRequest) .setClientRequest(raftClientRequest)
.setStateMachine(this) .setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER) .setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(raftClientRequest.getMessage().getContent())
.build(); .build();
if (ex instanceof OMException) {
IOException ioException =
new IOException(ex.getMessage() + STATUS_CODE +
((OMException) ex).getResult());
transactionContext.setException(ioException);
} else {
transactionContext.setException(ex);
}
LOG.error("Exception in startTransaction for cmdType " +
omRequest.getCmdType(), ex);
return transactionContext;
} }
TransactionContext transactionContext = TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(OMRatisHelper.convertRequestToByteString(newOmRequest))
.build();
return transactionContext;
} }
private TransactionContext handleInitiateMultipartUpload( private TransactionContext handleInitiateMultipartUpload(
@ -367,7 +400,7 @@ private IOException constructExceptionForFailedRequest(
* @throws ServiceException * @throws ServiceException
*/ */
private Message runCommand(OMRequest request, long trxLogIndex) { private Message runCommand(OMRequest request, long trxLogIndex) {
OMResponse response = handler.handle(request); OMResponse response = handler.handleApplyTransaction(request);
lastAppliedIndex = trxLogIndex; lastAppliedIndex = trxLogIndex;
return OMRatisHelper.convertResponseToMessage(response); return OMRatisHelper.convertResponseToMessage(response);
} }
@ -394,7 +427,7 @@ private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
} }
@VisibleForTesting @VisibleForTesting
public void setHandler(RequestHandler handler) { public void setHandler(OzoneManagerHARequestHandler handler) {
this.handler = handler; this.handler = handler;
} }

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.protocolPB;
import java.io.IOException;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
/**
* Handler to handle OM requests in OM HA.
*/
public interface OzoneManagerHARequestHandler extends RequestHandler {
/**
* Handle start Transaction Requests from OzoneManager StateMachine.
* @param omRequest
* @return OMRequest - New OM Request which will be applied during apply
* Transaction
* @throws IOException
*/
OMRequest handleStartTransaction(OMRequest omRequest) throws IOException;
/**
* Handle Apply Transaction Requests from OzoneManager StateMachine.
* @param omRequest
* @return OMResponse
*/
OMResponse handleApplyTransaction(OMRequest omRequest);
}

View File

@ -0,0 +1,247 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.protocolPB;
import java.io.IOException;
import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetVolumePropertyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetVolumePropertyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.Type;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.VolumeList;
/**
* Command Handler for OM requests. OM State Machine calls this handler for
* deserializing the client request and sending it to OM.
*/
public class OzoneManagerHARequestHandlerImpl
extends OzoneManagerRequestHandler implements OzoneManagerHARequestHandler {
public OzoneManagerHARequestHandlerImpl(OzoneManagerServerProtocol om) {
super(om);
}
@Override
public OMRequest handleStartTransaction(OMRequest omRequest)
throws IOException {
LOG.debug("Received OMRequest: {}, ", omRequest);
Type cmdType = omRequest.getCmdType();
OMRequest newOmRequest = null;
switch (cmdType) {
case CreateVolume:
newOmRequest = handleCreateVolumeStart(omRequest);
break;
case SetVolumeProperty:
newOmRequest = handleSetVolumePropertyStart(omRequest);
break;
case DeleteVolume:
newOmRequest = handleDeleteVolumeStart(omRequest);
break;
default:
throw new IOException("Unrecognized Command Type:" + cmdType);
}
return newOmRequest;
}
@Override
public OMResponse handleApplyTransaction(OMRequest omRequest) {
LOG.debug("Received OMRequest: {}, ", omRequest);
Type cmdType = omRequest.getCmdType();
OMResponse.Builder responseBuilder =
OMResponse.newBuilder().setCmdType(cmdType)
.setStatus(Status.OK);
try {
switch (cmdType) {
case CreateVolume:
responseBuilder.setCreateVolumeResponse(
handleCreateVolumeApply(omRequest));
break;
case SetVolumeProperty:
responseBuilder.setSetVolumePropertyResponse(
handleSetVolumePropertyApply(omRequest));
break;
case DeleteVolume:
responseBuilder.setDeleteVolumeResponse(
handleDeleteVolumeApply(omRequest));
break;
default:
// As all request types are not changed so we need to call handle
// here.
return handle(omRequest);
}
responseBuilder.setSuccess(true);
} catch (IOException ex) {
responseBuilder.setSuccess(false);
responseBuilder.setStatus(exceptionToResponseStatus(ex));
if (ex.getMessage() != null) {
responseBuilder.setMessage(ex.getMessage());
}
}
return responseBuilder.build();
}
private OMRequest handleCreateVolumeStart(OMRequest omRequest)
throws IOException {
VolumeInfo volumeInfo = omRequest.getCreateVolumeRequest().getVolumeInfo();
OzoneManagerProtocolProtos.VolumeList volumeList =
getOzoneManagerServerProtocol().startCreateVolume(
OmVolumeArgs.getFromProtobuf(volumeInfo));
CreateVolumeRequest createVolumeRequest =
CreateVolumeRequest.newBuilder().setVolumeInfo(volumeInfo)
.setVolumeList(volumeList).build();
return omRequest.toBuilder().setCreateVolumeRequest(createVolumeRequest)
.build();
}
private CreateVolumeResponse handleCreateVolumeApply(OMRequest omRequest)
throws IOException {
OzoneManagerProtocolProtos.VolumeInfo volumeInfo =
omRequest.getCreateVolumeRequest().getVolumeInfo();
VolumeList volumeList =
omRequest.getCreateVolumeRequest().getVolumeList();
getOzoneManagerServerProtocol().applyCreateVolume(
OmVolumeArgs.getFromProtobuf(volumeInfo),
volumeList);
return CreateVolumeResponse.newBuilder().build();
}
private OMRequest handleSetVolumePropertyStart(OMRequest omRequest)
throws IOException {
SetVolumePropertyRequest setVolumePropertyRequest =
omRequest.getSetVolumePropertyRequest();
String volume = setVolumePropertyRequest.getVolumeName();
OMRequest newOmRequest = null;
if (setVolumePropertyRequest.hasQuotaInBytes()) {
long quota = setVolumePropertyRequest.getQuotaInBytes();
OmVolumeArgs omVolumeArgs =
getOzoneManagerServerProtocol().startSetQuota(volume, quota);
SetVolumePropertyRequest newSetVolumePropertyRequest =
SetVolumePropertyRequest.newBuilder().setVolumeName(volume)
.setVolumeInfo(omVolumeArgs.getProtobuf()).build();
newOmRequest =
omRequest.toBuilder().setSetVolumePropertyRequest(
newSetVolumePropertyRequest).build();
} else {
String owner = setVolumePropertyRequest.getOwnerName();
OmVolumeOwnerChangeResponse omVolumeOwnerChangeResponse =
getOzoneManagerServerProtocol().startSetOwner(volume, owner);
// If volumeLists become large and as ratis writes the request to disk we
// might take more space if the lists become very big in size. We might
// need to revisit this if it becomes problem
SetVolumePropertyRequest newSetVolumePropertyRequest =
SetVolumePropertyRequest.newBuilder().setVolumeName(volume)
.setOwnerName(owner)
.setOriginalOwner(omVolumeOwnerChangeResponse.getOriginalOwner())
.setNewOwnerVolumeList(
omVolumeOwnerChangeResponse.getNewOwnerVolumeList())
.setOldOwnerVolumeList(
omVolumeOwnerChangeResponse.getOriginalOwnerVolumeList())
.setVolumeInfo(
omVolumeOwnerChangeResponse.getNewOwnerVolumeArgs()
.getProtobuf()).build();
newOmRequest =
omRequest.toBuilder().setSetVolumePropertyRequest(
newSetVolumePropertyRequest).build();
}
return newOmRequest;
}
private SetVolumePropertyResponse handleSetVolumePropertyApply(
OMRequest omRequest) throws IOException {
SetVolumePropertyRequest setVolumePropertyRequest =
omRequest.getSetVolumePropertyRequest();
if (setVolumePropertyRequest.hasQuotaInBytes()) {
getOzoneManagerServerProtocol().applySetQuota(
OmVolumeArgs.getFromProtobuf(
setVolumePropertyRequest.getVolumeInfo()));
} else {
getOzoneManagerServerProtocol().applySetOwner(
setVolumePropertyRequest.getOriginalOwner(),
setVolumePropertyRequest.getOldOwnerVolumeList(),
setVolumePropertyRequest.getNewOwnerVolumeList(),
OmVolumeArgs.getFromProtobuf(
setVolumePropertyRequest.getVolumeInfo()));
}
return SetVolumePropertyResponse.newBuilder().build();
}
private OMRequest handleDeleteVolumeStart(OMRequest omRequest)
throws IOException {
DeleteVolumeRequest deleteVolumeRequest =
omRequest.getDeleteVolumeRequest();
String volume = deleteVolumeRequest.getVolumeName();
OmDeleteVolumeResponse omDeleteVolumeResponse =
getOzoneManagerServerProtocol().startDeleteVolume(volume);
DeleteVolumeRequest newDeleteVolumeRequest =
DeleteVolumeRequest.newBuilder().setVolumeList(
omDeleteVolumeResponse.getUpdatedVolumeList())
.setVolumeName(omDeleteVolumeResponse.getVolume())
.setOwner(omDeleteVolumeResponse.getOwner()).build();
return omRequest.toBuilder().setDeleteVolumeRequest(
newDeleteVolumeRequest).build();
}
private DeleteVolumeResponse handleDeleteVolumeApply(OMRequest omRequest)
throws IOException {
DeleteVolumeRequest deleteVolumeRequest =
omRequest.getDeleteVolumeRequest();
getOzoneManagerServerProtocol().applyDeleteVolume(
deleteVolumeRequest.getVolumeName(), deleteVolumeRequest.getOwner(),
deleteVolumeRequest.getVolumeList());
return DeleteVolumeResponse.newBuilder().build();
}
}

View File

@ -364,7 +364,7 @@ public OMResponse handle(OMRequest request) {
} }
// Convert and exception to corresponding status code // Convert and exception to corresponding status code
private Status exceptionToResponseStatus(IOException ex) { protected Status exceptionToResponseStatus(IOException ex) {
if (ex instanceof OMException) { if (ex instanceof OMException) {
return Status.values()[((OMException) ex).getResult().ordinal()]; return Status.values()[((OMException) ex).getResult().ordinal()];
} else { } else {
@ -1027,4 +1027,8 @@ private OzoneManagerProtocolProtos.LookupFileResponse lookupFile(
.setKeyInfo(impl.lookupFile(omKeyArgs).getProtobuf()) .setKeyInfo(impl.lookupFile(omKeyArgs).getProtobuf())
.build(); .build();
} }
protected OzoneManagerServerProtocol getOzoneManagerServerProtocol() {
return impl;
}
} }

View File

@ -28,7 +28,6 @@
*/ */
public interface RequestHandler { public interface RequestHandler {
/** /**
* Handle the OmRequest, and returns OmResponse. * Handle the OmRequest, and returns OmResponse.
* @param request * @param request
@ -36,6 +35,7 @@ public interface RequestHandler {
*/ */
OMResponse handle(OMRequest request); OMResponse handle(OMRequest request);
/** /**
* Validates that the incoming OM request has required parameters. * Validates that the incoming OM request has required parameters.
* TODO: Add more validation checks before writing the request to Ratis log. * TODO: Add more validation checks before writing the request to Ratis log.

View File

@ -39,8 +39,8 @@
.OMRequest; .OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse; .OMResponse;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler; import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
import org.apache.hadoop.ozone.protocolPB.RequestHandler; import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.Message;
@ -72,7 +72,7 @@ public class TestOzoneManagerStateMachine {
private OzoneConfiguration conf; private OzoneConfiguration conf;
private OzoneManagerRatisServer omRatisServer; private OzoneManagerRatisServer omRatisServer;
private String omID; private String omID;
private RequestHandler requestHandler; private OzoneManagerHARequestHandler requestHandler;
private RaftGroupId raftGroupId; private RaftGroupId raftGroupId;
private OzoneManagerStateMachine ozoneManagerStateMachine; private OzoneManagerStateMachine ozoneManagerStateMachine;
@ -105,7 +105,7 @@ public void setup() throws Exception {
ozoneManagerStateMachine = ozoneManagerStateMachine =
new OzoneManagerStateMachine(omRatisServer); new OzoneManagerStateMachine(omRatisServer);
requestHandler = Mockito.mock(OzoneManagerRequestHandler.class); requestHandler = Mockito.mock(OzoneManagerHARequestHandlerImpl.class);
raftGroupId = omRatisServer.getRaftGroup().getGroupId(); raftGroupId = omRatisServer.getRaftGroup().getGroupId();
ozoneManagerStateMachine.setHandler(requestHandler); ozoneManagerStateMachine.setHandler(requestHandler);