diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 9474ba3842..d23af37056 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -171,6 +171,23 @@ public final class OzoneConfigKeys { public static final int OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT = 60000; + /** + * The interval of open key clean service. + */ + public static final String OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS = + "ozone.open.key.cleanup.service.interval.seconds"; + public static final int + OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT + = 24 * 3600; // a total of 24 hour + + /** + * An open key gets cleaned up when it is being in open state for too long. + */ + public static final String OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS = + "ozone.open.key.expire.threshold"; + public static final int OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT = + 24 * 3600; + public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT = "ozone.block.deleting.service.timeout"; public static final int OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java index 3b532fe421..b6054ebc46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.ksm.helpers; import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo; +import org.apache.hadoop.util.Time; import java.util.List; import java.util.stream.Collectors; @@ -73,6 +74,10 @@ public List getKeyLocationList() { return keyLocationList; } + public void updateModifcationTime() { + this.modificationTime = Time.monotonicNow(); + } + public void appendKeyLocation(KsmKeyLocationInfo newLocation) { keyLocationList.add(newLocation); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java index bada9bfbf1..f5a2d5bb0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java @@ -240,4 +240,14 @@ List listVolumes(String userName, String prefix, * @throws IOException */ List getPendingDeletionKeys(int count) throws IOException; + + /** + * Returns a list of all still open key info. Which contains the info about + * the key name and all its associated block IDs. A pending open key has + * prefix #open# in KSM DB. + * + * @return a list of {@link BlockGroup} representing keys and blocks. + * @throws IOException + */ + List getExpiredOpenKeys() throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java index cb04668bfa..fbc1131c07 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo; import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList; import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.utils.BatchOperation; import org.apache.hadoop.utils.MetadataKeyFilters; import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; @@ -52,6 +53,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT; import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME; import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR; @@ -68,7 +71,7 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager { private final MetadataStore store; private final ReadWriteLock lock; - + private final long openKeyExpireThresholdMS; public KSMMetadataManagerImpl(OzoneConfiguration conf) throws IOException { File metaDir = OzoneUtils.getScmMetadirPath(conf); @@ -81,6 +84,9 @@ public KSMMetadataManagerImpl(OzoneConfiguration conf) throws IOException { .setCacheSize(cacheSize * OzoneConsts.MB) .build(); this.lock = new ReentrantReadWriteLock(); + this.openKeyExpireThresholdMS = 1000 * conf.getInt( + OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, + OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT); } /** @@ -478,4 +484,34 @@ public List getPendingDeletionKeys(final int count) } return keyBlocksList; } + + @Override + public List getExpiredOpenKeys() throws IOException { + List keyBlocksList = Lists.newArrayList(); + long now = Time.now(); + final MetadataKeyFilter openKeyFilter = + new KeyPrefixFilter(OPEN_KEY_PREFIX); + List> rangeResult = + store.getSequentialRangeKVs(null, Integer.MAX_VALUE, + openKeyFilter); + for (Map.Entry entry : rangeResult) { + KsmKeyInfo info = + KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue())); + long lastModify = info.getModificationTime(); + if (now - lastModify < this.openKeyExpireThresholdMS) { + // consider as may still be active, not hanging. + continue; + } + // Get block keys as a list. + List item = info.getKeyLocationList().stream() + .map(KsmKeyLocationInfo::getBlockID) + .collect(Collectors.toList()); + BlockGroup keyBlocks = BlockGroup.newBuilder() + .setKeyName(DFSUtil.bytes2String(entry.getKey())) + .addAllBlockIDs(item) + .build(); + keyBlocksList.add(keyBlocks); + } + return keyBlocksList; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java index ccc97aa80b..e71ce5ffe6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java @@ -142,4 +142,24 @@ List listKeys(String volumeName, * @throws IOException if specified key doesn't exist or other I/O errors. */ void deletePendingDeletionKey(String objectKeyName) throws IOException; + + /** + * Returns a list of all still open key info. Which contains the info about + * the key name and all its associated block IDs. A pending open key has + * prefix #open# in KSM DB. + * + * @return a list of {@link BlockGroup} representing keys and blocks. + * @throws IOException + */ + List getExpiredOpenKeys() throws IOException; + + /** + * Deletes a expired open key by its name. Called when a hanging key has been + * lingering for too long. Once called, the open key entries gets removed + * from KSM mdata data. + * + * @param objectKeyName object key name with #open# prefix. + * @throws IOException if specified key doesn't exist or other I/O errors. + */ + void deleteExpiredOpenKey(String objectKeyName) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java index 1f2af955bb..620816aeea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java @@ -60,6 +60,10 @@ .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE; import static org.apache.hadoop.ozone .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT; import static org.apache.hadoop.ozone .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.ozone @@ -85,6 +89,7 @@ public class KeyManagerImpl implements KeyManager { private final long scmBlockSize; private final boolean useRatis; private final BackgroundService keyDeletingService; + private final BackgroundService openKeyCleanupService; private final long preallocateMax; private final Random random; @@ -97,7 +102,7 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, OZONE_SCM_BLOCK_SIZE_DEFAULT) * OzoneConsts.MB; this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT); - int svcInterval = conf.getInt( + int blockDeleteInterval = conf.getInt( OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT); long serviceTimeout = conf.getTimeDuration( @@ -107,18 +112,25 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, OZONE_KEY_PREALLOCATION_MAXSIZE, OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT); keyDeletingService = new KeyDeletingService( - scmBlockClient, this, svcInterval, serviceTimeout, conf); + scmBlockClient, this, blockDeleteInterval, serviceTimeout, conf); + int openkeyCheckInterval = conf.getInt( + OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS, + OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT); + openKeyCleanupService = new OpenKeyCleanupService( + scmBlockClient, this, openkeyCheckInterval, serviceTimeout); random = new Random(); } @Override public void start() { keyDeletingService.start(); + openKeyCleanupService.start(); } @Override public void stop() throws IOException { keyDeletingService.shutdown(); + openKeyCleanupService.shutdown(); } private void validateBucket(String volumeName, String bucketName) @@ -186,6 +198,7 @@ public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID) .setIndex(keyInfo.getKeyLocationList().size()) .build(); keyInfo.appendKeyLocation(info); + keyInfo.updateModifcationTime(); metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray()); return info; } finally { @@ -435,4 +448,37 @@ public void deletePendingDeletionKey(String objectKeyName) metadataManager.writeLock().unlock(); } } + + @Override + public List getExpiredOpenKeys() throws IOException { + metadataManager.readLock().lock(); + try { + return metadataManager.getExpiredOpenKeys(); + } finally { + metadataManager.readLock().unlock(); + } + } + + @Override + public void deleteExpiredOpenKey(String objectKeyName) throws IOException { + Preconditions.checkNotNull(objectKeyName); + if (!objectKeyName.startsWith(OzoneConsts.OPEN_KEY_PREFIX)) { + throw new IllegalArgumentException("Invalid key name," + + " the name should be the key name with open key prefix"); + } + + // Simply removes the entry from KSM DB. + metadataManager.writeLock().lock(); + try { + byte[] openKey = DFSUtil.string2Bytes(objectKeyName); + byte[] delKeyValue = metadataManager.get(openKey); + if (delKeyValue == null) { + throw new IOException("Failed to delete key " + objectKeyName + + " because it is not found in DB"); + } + metadataManager.delete(openKey); + } finally { + metadataManager.writeLock().unlock(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/OpenKeyCleanupService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/OpenKeyCleanupService.java new file mode 100644 index 0000000000..7f60bf83c8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/OpenKeyCleanupService.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.ksm; + +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.utils.BackgroundService; +import org.apache.hadoop.utils.BackgroundTask; +import org.apache.hadoop.utils.BackgroundTaskQueue; +import org.apache.hadoop.utils.BackgroundTaskResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * This is the background service to delete hanging open keys. + * Scan the metadata of ksm periodically to get + * the keys with prefix "#open#" and ask scm to + * delete metadata accordingly, if scm returns + * success for keys, then clean up those keys. + */ +public class OpenKeyCleanupService extends BackgroundService { + + private static final Logger LOG = + LoggerFactory.getLogger(OpenKeyCleanupService.class); + + private final static int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2; + + private final KeyManager keyManager; + private final ScmBlockLocationProtocol scmClient; + + public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient, + KeyManager keyManager, int serviceInterval, + long serviceTimeout) { + super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS, + OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); + this.keyManager = keyManager; + this.scmClient = scmClient; + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + queue.add(new OpenKeyDeletingTask()); + return queue; + } + + private class OpenKeyDeletingTask + implements BackgroundTask { + + @Override + public int getPriority() { + return 0; + } + + @Override + public BackgroundTaskResult call() throws Exception { + try { + List keyBlocksList = keyManager.getExpiredOpenKeys(); + if (keyBlocksList.size() > 0) { + int toDeleteSize = keyBlocksList.size(); + LOG.debug("Found {} to-delete open keys in KSM", toDeleteSize); + List results = + scmClient.deleteKeyBlocks(keyBlocksList); + int deletedSize = 0; + for (DeleteBlockGroupResult result : results) { + if (result.isSuccess()) { + try { + keyManager.deleteExpiredOpenKey(result.getObjectKey()); + LOG.debug("Key {} deleted from KSM DB", result.getObjectKey()); + deletedSize += 1; + } catch (IOException e) { + LOG.warn("Failed to delete hanging-open key {}", + result.getObjectKey(), e); + } + } else { + LOG.warn("Deleting open Key {} failed because some of the blocks" + + " were failed to delete, failed blocks: {}", + result.getObjectKey(), + String.join(",", result.getFailedBlocks())); + } + } + LOG.info("Found {} expired open key entries, successfully " + + "cleaned up {} entries", toDeleteSize, deletedSize); + return results::size; + } else { + LOG.debug("No hanging open key fond in KSM"); + } + } catch (IOException e) { + LOG.error("Unable to get hanging open keys, retry in" + + " next interval", e); + } + return BackgroundTaskResult.EmptyTaskResult.newResult(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index c7c78b3ae1..0f928bd2ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -1196,4 +1196,25 @@ connection is limited by ozone.rest.client.http.connection.max property. + + + ozone.open.key.cleanup.service.interval.seconds + 86400 + OZONE, KSM, PERFORMANCE + + A background job periodically checks open key entries and delete the expired ones. This entry controls the + interval of this cleanup check. + + + + + ozone.open.key.expire.threshold + 86400 + OZONE, KSM, PERFORMANCE + + Controls how long an open key operation is considered active. Specifically, if a key + has been open longer than the value of this config entry, that open key is considered as + expired (e.g. due to client crash). Default to 24 hours. + + \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java index 09cf38f895..3ff753cf93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java @@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.client.rest.OzoneException; import org.apache.hadoop.ozone.web.handlers.BucketArgs; import org.apache.hadoop.ozone.web.handlers.KeyArgs; @@ -69,7 +70,10 @@ import java.util.List; import java.util.UUID; import java.util.stream.Collectors; +import java.util.stream.Stream; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX; /** * Test Key Space Manager operation in distributed handler scenario. @@ -101,6 +105,8 @@ public static void init() throws Exception { scmId = UUID.randomUUID().toString(); conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, OzoneConsts.OZONE_HANDLER_DISTRIBUTED); + conf.setInt(OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS, 2); + conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); cluster = new MiniOzoneClassicCluster.Builder(conf) .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED) .setClusterId(clusterId) @@ -955,7 +961,7 @@ public void testListVolumes() throws IOException, OzoneException { */ @Test public void testGetKeyInfo() throws IOException, - OzoneException, ParseException { + OzoneException, ParseException { String userName = "user" + RandomStringUtils.randomNumeric(5); String adminName = "admin" + RandomStringUtils.randomNumeric(5); String volumeName = "volume" + RandomStringUtils.randomNumeric(5); @@ -1053,4 +1059,75 @@ public void testGetScmInfo() throws IOException { Assert.assertEquals(clusterId, info.getClusterId()); Assert.assertEquals(scmId, info.getScmId()); } + + + @Test + public void testExpiredOpenKey() throws Exception { + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs); + createVolumeArgs.setUserName(userName); + createVolumeArgs.setAdminName(adminName); + storageHandler.createVolume(createVolumeArgs); + + BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs); + bucketArgs.setAddAcls(new LinkedList<>()); + bucketArgs.setRemoveAcls(new LinkedList<>()); + bucketArgs.setStorageType(StorageType.DISK); + storageHandler.createBucket(bucketArgs); + + // open some keys. + + Thread.sleep(1000); + + KeyArgs keyArgs1 = new KeyArgs("testKey1", bucketArgs); + KeyArgs keyArgs2 = new KeyArgs("testKey2", bucketArgs); + KeyArgs keyArgs3 = new KeyArgs("testKey3", bucketArgs); + KeyArgs keyArgs4 = new KeyArgs("testKey4", bucketArgs); + List openKeys; + try (OutputStream s1 = storageHandler.newKeyWriter(keyArgs1); + OutputStream s2 = storageHandler.newKeyWriter(keyArgs2)) { + storageHandler.newKeyWriter(keyArgs3); + storageHandler.newKeyWriter(keyArgs4); + // now all k1-k4 should be in open state + openKeys = cluster.getKeySpaceManager() + .getMetadataManager().getExpiredOpenKeys(); + Assert.assertEquals(0, openKeys.size()); + + Thread.sleep(2000); + + openKeys = cluster.getKeySpaceManager().getMetadataManager() + .getExpiredOpenKeys(); + Assert.assertEquals(4, openKeys.size()); + + Set expected = Stream.of( + "testKey1", "testKey2", "testKey3", "testKey4") + .collect(Collectors.toSet()); + openKeys = + cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys(); + for (BlockGroup bg : openKeys) { + String[] subs = bg.getGroupID().split("/"); + String keyName = subs[subs.length - 1]; + Assert.assertTrue(expected.remove(keyName)); + } + Assert.assertEquals(0, expected.size()); + } + + KeyArgs keyArgs5 = new KeyArgs("testKey5", bucketArgs); + storageHandler.newKeyWriter(keyArgs5); + + // k1 and k2 are closed, so should be removed from meta data, k3 and k4 + // should still be there. + Thread.sleep(2000); + + openKeys = + cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys(); + Assert.assertEquals(1, openKeys.size()); + String[] subs = openKeys.get(0).getGroupID().split("/"); + String keyName = subs[subs.length - 1]; + Assert.assertEquals("testKey5", keyName); + } }