HDFS-12626. Ozone : delete open key entries that will no longer be closed. Contributed by Chen Liang.

This commit is contained in:
Xiaoyu Yao 2017-12-12 15:46:31 -08:00 committed by Owen O'Malley
parent 6c630d0b09
commit c0c87dea9b
9 changed files with 352 additions and 4 deletions

View File

@ -171,6 +171,23 @@ public final class OzoneConfigKeys {
public static final int OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT public static final int OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT
= 60000; = 60000;
/**
* The interval of open key clean service.
*/
public static final String OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS =
"ozone.open.key.cleanup.service.interval.seconds";
public static final int
OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT
= 24 * 3600; // a total of 24 hour
/**
* An open key gets cleaned up when it is being in open state for too long.
*/
public static final String OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS =
"ozone.open.key.expire.threshold";
public static final int OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT =
24 * 3600;
public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT = public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT =
"ozone.block.deleting.service.timeout"; "ozone.block.deleting.service.timeout";
public static final int OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT public static final int OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.ksm.helpers; package org.apache.hadoop.ozone.ksm.helpers;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo; import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.util.Time;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -73,6 +74,10 @@ public List<KsmKeyLocationInfo> getKeyLocationList() {
return keyLocationList; return keyLocationList;
} }
public void updateModifcationTime() {
this.modificationTime = Time.monotonicNow();
}
public void appendKeyLocation(KsmKeyLocationInfo newLocation) { public void appendKeyLocation(KsmKeyLocationInfo newLocation) {
keyLocationList.add(newLocation); keyLocationList.add(newLocation);
} }

View File

@ -240,4 +240,14 @@ List<KsmVolumeArgs> listVolumes(String userName, String prefix,
* @throws IOException * @throws IOException
*/ */
List<BlockGroup> getPendingDeletionKeys(int count) throws IOException; List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
/**
* Returns a list of all still open key info. Which contains the info about
* the key name and all its associated block IDs. A pending open key has
* prefix #open# in KSM DB.
*
* @return a list of {@link BlockGroup} representing keys and blocks.
* @throws IOException
*/
List<BlockGroup> getExpiredOpenKeys() throws IOException;
} }

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo; import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList; import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.BatchOperation; import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataKeyFilters; import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
@ -52,6 +53,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME; import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR; import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR;
@ -68,7 +71,7 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
private final MetadataStore store; private final MetadataStore store;
private final ReadWriteLock lock; private final ReadWriteLock lock;
private final long openKeyExpireThresholdMS;
public KSMMetadataManagerImpl(OzoneConfiguration conf) throws IOException { public KSMMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
File metaDir = OzoneUtils.getScmMetadirPath(conf); File metaDir = OzoneUtils.getScmMetadirPath(conf);
@ -81,6 +84,9 @@ public KSMMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
.setCacheSize(cacheSize * OzoneConsts.MB) .setCacheSize(cacheSize * OzoneConsts.MB)
.build(); .build();
this.lock = new ReentrantReadWriteLock(); this.lock = new ReentrantReadWriteLock();
this.openKeyExpireThresholdMS = 1000 * conf.getInt(
OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
} }
/** /**
@ -478,4 +484,34 @@ public List<BlockGroup> getPendingDeletionKeys(final int count)
} }
return keyBlocksList; return keyBlocksList;
} }
@Override
public List<BlockGroup> getExpiredOpenKeys() throws IOException {
List<BlockGroup> keyBlocksList = Lists.newArrayList();
long now = Time.now();
final MetadataKeyFilter openKeyFilter =
new KeyPrefixFilter(OPEN_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> rangeResult =
store.getSequentialRangeKVs(null, Integer.MAX_VALUE,
openKeyFilter);
for (Map.Entry<byte[], byte[]> entry : rangeResult) {
KsmKeyInfo info =
KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
long lastModify = info.getModificationTime();
if (now - lastModify < this.openKeyExpireThresholdMS) {
// consider as may still be active, not hanging.
continue;
}
// Get block keys as a list.
List<String> item = info.getKeyLocationList().stream()
.map(KsmKeyLocationInfo::getBlockID)
.collect(Collectors.toList());
BlockGroup keyBlocks = BlockGroup.newBuilder()
.setKeyName(DFSUtil.bytes2String(entry.getKey()))
.addAllBlockIDs(item)
.build();
keyBlocksList.add(keyBlocks);
}
return keyBlocksList;
}
} }

View File

@ -142,4 +142,24 @@ List<KsmKeyInfo> listKeys(String volumeName,
* @throws IOException if specified key doesn't exist or other I/O errors. * @throws IOException if specified key doesn't exist or other I/O errors.
*/ */
void deletePendingDeletionKey(String objectKeyName) throws IOException; void deletePendingDeletionKey(String objectKeyName) throws IOException;
/**
* Returns a list of all still open key info. Which contains the info about
* the key name and all its associated block IDs. A pending open key has
* prefix #open# in KSM DB.
*
* @return a list of {@link BlockGroup} representing keys and blocks.
* @throws IOException
*/
List<BlockGroup> getExpiredOpenKeys() throws IOException;
/**
* Deletes a expired open key by its name. Called when a hanging key has been
* lingering for too long. Once called, the open key entries gets removed
* from KSM mdata data.
*
* @param objectKeyName object key name with #open# prefix.
* @throws IOException if specified key doesn't exist or other I/O errors.
*/
void deleteExpiredOpenKey(String objectKeyName) throws IOException;
} }

View File

@ -60,6 +60,10 @@
.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE; .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
import static org.apache.hadoop.ozone import static org.apache.hadoop.ozone
.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT; .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT;
import static org.apache.hadoop.ozone import static org.apache.hadoop.ozone
.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.ozone import static org.apache.hadoop.ozone
@ -85,6 +89,7 @@ public class KeyManagerImpl implements KeyManager {
private final long scmBlockSize; private final long scmBlockSize;
private final boolean useRatis; private final boolean useRatis;
private final BackgroundService keyDeletingService; private final BackgroundService keyDeletingService;
private final BackgroundService openKeyCleanupService;
private final long preallocateMax; private final long preallocateMax;
private final Random random; private final Random random;
@ -97,7 +102,7 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
OZONE_SCM_BLOCK_SIZE_DEFAULT) * OzoneConsts.MB; OZONE_SCM_BLOCK_SIZE_DEFAULT) * OzoneConsts.MB;
this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY, this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY,
DFS_CONTAINER_RATIS_ENABLED_DEFAULT); DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
int svcInterval = conf.getInt( int blockDeleteInterval = conf.getInt(
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT); OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
long serviceTimeout = conf.getTimeDuration( long serviceTimeout = conf.getTimeDuration(
@ -107,18 +112,25 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
OZONE_KEY_PREALLOCATION_MAXSIZE, OZONE_KEY_PREALLOCATION_MAXSIZE,
OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT); OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
keyDeletingService = new KeyDeletingService( keyDeletingService = new KeyDeletingService(
scmBlockClient, this, svcInterval, serviceTimeout, conf); scmBlockClient, this, blockDeleteInterval, serviceTimeout, conf);
int openkeyCheckInterval = conf.getInt(
OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS,
OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT);
openKeyCleanupService = new OpenKeyCleanupService(
scmBlockClient, this, openkeyCheckInterval, serviceTimeout);
random = new Random(); random = new Random();
} }
@Override @Override
public void start() { public void start() {
keyDeletingService.start(); keyDeletingService.start();
openKeyCleanupService.start();
} }
@Override @Override
public void stop() throws IOException { public void stop() throws IOException {
keyDeletingService.shutdown(); keyDeletingService.shutdown();
openKeyCleanupService.shutdown();
} }
private void validateBucket(String volumeName, String bucketName) private void validateBucket(String volumeName, String bucketName)
@ -186,6 +198,7 @@ public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
.setIndex(keyInfo.getKeyLocationList().size()) .setIndex(keyInfo.getKeyLocationList().size())
.build(); .build();
keyInfo.appendKeyLocation(info); keyInfo.appendKeyLocation(info);
keyInfo.updateModifcationTime();
metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray()); metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
return info; return info;
} finally { } finally {
@ -435,4 +448,37 @@ public void deletePendingDeletionKey(String objectKeyName)
metadataManager.writeLock().unlock(); metadataManager.writeLock().unlock();
} }
} }
@Override
public List<BlockGroup> getExpiredOpenKeys() throws IOException {
metadataManager.readLock().lock();
try {
return metadataManager.getExpiredOpenKeys();
} finally {
metadataManager.readLock().unlock();
}
}
@Override
public void deleteExpiredOpenKey(String objectKeyName) throws IOException {
Preconditions.checkNotNull(objectKeyName);
if (!objectKeyName.startsWith(OzoneConsts.OPEN_KEY_PREFIX)) {
throw new IllegalArgumentException("Invalid key name,"
+ " the name should be the key name with open key prefix");
}
// Simply removes the entry from KSM DB.
metadataManager.writeLock().lock();
try {
byte[] openKey = DFSUtil.string2Bytes(objectKeyName);
byte[] delKeyValue = metadataManager.get(openKey);
if (delKeyValue == null) {
throw new IOException("Failed to delete key " + objectKeyName
+ " because it is not found in DB");
}
metadataManager.delete(openKey);
} finally {
metadataManager.writeLock().unlock();
}
}
} }

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.BackgroundTask;
import org.apache.hadoop.utils.BackgroundTaskQueue;
import org.apache.hadoop.utils.BackgroundTaskResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* This is the background service to delete hanging open keys.
* Scan the metadata of ksm periodically to get
* the keys with prefix "#open#" and ask scm to
* delete metadata accordingly, if scm returns
* success for keys, then clean up those keys.
*/
public class OpenKeyCleanupService extends BackgroundService {
private static final Logger LOG =
LoggerFactory.getLogger(OpenKeyCleanupService.class);
private final static int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2;
private final KeyManager keyManager;
private final ScmBlockLocationProtocol scmClient;
public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient,
KeyManager keyManager, int serviceInterval,
long serviceTimeout) {
super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS,
OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
this.keyManager = keyManager;
this.scmClient = scmClient;
}
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
queue.add(new OpenKeyDeletingTask());
return queue;
}
private class OpenKeyDeletingTask
implements BackgroundTask<BackgroundTaskResult> {
@Override
public int getPriority() {
return 0;
}
@Override
public BackgroundTaskResult call() throws Exception {
try {
List<BlockGroup> keyBlocksList = keyManager.getExpiredOpenKeys();
if (keyBlocksList.size() > 0) {
int toDeleteSize = keyBlocksList.size();
LOG.debug("Found {} to-delete open keys in KSM", toDeleteSize);
List<DeleteBlockGroupResult> results =
scmClient.deleteKeyBlocks(keyBlocksList);
int deletedSize = 0;
for (DeleteBlockGroupResult result : results) {
if (result.isSuccess()) {
try {
keyManager.deleteExpiredOpenKey(result.getObjectKey());
LOG.debug("Key {} deleted from KSM DB", result.getObjectKey());
deletedSize += 1;
} catch (IOException e) {
LOG.warn("Failed to delete hanging-open key {}",
result.getObjectKey(), e);
}
} else {
LOG.warn("Deleting open Key {} failed because some of the blocks"
+ " were failed to delete, failed blocks: {}",
result.getObjectKey(),
String.join(",", result.getFailedBlocks()));
}
}
LOG.info("Found {} expired open key entries, successfully " +
"cleaned up {} entries", toDeleteSize, deletedSize);
return results::size;
} else {
LOG.debug("No hanging open key fond in KSM");
}
} catch (IOException e) {
LOG.error("Unable to get hanging open keys, retry in"
+ " next interval", e);
}
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
}
}

View File

@ -1196,4 +1196,25 @@
connection is limited by ozone.rest.client.http.connection.max property. connection is limited by ozone.rest.client.http.connection.max property.
</description> </description>
</property> </property>
<property>
<name>ozone.open.key.cleanup.service.interval.seconds</name>
<value>86400</value>
<tag>OZONE, KSM, PERFORMANCE</tag>
<description>
A background job periodically checks open key entries and delete the expired ones. This entry controls the
interval of this cleanup check.
</description>
</property>
<property>
<name>ozone.open.key.expire.threshold </name>
<value>86400</value>
<tag>OZONE, KSM, PERFORMANCE</tag>
<description>
Controls how long an open key operation is considered active. Specifically, if a key
has been open longer than the value of this config entry, that open key is considered as
expired (e.g. due to client crash). Default to 24 hours.
</description>
</property>
</configuration> </configuration>

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.client.rest.OzoneException; import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs; import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs; import org.apache.hadoop.ozone.web.handlers.KeyArgs;
@ -69,7 +70,10 @@
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
/** /**
* Test Key Space Manager operation in distributed handler scenario. * Test Key Space Manager operation in distributed handler scenario.
@ -101,6 +105,8 @@ public static void init() throws Exception {
scmId = UUID.randomUUID().toString(); scmId = UUID.randomUUID().toString();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED); OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
conf.setInt(OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS, 2);
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
cluster = new MiniOzoneClassicCluster.Builder(conf) cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED) .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.setClusterId(clusterId) .setClusterId(clusterId)
@ -955,7 +961,7 @@ public void testListVolumes() throws IOException, OzoneException {
*/ */
@Test @Test
public void testGetKeyInfo() throws IOException, public void testGetKeyInfo() throws IOException,
OzoneException, ParseException { OzoneException, ParseException {
String userName = "user" + RandomStringUtils.randomNumeric(5); String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5); String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5); String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
@ -1053,4 +1059,75 @@ public void testGetScmInfo() throws IOException {
Assert.assertEquals(clusterId, info.getClusterId()); Assert.assertEquals(clusterId, info.getClusterId());
Assert.assertEquals(scmId, info.getScmId()); Assert.assertEquals(scmId, info.getScmId());
} }
@Test
public void testExpiredOpenKey() throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
bucketArgs.setAddAcls(new LinkedList<>());
bucketArgs.setRemoveAcls(new LinkedList<>());
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
// open some keys.
Thread.sleep(1000);
KeyArgs keyArgs1 = new KeyArgs("testKey1", bucketArgs);
KeyArgs keyArgs2 = new KeyArgs("testKey2", bucketArgs);
KeyArgs keyArgs3 = new KeyArgs("testKey3", bucketArgs);
KeyArgs keyArgs4 = new KeyArgs("testKey4", bucketArgs);
List<BlockGroup> openKeys;
try (OutputStream s1 = storageHandler.newKeyWriter(keyArgs1);
OutputStream s2 = storageHandler.newKeyWriter(keyArgs2)) {
storageHandler.newKeyWriter(keyArgs3);
storageHandler.newKeyWriter(keyArgs4);
// now all k1-k4 should be in open state
openKeys = cluster.getKeySpaceManager()
.getMetadataManager().getExpiredOpenKeys();
Assert.assertEquals(0, openKeys.size());
Thread.sleep(2000);
openKeys = cluster.getKeySpaceManager().getMetadataManager()
.getExpiredOpenKeys();
Assert.assertEquals(4, openKeys.size());
Set<String> expected = Stream.of(
"testKey1", "testKey2", "testKey3", "testKey4")
.collect(Collectors.toSet());
openKeys =
cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys();
for (BlockGroup bg : openKeys) {
String[] subs = bg.getGroupID().split("/");
String keyName = subs[subs.length - 1];
Assert.assertTrue(expected.remove(keyName));
}
Assert.assertEquals(0, expected.size());
}
KeyArgs keyArgs5 = new KeyArgs("testKey5", bucketArgs);
storageHandler.newKeyWriter(keyArgs5);
// k1 and k2 are closed, so should be removed from meta data, k3 and k4
// should still be there.
Thread.sleep(2000);
openKeys =
cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys();
Assert.assertEquals(1, openKeys.size());
String[] subs = openKeys.get(0).getGroupID().split("/");
String keyName = subs[subs.length - 1];
Assert.assertEquals("testKey5", keyName);
}
} }