HDFS-11774. Ozone: KSM: add deleteVolume. Contributed by Mukul Kumar Singh.

This commit is contained in:
Xiaoyu Yao 2017-05-31 14:21:17 -07:00 committed by Owen O'Malley
parent 236c410881
commit 6b9915fcbd
11 changed files with 195 additions and 10 deletions

View File

@ -58,6 +58,10 @@
.KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.SetVolumePropertyResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.DeleteVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.DeleteVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.InfoVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto
@ -233,7 +237,18 @@ public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
*/
@Override
public void deleteVolume(String volume) throws IOException {
DeleteVolumeRequest.Builder req = DeleteVolumeRequest.newBuilder();
req.setVolumeName(volume);
final DeleteVolumeResponse resp;
try {
resp = rpcProxy.deleteVolume(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() != Status.OK) {
throw new
IOException("Delete Volume failed, error:" + resp.getStatus());
}
}
/**

View File

@ -32,6 +32,7 @@ public class KSMMetrics {
private @Metric MutableCounterLong numVolumeModifies;
private @Metric MutableCounterLong numVolumeInfos;
private @Metric MutableCounterLong numBucketCreates;
private @Metric MutableCounterLong numVolumeDeletes;
private @Metric MutableCounterLong numBucketInfos;
private @Metric MutableCounterLong numBucketModifies;
private @Metric MutableCounterLong numKeyAllocate;
@ -41,6 +42,7 @@ public class KSMMetrics {
private @Metric MutableCounterLong numVolumeCreateFails;
private @Metric MutableCounterLong numVolumeModifyFails;
private @Metric MutableCounterLong numVolumeInfoFails;
private @Metric MutableCounterLong numVolumeDeleteFails;
private @Metric MutableCounterLong numBucketCreateFails;
private @Metric MutableCounterLong numBucketInfoFails;
private @Metric MutableCounterLong numBucketModifyFails;
@ -69,6 +71,10 @@ public void incNumVolumeInfos() {
numVolumeInfos.incr();
}
public void incNumVolumeDeletes() {
numVolumeDeletes.incr();
}
public void incNumBucketCreates() {
numBucketCreates.incr();
}
@ -93,6 +99,10 @@ public void incNumVolumeInfoFails() {
numVolumeInfoFails.incr();
}
public void incNumVolumeDeleteFails() {
numVolumeDeleteFails.incr();
}
public void incNumBucketCreateFails() {
numBucketCreateFails.incr();
}
@ -136,6 +146,11 @@ public long getNumVolumeInfos() {
return numVolumeInfos.value();
}
@VisibleForTesting
public long getNumVolumeDeletes() {
return numVolumeDeletes.value();
}
@VisibleForTesting
public long getNumBucketCreates() {
return numBucketCreates.value();
@ -166,6 +181,11 @@ public long getNumVolumeInfoFails() {
return numVolumeInfoFails.value();
}
@VisibleForTesting
public long getNumVolumeDeleteFails() {
return numVolumeDeleteFails.value();
}
@VisibleForTesting
public long getNumBucketCreateFails() {
return numBucketCreateFails.value();

View File

@ -335,7 +335,13 @@ public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
*/
@Override
public void deleteVolume(String volume) throws IOException {
try {
metrics.incNumVolumeDeletes();
volumeManager.deleteVolume(volume);
} catch (Exception ex) {
metrics.incNumVolumeDeleteFails();
throw ex;
}
}
/**

View File

@ -104,4 +104,10 @@ void batchPutDelete(List<Map.Entry<byte[], byte[]>> putList,
* @return bytes of DB key.
*/
byte[] getDBKeyForKey(String volume, String bucket, String key);
/**
* Given a volume, check if it is empty, i.e there are no buckets inside it.
* @param volume - Volume name
*/
boolean isVolumeEmpty(String volume) throws IOException;
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
@ -188,4 +189,22 @@ public void batchPut(List<Map.Entry<byte[], byte[]>> list)
}
}
/**
* Given a volume, check if it is empty, i.e there are no buckets inside it.
* @param volume - Volume name
* @return true if the volume is empty
*/
public boolean isVolumeEmpty(String volume) throws IOException {
try (DBIterator iterator = store.getIterator()) {
String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
+ OzoneConsts.KSM_BUCKET_PREFIX;
byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
// Seek to the root of the volume and look for the next key
iterator.seek(dbVolumeRootKey);
String firstBucketKey = DFSUtil.bytes2String(iterator.next().getKey());
// if the key starts with /<volume name>
// then there is at least one bucket
return !firstBucketKey.startsWith(OzoneConsts.KSM_VOLUME_PREFIX + volume);
}
}
}

View File

@ -56,4 +56,12 @@ public interface VolumeManager {
* @throws IOException
*/
KsmVolumeArgs getVolumeInfo(String volume) throws IOException;
/**
* Deletes an existing empty volume.
*
* @param volume - Name of the volume.
* @throws IOException
*/
void deleteVolume(String volume) throws IOException;
}

View File

@ -167,8 +167,8 @@ public void createVolume(KsmVolumeArgs args) throws IOException {
public void setOwner(String volume, String owner) throws IOException {
Preconditions.checkNotNull(volume);
Preconditions.checkNotNull(owner);
List<Map.Entry<byte[], byte[]>> putbatch = new LinkedList<>();
List<byte[]> deletebatch = new LinkedList<>();
List<Map.Entry<byte[], byte[]>> putBatch = new LinkedList<>();
List<byte[]> deleteBatch = new LinkedList<>();
metadataManager.writeLock().lock();
try {
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
@ -182,8 +182,8 @@ public void setOwner(String volume, String owner) throws IOException {
Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(),
putbatch, deletebatch);
addVolumeToOwnerList(volume, owner, putbatch);
putBatch, deleteBatch);
addVolumeToOwnerList(volume, owner, putBatch);
KsmVolumeArgs newVolumeArgs =
KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
@ -193,9 +193,9 @@ public void setOwner(String volume, String owner) throws IOException {
.build();
VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
putbatch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
putBatch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
metadataManager.batchPutDelete(putbatch, deletebatch);
metadataManager.batchPutDelete(putBatch, deleteBatch);
} catch (IOException ex) {
LOG.error("Changing volume ownership failed for user:{} volume:{}",
owner, volume, ex);
@ -271,4 +271,43 @@ public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
metadataManager.readLock().unlock();
}
}
/**
* Deletes an existing empty volume.
*
* @param volume - Name of the volume.
* @throws IOException
*/
@Override
public void deleteVolume(String volume) throws IOException {
Preconditions.checkNotNull(volume);
metadataManager.writeLock().lock();
try {
List<Map.Entry<byte[], byte[]>> putBatch = new LinkedList<>();
List<byte[]> deleteBatch = new LinkedList<>();
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
byte[] volInfo = metadataManager.get(dbVolumeKey);
if (volInfo == null) {
throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
}
if (!metadataManager.isVolumeEmpty(volume)) {
throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_EMPTY);
}
VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
// delete the volume from the owner list
// as well as delete the volume entry
delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(),
putBatch, deleteBatch);
deleteBatch.add(dbVolumeKey);
metadataManager.batchPutDelete(putBatch, deleteBatch);
} catch (IOException ex) {
LOG.error("Delete volume failed for volume:{}", volume, ex);
throw ex;
} finally {
metadataManager.writeLock().unlock();
}
}
}

View File

@ -99,6 +99,7 @@ public enum ResultCodes {
FAILED_TOO_MANY_USER_VOLUMES,
FAILED_VOLUME_ALREADY_EXISTS,
FAILED_VOLUME_NOT_FOUND,
FAILED_VOLUME_NOT_EMPTY,
FAILED_USER_NOT_FOUND,
FAILED_BUCKET_ALREADY_EXISTS,
FAILED_BUCKET_NOT_FOUND,

View File

@ -104,6 +104,8 @@ private Status exceptionToResponseStatus(IOException ex) {
return Status.USER_TOO_MANY_VOLUMES;
case FAILED_VOLUME_NOT_FOUND:
return Status.VOLUME_NOT_FOUND;
case FAILED_VOLUME_NOT_EMPTY:
return Status.VOLUME_NOT_EMPTY;
case FAILED_USER_NOT_FOUND:
return Status.USER_NOT_FOUND;
case FAILED_BUCKET_ALREADY_EXISTS:
@ -186,7 +188,14 @@ public InfoVolumeResponse infoVolume(
public DeleteVolumeResponse deleteVolume(
RpcController controller, DeleteVolumeRequest request)
throws ServiceException {
return null;
DeleteVolumeResponse.Builder resp = DeleteVolumeResponse.newBuilder();
resp.setStatus(Status.OK);
try {
impl.deleteVolume(request.getVolumeName());
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
@Override

View File

@ -162,7 +162,7 @@ public ListVolumes listVolumes(ListArgs args)
@Override
public void deleteVolume(VolumeArgs args)
throws IOException, OzoneException {
throw new UnsupportedOperationException("deleteVolume not implemented");
keySpaceManagerClient.deleteVolume(args.getVolumeName());
}
@Override

View File

@ -181,6 +181,68 @@ public void testChangeVolumeQuota() throws IOException, OzoneException {
Assert.assertEquals(0, ksmMetrics.getNumVolumeInfoFails());
}
// Create a volume and then delete it and then check for deletion
@Test(timeout = 60000)
public void testDeleteVolume() throws IOException, OzoneException {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(userName));
Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
storageHandler.deleteVolume(createVolumeArgs);
try {
retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
} catch (IOException ex) {
Assert.assertEquals(ex.getMessage(),
"Info Volume failed, error:VOLUME_NOT_FOUND");
}
}
// Create a volume and a bucket inside the volume,
// then delete it and then check for deletion failure
@Test(timeout = 60000)
public void testFailedDeleteVolume() throws IOException, OzoneException {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(userName));
Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
storageHandler.createBucket(bucketArgs);
try {
storageHandler.deleteVolume(createVolumeArgs);
} catch (IOException ex) {
Assert.assertEquals(ex.getMessage(),
"Delete Volume failed, error:VOLUME_NOT_EMPTY");
}
retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(userName));
}
@Test(timeout = 60000)
public void testCreateBucket() throws IOException, OzoneException {
String userName = "user" + RandomStringUtils.randomNumeric(5);