HDFS-12940. Ozone: KSM: TestKeySpaceManager#testExpiredOpenKey fails occasionally. Contributed by Nanda kumar.
This commit is contained in:
parent
1fb1ce107f
commit
4a051ba494
@ -16,6 +16,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.ksm;
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
@ -128,6 +129,11 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
|
|||||||
this.ksmId = ksmId;
|
this.ksmId = ksmId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public BackgroundService getOpenKeyCleanupService() {
|
||||||
|
return openKeyCleanupService;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
keyDeletingService.start();
|
keyDeletingService.start();
|
||||||
|
@ -234,6 +234,11 @@ private static StorageContainerLocationProtocol getScmContainerClient(
|
|||||||
return scmContainerClient;
|
return scmContainerClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public KeyManager getKeyManager() {
|
||||||
|
return keyManager;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public ScmInfo getScmInfo() throws IOException {
|
public ScmInfo getScmInfo() throws IOException {
|
||||||
return scmBlockClient.getScmInfo();
|
return scmBlockClient.getScmInfo();
|
||||||
|
@ -55,6 +55,7 @@ public abstract class BackgroundService {
|
|||||||
private final long interval;
|
private final long interval;
|
||||||
private final long serviceTimeout;
|
private final long serviceTimeout;
|
||||||
private final TimeUnit unit;
|
private final TimeUnit unit;
|
||||||
|
private final PeriodicalTask service;
|
||||||
|
|
||||||
public BackgroundService(String serviceName, long interval,
|
public BackgroundService(String serviceName, long interval,
|
||||||
TimeUnit unit, int threadPoolSize, long serviceTimeout) {
|
TimeUnit unit, int threadPoolSize, long serviceTimeout) {
|
||||||
@ -70,6 +71,7 @@ public BackgroundService(String serviceName, long interval,
|
|||||||
.setNameFormat(serviceName + "#%d")
|
.setNameFormat(serviceName + "#%d")
|
||||||
.build();
|
.build();
|
||||||
exec = Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
|
exec = Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
|
||||||
|
service = new PeriodicalTask();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ExecutorService getExecutorService() {
|
protected ExecutorService getExecutorService() {
|
||||||
@ -81,10 +83,14 @@ public int getThreadCount() {
|
|||||||
return threadGroup.activeCount();
|
return threadGroup.activeCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void triggerBackgroundTaskForTesting() {
|
||||||
|
service.run();
|
||||||
|
}
|
||||||
|
|
||||||
// start service
|
// start service
|
||||||
public void start() {
|
public void start() {
|
||||||
exec.scheduleWithFixedDelay(new PeriodicalTask(), 0, interval, unit);
|
exec.scheduleWithFixedDelay(service, 0, interval, unit);
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract BackgroundTaskQueue getTasks();
|
public abstract BackgroundTaskQueue getTasks();
|
||||||
@ -95,7 +101,7 @@ public void start() {
|
|||||||
*/
|
*/
|
||||||
public class PeriodicalTask implements Runnable {
|
public class PeriodicalTask implements Runnable {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public synchronized void run() {
|
||||||
LOG.debug("Running background service : {}", serviceName);
|
LOG.debug("Running background service : {}", serviceName);
|
||||||
BackgroundTaskQueue tasks = getTasks();
|
BackgroundTaskQueue tasks = getTasks();
|
||||||
if (tasks.isEmpty()) {
|
if (tasks.isEmpty()) {
|
||||||
|
@ -56,6 +56,7 @@
|
|||||||
import org.apache.hadoop.ozone.web.response.ListKeys;
|
import org.apache.hadoop.ozone.web.response.ListKeys;
|
||||||
import org.apache.hadoop.ozone.web.response.ListVolumes;
|
import org.apache.hadoop.ozone.web.response.ListVolumes;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.apache.hadoop.utils.BackgroundService;
|
||||||
import org.apache.hadoop.utils.MetadataKeyFilters;
|
import org.apache.hadoop.utils.MetadataKeyFilters;
|
||||||
import org.apache.hadoop.utils.MetadataStore;
|
import org.apache.hadoop.utils.MetadataStore;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
@ -83,7 +84,6 @@
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
|
|
||||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
|
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
|
||||||
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
|
||||||
@ -121,7 +121,6 @@ public static void init() throws Exception {
|
|||||||
ksmId = UUID.randomUUID().toString();
|
ksmId = UUID.randomUUID().toString();
|
||||||
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
|
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
|
||||||
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
|
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
|
||||||
conf.setInt(OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS, 2);
|
|
||||||
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
|
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
|
||||||
cluster = new MiniOzoneClassicCluster.Builder(conf)
|
cluster = new MiniOzoneClassicCluster.Builder(conf)
|
||||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
|
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
|
||||||
@ -1080,6 +1079,9 @@ public void testGetScmInfo() throws IOException {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExpiredOpenKey() throws Exception {
|
public void testExpiredOpenKey() throws Exception {
|
||||||
|
BackgroundService openKeyCleanUpService = ((KeyManagerImpl)cluster
|
||||||
|
.getKeySpaceManager().getKeyManager()).getOpenKeyCleanupService();
|
||||||
|
|
||||||
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
||||||
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
|
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
|
||||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
||||||
@ -1098,54 +1100,67 @@ public void testExpiredOpenKey() throws Exception {
|
|||||||
|
|
||||||
// open some keys.
|
// open some keys.
|
||||||
|
|
||||||
Thread.sleep(1000);
|
|
||||||
|
|
||||||
KeyArgs keyArgs1 = new KeyArgs("testKey1", bucketArgs);
|
KeyArgs keyArgs1 = new KeyArgs("testKey1", bucketArgs);
|
||||||
KeyArgs keyArgs2 = new KeyArgs("testKey2", bucketArgs);
|
KeyArgs keyArgs2 = new KeyArgs("testKey2", bucketArgs);
|
||||||
KeyArgs keyArgs3 = new KeyArgs("testKey3", bucketArgs);
|
KeyArgs keyArgs3 = new KeyArgs("testKey3", bucketArgs);
|
||||||
KeyArgs keyArgs4 = new KeyArgs("testKey4", bucketArgs);
|
KeyArgs keyArgs4 = new KeyArgs("testKey4", bucketArgs);
|
||||||
List<BlockGroup> openKeys;
|
List<BlockGroup> openKeys;
|
||||||
try (OutputStream s1 = storageHandler.newKeyWriter(keyArgs1);
|
storageHandler.newKeyWriter(keyArgs1);
|
||||||
OutputStream s2 = storageHandler.newKeyWriter(keyArgs2)) {
|
storageHandler.newKeyWriter(keyArgs2);
|
||||||
storageHandler.newKeyWriter(keyArgs3);
|
storageHandler.newKeyWriter(keyArgs3);
|
||||||
storageHandler.newKeyWriter(keyArgs4);
|
storageHandler.newKeyWriter(keyArgs4);
|
||||||
// now all k1-k4 should be in open state
|
|
||||||
openKeys = cluster.getKeySpaceManager()
|
|
||||||
.getMetadataManager().getExpiredOpenKeys();
|
|
||||||
Assert.assertEquals(0, openKeys.size());
|
|
||||||
|
|
||||||
Thread.sleep(2000);
|
Set<String> expected = Stream.of(
|
||||||
|
"testKey1", "testKey2", "testKey3", "testKey4")
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
openKeys = cluster.getKeySpaceManager().getMetadataManager()
|
// Now all k1-k4 should be in open state, so ExpiredOpenKeys should not
|
||||||
.getExpiredOpenKeys();
|
// contain these values.
|
||||||
Assert.assertEquals(4, openKeys.size());
|
openKeys = cluster.getKeySpaceManager()
|
||||||
|
.getMetadataManager().getExpiredOpenKeys();
|
||||||
|
|
||||||
Set<String> expected = Stream.of(
|
for (BlockGroup bg : openKeys) {
|
||||||
"testKey1", "testKey2", "testKey3", "testKey4")
|
String[] subs = bg.getGroupID().split("/");
|
||||||
.collect(Collectors.toSet());
|
String keyName = subs[subs.length - 1];
|
||||||
openKeys =
|
Assert.assertFalse(expected.contains(keyName));
|
||||||
cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys();
|
|
||||||
for (BlockGroup bg : openKeys) {
|
|
||||||
String[] subs = bg.getGroupID().split("/");
|
|
||||||
String keyName = subs[subs.length - 1];
|
|
||||||
Assert.assertTrue(expected.remove(keyName));
|
|
||||||
}
|
|
||||||
Assert.assertEquals(0, expected.size());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
// Now all k1-k4 should be in ExpiredOpenKeys
|
||||||
|
openKeys = cluster.getKeySpaceManager()
|
||||||
|
.getMetadataManager().getExpiredOpenKeys();
|
||||||
|
for (BlockGroup bg : openKeys) {
|
||||||
|
String[] subs = bg.getGroupID().split("/");
|
||||||
|
String keyName = subs[subs.length - 1];
|
||||||
|
if (expected.contains(keyName)) {
|
||||||
|
expected.remove(keyName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertEquals(0, expected.size());
|
||||||
|
|
||||||
KeyArgs keyArgs5 = new KeyArgs("testKey5", bucketArgs);
|
KeyArgs keyArgs5 = new KeyArgs("testKey5", bucketArgs);
|
||||||
storageHandler.newKeyWriter(keyArgs5);
|
storageHandler.newKeyWriter(keyArgs5);
|
||||||
|
|
||||||
// k1 and k2 are closed, so should be removed from meta data, k3 and k4
|
openKeyCleanUpService.triggerBackgroundTaskForTesting();
|
||||||
// should still be there.
|
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
|
// now all k1-k4 should have been removed by the clean-up task, only k5
|
||||||
|
// should be present in ExpiredOpenKeys.
|
||||||
openKeys =
|
openKeys =
|
||||||
cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys();
|
cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys();
|
||||||
Assert.assertEquals(1, openKeys.size());
|
System.out.println(openKeys);
|
||||||
String[] subs = openKeys.get(0).getGroupID().split("/");
|
boolean key5found = false;
|
||||||
String keyName = subs[subs.length - 1];
|
Set<String> removed = Stream.of(
|
||||||
Assert.assertEquals("testKey5", keyName);
|
"testKey1", "testKey2", "testKey3", "testKey4")
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
for (BlockGroup bg : openKeys) {
|
||||||
|
String[] subs = bg.getGroupID().split("/");
|
||||||
|
String keyName = subs[subs.length - 1];
|
||||||
|
Assert.assertFalse(removed.contains(keyName));
|
||||||
|
if (keyName.equals("testKey5")) {
|
||||||
|
key5found = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertTrue(key5found);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user