HDFS-12195. Ozone: DeleteKey-1: KSM replies delete key request asynchronously. Contributed by Yuanbo Liu.
This commit is contained in:
parent
86b42907cf
commit
bc413a69cc
@ -93,6 +93,8 @@ public enum Versioning {NOT_DEFINED, ENABLED, DISABLED}
|
|||||||
public static final String OZONE_HANDLER_DISTRIBUTED = "distributed";
|
public static final String OZONE_HANDLER_DISTRIBUTED = "distributed";
|
||||||
public static final String OZONE_HANDLER_LOCAL = "local";
|
public static final String OZONE_HANDLER_LOCAL = "local";
|
||||||
|
|
||||||
|
public static final String DELETING_KEY_PREFIX = "#deleting#";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* KSM LevelDB prefixes.
|
* KSM LevelDB prefixes.
|
||||||
*/
|
*/
|
||||||
|
@ -22,17 +22,15 @@
|
|||||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
|
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
|
|
||||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
|
|
||||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.apache.hadoop.utils.BatchOperation;
|
||||||
import org.iq80.leveldb.DBException;
|
import org.iq80.leveldb.DBException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -154,29 +152,24 @@ public KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException {
|
|||||||
@Override
|
@Override
|
||||||
public void deleteKey(KsmKeyArgs args) throws IOException {
|
public void deleteKey(KsmKeyArgs args) throws IOException {
|
||||||
Preconditions.checkNotNull(args);
|
Preconditions.checkNotNull(args);
|
||||||
KsmKeyInfo keyInfo = lookupKey(args);
|
|
||||||
|
|
||||||
metadataManager.writeLock().lock();
|
metadataManager.writeLock().lock();
|
||||||
String volumeName = args.getVolumeName();
|
String volumeName = args.getVolumeName();
|
||||||
String bucketName = args.getBucketName();
|
String bucketName = args.getBucketName();
|
||||||
String keyName = args.getKeyName();
|
String keyName = args.getKeyName();
|
||||||
try {
|
try {
|
||||||
List<DeleteBlockResult> resultList =
|
byte[] objectKey = metadataManager.getDBKeyForKey(
|
||||||
scmBlockClient.deleteBlocks(
|
volumeName, bucketName, keyName);
|
||||||
Collections.singleton(keyInfo.getBlockID()));
|
byte[] objectValue = metadataManager.get(objectKey);
|
||||||
if (resultList.size() != 1) {
|
if (objectValue == null) {
|
||||||
throw new KSMException("Delete result size from SCM is wrong",
|
throw new KSMException("Key not found",
|
||||||
ResultCodes.FAILED_KEY_DELETION);
|
KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
|
||||||
}
|
|
||||||
|
|
||||||
if (resultList.get(0).getResult() == Result.success) {
|
|
||||||
byte[] objectKey = metadataManager.getDBKeyForKey(
|
|
||||||
volumeName, bucketName, keyName);
|
|
||||||
metadataManager.deleteKey(objectKey);
|
|
||||||
} else {
|
|
||||||
throw new KSMException("Cannot delete key from SCM",
|
|
||||||
ResultCodes.FAILED_KEY_DELETION);
|
|
||||||
}
|
}
|
||||||
|
byte[] deletingKey = metadataManager.getDeletedKeyName(objectKey);
|
||||||
|
BatchOperation batch = new BatchOperation();
|
||||||
|
batch.put(deletingKey, objectValue);
|
||||||
|
batch.delete(objectKey);
|
||||||
|
metadataManager.writeBatch(batch);
|
||||||
} catch (DBException ex) {
|
} catch (DBException ex) {
|
||||||
LOG.error(String.format("Delete key failed for volume:%s "
|
LOG.error(String.format("Delete key failed for volume:%s "
|
||||||
+ "bucket:%s key:%s", volumeName, bucketName, keyName), ex);
|
+ "bucket:%s key:%s", volumeName, bucketName, keyName), ex);
|
||||||
|
@ -155,6 +155,14 @@ private static RPC.Server startRpcServer(OzoneConfiguration conf,
|
|||||||
return rpcServer;
|
return rpcServer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get metadata manager.
|
||||||
|
* @return metadata manager.
|
||||||
|
*/
|
||||||
|
public MetadataManager getMetadataManager() {
|
||||||
|
return metadataManager;
|
||||||
|
}
|
||||||
|
|
||||||
public KSMMetrics getMetrics() {
|
public KSMMetrics getMetrics() {
|
||||||
return metrics;
|
return metrics;
|
||||||
}
|
}
|
||||||
|
@ -16,10 +16,12 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.ksm;
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.utils.BatchOperation;
|
import org.apache.hadoop.utils.BatchOperation;
|
||||||
|
import org.apache.hadoop.utils.MetadataStore;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -39,6 +41,13 @@ public interface MetadataManager {
|
|||||||
*/
|
*/
|
||||||
void stop() throws IOException;
|
void stop() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get metadata store.
|
||||||
|
* @return metadata store.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
MetadataStore getStore();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the read lock used on Metadata DB.
|
* Returns the read lock used on Metadata DB.
|
||||||
* @return readLock
|
* @return readLock
|
||||||
@ -106,6 +115,15 @@ public interface MetadataManager {
|
|||||||
*/
|
*/
|
||||||
byte[] getDBKeyForKey(String volume, String bucket, String key);
|
byte[] getDBKeyForKey(String volume, String bucket, String key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the DB key name of a deleted key in KSM metadata store.
|
||||||
|
* The name for a deleted key has prefix #deleting# followed by
|
||||||
|
* the actual key name.
|
||||||
|
* @param keyName - key name
|
||||||
|
* @return bytes of DB key.
|
||||||
|
*/
|
||||||
|
byte[] getDeletedKeyName(byte[] keyName);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deletes the key from DB.
|
* Deletes the key from DB.
|
||||||
*
|
*
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.ksm;
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||||
@ -47,6 +48,7 @@
|
|||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
|
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
|
||||||
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
||||||
.OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
|
.OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
|
||||||
@ -93,6 +95,16 @@ public void stop() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get metadata store.
|
||||||
|
* @return store - metadata store.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
@Override
|
||||||
|
public MetadataStore getStore() {
|
||||||
|
return store;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a volume return the corresponding DB key.
|
* Given a volume return the corresponding DB key.
|
||||||
* @param volume - Volume name
|
* @param volume - Volume name
|
||||||
@ -148,6 +160,12 @@ public byte[] getDBKeyForKey(String volume, String bucket, String key) {
|
|||||||
return DFSUtil.string2Bytes(keyKeyString);
|
return DFSUtil.string2Bytes(keyKeyString);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] getDeletedKeyName(byte[] keyName) {
|
||||||
|
return DFSUtil.string2Bytes(
|
||||||
|
DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deletes the key on Metadata DB.
|
* Deletes the key on Metadata DB.
|
||||||
*
|
*
|
||||||
|
@ -45,6 +45,8 @@
|
|||||||
import org.apache.hadoop.ozone.web.response.ListKeys;
|
import org.apache.hadoop.ozone.web.response.ListKeys;
|
||||||
import org.apache.hadoop.ozone.web.response.ListVolumes;
|
import org.apache.hadoop.ozone.web.response.ListVolumes;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.apache.hadoop.utils.MetadataKeyFilters;
|
||||||
|
import org.apache.hadoop.utils.MetadataStore;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
@ -59,11 +61,13 @@
|
|||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
|
||||||
/**
|
/**
|
||||||
* Test Key Space Manager operation in distributed handler scenario.
|
* Test Key Space Manager operation in distributed handler scenario.
|
||||||
*/
|
*/
|
||||||
@ -72,6 +76,7 @@ public class TestKeySpaceManager {
|
|||||||
private static StorageHandler storageHandler;
|
private static StorageHandler storageHandler;
|
||||||
private static UserArgs userArgs;
|
private static UserArgs userArgs;
|
||||||
private static KSMMetrics ksmMetrics;
|
private static KSMMetrics ksmMetrics;
|
||||||
|
private static OzoneConfiguration conf;
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public ExpectedException exception = ExpectedException.none();
|
public ExpectedException exception = ExpectedException.none();
|
||||||
@ -86,7 +91,7 @@ public class TestKeySpaceManager {
|
|||||||
*/
|
*/
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void init() throws Exception {
|
public static void init() throws Exception {
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
conf = new OzoneConfiguration();
|
||||||
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
|
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
|
||||||
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
|
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
|
||||||
cluster = new MiniOzoneCluster.Builder(conf)
|
cluster = new MiniOzoneCluster.Builder(conf)
|
||||||
@ -597,6 +602,13 @@ public void testDeleteKey() throws IOException, OzoneException {
|
|||||||
storageHandler.deleteKey(keyArgs);
|
storageHandler.deleteKey(keyArgs);
|
||||||
Assert.assertEquals(1 + numKeyDeletes, ksmMetrics.getNumKeyDeletes());
|
Assert.assertEquals(1 + numKeyDeletes, ksmMetrics.getNumKeyDeletes());
|
||||||
|
|
||||||
|
// Make sure the deleted key has been renamed.
|
||||||
|
MetadataStore store = cluster.getKeySpaceManager().
|
||||||
|
getMetadataManager().getStore();
|
||||||
|
List<Map.Entry<byte[], byte[]>> list = store.getRangeKVs(null, 10,
|
||||||
|
new MetadataKeyFilters.KeyPrefixFilter(DELETING_KEY_PREFIX));
|
||||||
|
Assert.assertEquals(1, list.size());
|
||||||
|
|
||||||
// Check the block key in SCM, make sure it's deleted.
|
// Check the block key in SCM, make sure it's deleted.
|
||||||
Set<String> keys = new HashSet<>();
|
Set<String> keys = new HashSet<>();
|
||||||
keys.add(keyArgs.getResourceName());
|
keys.add(keyArgs.getResourceName());
|
||||||
|
Loading…
Reference in New Issue
Block a user