From bc413a69cc6eb86eb24a2937723d1d919ef0a4b1 Mon Sep 17 00:00:00 2001
From: Anu Engineer <aengineer@apache.org>
Date: Mon, 31 Jul 2017 10:23:11 -0700
Subject: [PATCH] HDFS-12195. Ozone: DeleteKey-1: KSM replies delete key
 request asynchronously. Contributed by Yuanbo Liu.

---
 .../org/apache/hadoop/ozone/OzoneConsts.java  |  2 ++
 .../hadoop/ozone/ksm/KeyManagerImpl.java      | 31 +++++++------------
 .../hadoop/ozone/ksm/KeySpaceManager.java     |  8 +++++
 .../hadoop/ozone/ksm/MetadataManager.java     | 18 +++++++++++
 .../hadoop/ozone/ksm/MetadataManagerImpl.java | 18 +++++++++++
 .../hadoop/ozone/ksm/TestKeySpaceManager.java | 14 ++++++++-
 6 files changed, 71 insertions(+), 20 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 07c98cb5c3..5e73045aa2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -93,6 +93,8 @@ public final class OzoneConsts {
   public static final String OZONE_HANDLER_DISTRIBUTED = "distributed";
   public static final String OZONE_HANDLER_LOCAL = "local";
 
+  public static final String DELETING_KEY_PREFIX = "#deleting#";
+
   /**
    * KSM LevelDB prefixes.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
index 70e687bbdf..d3b7e48a0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
@@ -22,17 +22,15 @@ import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
-import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BatchOperation;
 import org.iq80.leveldb.DBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -154,29 +152,24 @@ public class KeyManagerImpl implements KeyManager {
   @Override
   public void deleteKey(KsmKeyArgs args) throws IOException {
     Preconditions.checkNotNull(args);
-    KsmKeyInfo keyInfo = lookupKey(args);
 
     metadataManager.writeLock().lock();
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
     try {
-      List<DeleteBlockResult> resultList =
-          scmBlockClient.deleteBlocks(
-              Collections.singleton(keyInfo.getBlockID()));
-      if (resultList.size() != 1) {
-        throw new KSMException("Delete result size from SCM is wrong",
-            ResultCodes.FAILED_KEY_DELETION);
-      }
-
-      if (resultList.get(0).getResult() == Result.success) {
-        byte[] objectKey = metadataManager.getDBKeyForKey(
-            volumeName, bucketName, keyName);
-        metadataManager.deleteKey(objectKey);
-      } else {
-        throw new KSMException("Cannot delete key from SCM",
-                ResultCodes.FAILED_KEY_DELETION);
+      byte[] objectKey = metadataManager.getDBKeyForKey(
+          volumeName, bucketName, keyName);
+      byte[] objectValue = metadataManager.get(objectKey);
+      if (objectValue == null) {
+        throw new KSMException("Key not found",
+            KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
       }
+      byte[] deletingKey = metadataManager.getDeletedKeyName(objectKey);
+      BatchOperation batch = new BatchOperation();
+      batch.put(deletingKey, objectValue);
+      batch.delete(objectKey);
+      metadataManager.writeBatch(batch);
     } catch (DBException ex) {
       LOG.error(String.format("Delete key failed for volume:%s "
           + "bucket:%s key:%s", volumeName, bucketName, keyName), ex);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 7cf349cadf..435b24384b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -155,6 +155,14 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
     return rpcServer;
   }
 
+  /**
+   * Get metadata manager.
+   * @return metadata manager.
+   */
+  public MetadataManager getMetadataManager() {
+    return metadataManager;
+  }
+
   public KSMMetrics getMetrics() {
     return metrics;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
index 36e5b3ac6a..d1a79031d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
@@ -16,10 +16,12 @@
  */
 package org.apache.hadoop.ozone.ksm;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
 
 import java.io.IOException;
 import java.util.List;
@@ -39,6 +41,13 @@ public interface MetadataManager {
    */
   void stop() throws IOException;
 
+  /**
+   * Get metadata store.
+   * @return metadata store.
+   */
+  @VisibleForTesting
+  MetadataStore getStore();
+
   /**
    * Returns the read lock used on Metadata DB.
    * @return readLock
@@ -106,6 +115,15 @@ public interface MetadataManager {
    */
   byte[] getDBKeyForKey(String volume, String bucket, String key);
 
+  /**
+   * Returns the DB key name of a deleted key in KSM metadata store.
+   * The name for a deleted key has prefix #deleting# followed by
+   * the actual key name.
+   * @param keyName - key name
+   * @return bytes of DB key.
+   */
+  byte[] getDeletedKeyName(byte[] keyName);
+
   /**
    * Deletes the key from DB.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
index 7e48eda4fc..8bcd1f2c13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.ksm;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -47,6 +48,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
 import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
     .OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
@@ -93,6 +95,16 @@ public class MetadataManagerImpl implements MetadataManager {
     }
   }
 
+  /**
+   * Get metadata store.
+   * @return store - metadata store.
+   */
+  @VisibleForTesting
+  @Override
+  public MetadataStore getStore() {
+    return store;
+  }
+
   /**
    * Given a volume return the corresponding DB key.
    * @param volume - Volume name
@@ -148,6 +160,12 @@ public class MetadataManagerImpl implements MetadataManager {
     return DFSUtil.string2Bytes(keyKeyString);
   }
 
+  @Override
+  public byte[] getDeletedKeyName(byte[] keyName) {
+    return DFSUtil.string2Bytes(
+        DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
+  }
+
   /**
    * Deletes the key on Metadata DB.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
index 5936ccde5d..1a68eb2d74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.ozone.web.response.ListBuckets;
 import org.apache.hadoop.ozone.web.response.ListKeys;
 import org.apache.hadoop.ozone.web.response.ListVolumes;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataStore;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -59,11 +61,13 @@ import java.io.OutputStream;
 import java.text.ParseException;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
 /**
  * Test Key Space Manager operation in distributed handler scenario.
  */
@@ -72,6 +76,7 @@ public class TestKeySpaceManager {
   private static StorageHandler storageHandler;
   private static UserArgs userArgs;
   private static KSMMetrics ksmMetrics;
+  private static OzoneConfiguration conf;
 
   @Rule
   public ExpectedException exception = ExpectedException.none();
@@ -86,7 +91,7 @@ public class TestKeySpaceManager {
    */
   @BeforeClass
   public static void init() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
+    conf = new OzoneConfiguration();
     conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
         OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
     cluster = new MiniOzoneCluster.Builder(conf)
@@ -597,6 +602,13 @@ public class TestKeySpaceManager {
     storageHandler.deleteKey(keyArgs);
     Assert.assertEquals(1 + numKeyDeletes, ksmMetrics.getNumKeyDeletes());
 
+    // Make sure the deleted key has been renamed.
+    MetadataStore store = cluster.getKeySpaceManager().
+        getMetadataManager().getStore();
+    List<Map.Entry<byte[], byte[]>> list = store.getRangeKVs(null, 10,
+        new MetadataKeyFilters.KeyPrefixFilter(DELETING_KEY_PREFIX));
+    Assert.assertEquals(1, list.size());
+
     // Check the block key in SCM, make sure it's deleted.
     Set<String> keys = new HashSet<>();
     keys.add(keyArgs.getResourceName());