From 639b4fb8a9df9e709b59cecd72546218709773c5 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Fri, 11 Aug 2017 18:45:55 +0800 Subject: [PATCH] HDFS-12196. Ozone: DeleteKey-2: Implement block deleting service to delete stale blocks at background. Contributed by Weiwei Yang. --- .../apache/hadoop/ozone/OzoneConfigKeys.java | 18 + .../common/impl/ContainerManagerImpl.java | 2 +- .../background/BlockDeletingService.java | 211 ++++++++++++ .../statemachine/background/package-info.java | 18 + .../container/ozoneimpl/OzoneContainer.java | 14 + .../hadoop/utils/BackgroundService.java | 147 +++++++++ .../apache/hadoop/utils/BackgroundTask.java | 28 ++ .../hadoop/utils/BackgroundTaskQueue.java | 64 ++++ .../hadoop/utils/BackgroundTaskResult.java | 29 ++ .../src/main/resources/ozone-default.xml | 31 ++ .../BlockDeletingServiceTestImpl.java | 99 ++++++ .../common/TestBlockDeletingService.java | 312 ++++++++++++++++++ 12 files changed, 972 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 55b5f88391..92017a0206 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -94,6 +94,24 @@ public final class OzoneConfigKeys { "ozone.client.connection.timeout.ms"; public static final int OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT = 5000; + /** + * Configuration properties for Ozone Block Deleting Service. + */ + public static final String OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS = + "ozone.block.deleting.service.interval.ms"; + public static final int OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT + = 60000; + + public static final String OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER = + "ozone.block.deleting.limit.per.task"; + public static final int OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT + = 1000; + + public static final String OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL + = "ozone.block.deleting.container.limit.per.interval"; + public static final int + OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10; + public static final String DFS_CONTAINER_RATIS_ENABLED_KEY = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java index 2beb5251c6..68ceb6f0ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java @@ -407,7 +407,7 @@ public void shutdown() throws IOException { @VisibleForTesting - ConcurrentSkipListMap getContainerMap() { + public ConcurrentSkipListMap getContainerMap() { return containerMap; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java new file mode 100644 index 0000000000..618fa42c1d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.statemachine.background; + +import com.google.common.collect.Lists; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.utils.BackgroundService; +import org.apache.hadoop.utils.BackgroundTaskResult; +import org.apache.hadoop.utils.BackgroundTaskQueue; +import org.apache.hadoop.utils.BackgroundTask; +import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT; + +/** + * A per-datanode container block deleting service takes in charge + * of deleting staled ozone blocks. + */ +public class BlockDeletingService extends BackgroundService{ + + private static final Logger LOG = + LoggerFactory.getLogger(BlockDeletingService.class); + + private final ContainerManager containerManager; + private final Configuration conf; + + // Throttle number of blocks to delete per task, + // set to 1 for testing + private final int blockLimitPerTask; + + // Throttle the number of containers to process concurrently at a time, + private final int containerLimitPerInterval; + + // Task priority is useful when a to-delete block has weight. + private final static int TASK_PRIORITY_DEFAULT = 1; + // Core pool size for container tasks + private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10; + + public BlockDeletingService(ContainerManager containerManager, + int serviceInterval, Configuration conf) { + super("BlockDeletingService", serviceInterval, + TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE); + this.containerManager = containerManager; + this.conf = conf; + this.blockLimitPerTask = conf.getInt( + OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, + OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT); + this.containerLimitPerInterval = conf.getInt( + OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, + OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT); + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + List containers = Lists.newArrayList(); + try { + // We at most list a number of containers a time, + // in case there are too many containers and start too many workers. + // We must ensure there is no empty container in this result. + containerManager.listContainer(null, containerLimitPerInterval, + null, containers); + + // TODO + // in case we always fetch a few same containers, + // should we list some more containers a time and shuffle them? + for(ContainerData container : containers) { + BlockDeletingTask containerTask = + new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT); + queue.add(containerTask); + } + } catch (StorageContainerException e) { + LOG.warn("Failed to initiate block deleting tasks, " + + "caused by unable to get containers info. " + + "Retry in next interval. ", e); + } + return queue; + } + + private static class ContainerBackgroundTaskResult + implements BackgroundTaskResult { + private List deletedBlockIds; + + ContainerBackgroundTaskResult() { + deletedBlockIds = new LinkedList<>(); + } + + public void addBlockId(String blockId) { + deletedBlockIds.add(blockId); + } + + public void addAll(List blockIds) { + deletedBlockIds.addAll(blockIds); + } + + public List getDeletedBlocks() { + return deletedBlockIds; + } + + @Override + public int getSize() { + return deletedBlockIds.size(); + } + } + + private class BlockDeletingTask + implements BackgroundTask { + + private final int priority; + private final ContainerData containerData; + + BlockDeletingTask(ContainerData containerName, int priority) { + this.priority = priority; + this.containerData = containerName; + } + + @Override + public BackgroundTaskResult call() throws Exception { + // Scan container's db and get list of under deletion blocks + MetadataStore meta = KeyUtils.getDB(containerData, conf); + // # of blocks to delete is throttled + KeyPrefixFilter filter = new KeyPrefixFilter( + OzoneConsts.DELETING_KEY_PREFIX); + List> toDeleteBlocks = + meta.getRangeKVs(null, blockLimitPerTask, filter); + if (toDeleteBlocks.isEmpty()) { + LOG.info("No under deletion block found in container : {}", + containerData.getContainerName()); + } + + List succeedBlocks = new LinkedList<>(); + LOG.info("Container : {}, To-Delete blocks : {}", + containerData.getContainerName(), toDeleteBlocks.size()); + toDeleteBlocks.forEach(entry -> { + String blockName = DFSUtil.bytes2String(entry.getKey()); + LOG.info("Deleting block {}", blockName); + try { + ContainerProtos.KeyData data = + ContainerProtos.KeyData.parseFrom(entry.getValue()); + + for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) { + File chunkFile = new File(chunkInfo.getChunkName()); + if (FileUtils.deleteQuietly(chunkFile)) { + LOG.info("block {} chunk {} deleted", blockName, + chunkFile.getAbsolutePath()); + } + } + succeedBlocks.add(blockName); + } catch (InvalidProtocolBufferException e) { + LOG.error("Failed to parse block info for block {}", blockName, e); + } + }); + + // Once files are deleted ... clean up DB + BatchOperation batch = new BatchOperation(); + succeedBlocks.forEach(entry -> + batch.delete(DFSUtil.string2Bytes(entry))); + meta.writeBatch(batch); + + ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult(); + crr.addAll(succeedBlocks); + return crr; + } + + @Override + public int getPriority() { + return priority; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java new file mode 100644 index 0000000000..a9e202e35e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.background; \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index f7caf5a74c..792c1320a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; +import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.protocol.proto @@ -45,6 +46,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT; /** * Ozone main class sets up the network server and initializes the container @@ -60,6 +65,7 @@ public class OzoneContainer { private final XceiverServerSpi server; private final ChunkManager chunkManager; private final KeyManager keyManager; + private final BlockDeletingService blockDeletingService; /** * Creates a network endpoint and enables Ozone container. @@ -90,6 +96,12 @@ public OzoneContainer( this.keyManager = new KeyManagerImpl(manager, ozoneConfig); manager.setKeyManager(this.keyManager); + int svcInterval = ozoneConfig.getInt( + OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, + OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT); + this.blockDeletingService = new BlockDeletingService(manager, + svcInterval, ozoneConfig); + this.dispatcher = new Dispatcher(manager, this.ozoneConfig); final boolean useRatis = ozoneConfig.getBoolean( @@ -107,6 +119,7 @@ public OzoneContainer( */ public void start() throws IOException { server.start(); + blockDeletingService.start(); dispatcher.init(); } @@ -152,6 +165,7 @@ public void stop() { this.chunkManager.shutdown(); this.keyManager.shutdown(); this.manager.shutdown(); + this.blockDeletingService.shutdown(); LOG.info("container services shutdown complete."); } catch (IOException ex) { LOG.warn("container service shutdown error:", ex); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java new file mode 100644 index 0000000000..b057533cf5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.utils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; + +/** + * An abstract class for a background service in ozone. + * A background service schedules multiple child tasks in parallel + * in a certain period. In each interval, it waits until all the tasks + * finish execution and then schedule next interval. + */ +public abstract class BackgroundService { + + private static final Logger LOG = + LoggerFactory.getLogger(BackgroundService.class); + + // Executor to launch child tasks + private final ScheduledExecutorService exec; + private final ThreadGroup threadGroup; + private final ThreadFactory threadFactory; + private final String serviceName; + private final int interval; + private final TimeUnit unit; + + public BackgroundService(String serviceName, int interval, + TimeUnit unit, int threadPoolSize) { + this.interval = interval; + this.unit = unit; + this.serviceName = serviceName; + threadGroup = new ThreadGroup(serviceName); + ThreadFactory tf = r -> new Thread(threadGroup, r); + threadFactory = new ThreadFactoryBuilder() + .setThreadFactory(tf) + .setDaemon(true) + .setNameFormat(serviceName + "#%d") + .build(); + exec = Executors.newScheduledThreadPool(threadPoolSize, threadFactory); + } + + protected ExecutorService getExecutorService() { + return this.exec; + } + + @VisibleForTesting + public int getThreadCount() { + return threadGroup.activeCount(); + } + + + // start service + public void start() { + exec.scheduleWithFixedDelay(new PeriodicalTask(), 0, interval, unit); + } + + public abstract BackgroundTaskQueue getTasks(); + + /** + * Run one or more background tasks concurrently. + * Wait until all tasks to return the result. + */ + public class PeriodicalTask implements Runnable { + @Override + public void run() { + LOG.info("Running background service : {}", serviceName); + BackgroundTaskQueue tasks = getTasks(); + if (tasks.isEmpty()) { + // No task found, or some problems to init tasks + // return and retry in next interval. + return; + } + + LOG.info("Number of background tasks to execute : {}", tasks.size()); + CompletionService taskCompletionService = + new ExecutorCompletionService<>(exec); + + List> results = Lists.newArrayList(); + while (tasks.size() > 0) { + BackgroundTask task = tasks.poll(); + Future result = + taskCompletionService.submit(task); + results.add(result); + } + + results.parallelStream().forEach(taskResultFuture -> { + try { + // Collect task results + // TODO timeout in case task hangs + BackgroundTaskResult result = taskResultFuture.get(); + if (LOG.isDebugEnabled()) { + LOG.debug("task execution result size {}", result.getSize()); + } + } catch (InterruptedException | ExecutionException e) { + LOG.warn( + "Background task fails to execute, " + + "retrying in next interval", e); + } + }); + } + } + + // shutdown and make sure all threads are properly released. + public void shutdown() { + LOG.info("Shutting down service {}", this.serviceName); + exec.shutdown(); + try { + if (!exec.awaitTermination(60, TimeUnit.SECONDS)) { + exec.shutdownNow(); + } + } catch (InterruptedException e) { + exec.shutdownNow(); + } + if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) { + threadGroup.destroy(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java new file mode 100644 index 0000000000..47e8ebc98f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.utils; + +import java.util.concurrent.Callable; + +/** + * A task thread to run by {@link BackgroundService}. + */ +public interface BackgroundTask extends Callable { + + int getPriority(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java new file mode 100644 index 0000000000..b56ef0c804 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.utils; + +import java.util.PriorityQueue; + +/** + * A priority queue that stores a number of {@link BackgroundTask}. + */ +public class BackgroundTaskQueue { + + private final PriorityQueue tasks; + + public BackgroundTaskQueue() { + tasks = new PriorityQueue<>((task1, task2) + -> task1.getPriority() - task2.getPriority()); + } + + /** + * @return the head task in this queue. + */ + public synchronized BackgroundTask poll() { + return tasks.poll(); + } + + /** + * Add a {@link BackgroundTask} to the queue, + * the task will be sorted by its priority. + * + * @param task + */ + public synchronized void add(BackgroundTask task) { + tasks.add(task); + } + + /** + * @return true if the queue contains no task, false otherwise. + */ + public synchronized boolean isEmpty() { + return tasks.isEmpty(); + } + + /** + * @return the size of the queue. + */ + public synchronized int size() { + return tasks.size(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java new file mode 100644 index 0000000000..b37a5dbbb0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.utils; + +/** + * Result of a {@link BackgroundTask}. + */ +public interface BackgroundTaskResult { + + /** + * Returns the size of entries included in this result. + */ + int getSize(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index be9a48a7b9..17a127d827 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -359,6 +359,37 @@ + + ozone.block.deleting.service.interval.ms + 60000 + + Time interval in milliseconds of the block deleting service. + The block deleting service runs on each datanode to scan staled + blocks and delete them asynchronously. + + + + + ozone.block.deleting.limit.per.task + 1000 + + Maximum number of blocks to be deleted by block deleting service + per time interval. This property is used to throttle the actual number + of block deletions on a datanode per container. + + + + + ozone.block.deleting.container.limit.per.interval + 10 + + Maximum number of containers to be scanned by block deleting service + per time interval. The block deleting service spawns a thread to handle + block deletions in a container. This property is used to throttle + the number of threads spawned for block deletions. + + + dfs.container.ipc 50011 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java new file mode 100644 index 0000000000..0fde964a7a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.TestUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A test class implementation for {@link BlockDeletingService}. + */ +public class BlockDeletingServiceTestImpl + extends BlockDeletingService { + + // tests only + private CountDownLatch latch; + private Thread testingThread; + private AtomicInteger numOfProcessed = new AtomicInteger(0); + + public BlockDeletingServiceTestImpl(ContainerManager containerManager, + int serviceInterval, Configuration conf) { + super(containerManager, serviceInterval, conf); + } + + @VisibleForTesting + public void runDeletingTasks() { + if (latch.getCount() > 0) { + this.latch.countDown(); + } else { + throw new IllegalStateException("Count already reaches zero"); + } + } + + @VisibleForTesting + public boolean isStarted() { + return latch != null && testingThread.isAlive(); + } + + public int getTimesOfProcessed() { + return numOfProcessed.get(); + } + + // Override the implementation to start a single on-call control thread. + @Override public void start() { + PeriodicalTask svc = new PeriodicalTask(); + // In test mode, relies on a latch countdown to runDeletingTasks tasks. + Runnable r = () -> { + while (true) { + latch = new CountDownLatch(1); + try { + latch.await(); + } catch (InterruptedException e) { + break; + } + Future future = this.getExecutorService().submit(svc); + try { + // for tests, we only wait for 3s for completion + future.get(3, TimeUnit.SECONDS); + numOfProcessed.incrementAndGet(); + } catch (Exception e) { + return; + } + } + }; + + testingThread = new ThreadFactoryBuilder() + .setDaemon(true) + .build() + .newThread(r); + testingThread.start(); + } + + @Override + public void shutdown() { + testingThread.interrupt(); + super.shutdown(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java new file mode 100644 index 0000000000..2d4b5b2807 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common; + +import com.google.common.collect.Lists; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.TestUtils.BlockDeletingServiceTestImpl; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.utils.MetadataKeyFilters; +import org.apache.hadoop.utils.MetadataStore; +import org.junit.Assert; +import org.junit.Test; +import org.junit.BeforeClass; +import org.junit.Before; +import org.junit.After; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL; +import static org.apache.hadoop.ozone.container + .ContainerTestHelper.createSingleNodePipeline; + +/** + * Tests to test block deleting service. + */ +public class TestBlockDeletingService { + + private static final Logger LOG = + LoggerFactory.getLogger(TestBlockDeletingService.class); + + private static File testRoot; + private static File containersDir; + private static File chunksDir; + + @BeforeClass + public static void init() { + testRoot = GenericTestUtils + .getTestDir(TestBlockDeletingService.class.getSimpleName()); + chunksDir = new File(testRoot, "chunks"); + containersDir = new File(testRoot, "containers"); + } + + @Before + public void setup() throws IOException { + if (chunksDir.exists()) { + FileUtils.deleteDirectory(chunksDir); + } + } + + @After + public void cleanup() throws IOException { + FileUtils.deleteDirectory(chunksDir); + FileUtils.deleteDirectory(containersDir); + FileUtils.deleteDirectory(testRoot); + } + + private ContainerManager createContainerManager(Configuration conf) + throws Exception { + conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, + containersDir.getAbsolutePath()); + if (containersDir.exists()) { + FileUtils.deleteDirectory(containersDir); + } + ContainerManager containerManager = new ContainerManagerImpl(); + List pathLists = new LinkedList<>(); + pathLists.add(StorageLocation.parse(containersDir.getAbsolutePath())); + containerManager.init(conf, pathLists); + return containerManager; + } + + /** + * A helper method to create some blocks and put them under deletion + * state for testing. This method directly updates container.db and + * creates some fake chunk files for testing. + */ + private void createToDeleteBlocks(ContainerManager mgr, + Configuration conf, int numOfContainers, int numOfBlocksPerContainer, + int numOfChunksPerBlock, File chunkDir) throws IOException { + for (int x = 0; x < numOfContainers; x++) { + String containerName = OzoneUtils.getRequestID(); + ContainerData data = new ContainerData(containerName); + mgr.createContainer(createSingleNodePipeline(containerName), data); + data = mgr.readContainer(containerName); + MetadataStore metadata = KeyUtils.getDB(data, conf); + for (int j = 0; j chunks = Lists.newArrayList(); + for (int k = 0; k service.getTimesOfProcessed() == timesOfProcessed, 100, 3000); + } + + /** + * Get under deletion blocks count from DB, + * note this info is parsed from container.db. + */ + private int getUnderDeletionBlocksCount(MetadataStore meta) + throws IOException { + List> underDeletionBlocks = + meta.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter( + OzoneConsts.DELETING_KEY_PREFIX)); + return underDeletionBlocks.size(); + } + + @Test + public void testBlockDeletion() throws Exception { + Configuration conf = new OzoneConfiguration(); + conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10); + conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2); + ContainerManager containerManager = createContainerManager(conf); + createToDeleteBlocks(containerManager, conf, 1, 3, 1, chunksDir); + + BlockDeletingServiceTestImpl svc = + new BlockDeletingServiceTestImpl(containerManager, 1000, conf); + svc.start(); + GenericTestUtils.waitFor(() -> svc.isStarted(), 100, 3000); + + // Ensure 1 container was created + List containerData = Lists.newArrayList(); + containerManager.listContainer(null, 1, "", containerData); + Assert.assertEquals(1, containerData.size()); + MetadataStore meta = KeyUtils.getDB(containerData.get(0), conf); + + // Ensure there is 100 blocks under deletion + Assert.assertEquals(3, getUnderDeletionBlocksCount(meta)); + + // An interval will delete 1 * 2 blocks + deleteAndWait(svc, 1); + Assert.assertEquals(1, getUnderDeletionBlocksCount(meta)); + + deleteAndWait(svc, 2); + Assert.assertEquals(0, getUnderDeletionBlocksCount(meta)); + + deleteAndWait(svc, 3); + Assert.assertEquals(0, getUnderDeletionBlocksCount(meta)); + + svc.shutdown(); + } + + @Test + public void testShutdownService() throws Exception { + Configuration conf = new OzoneConfiguration(); + conf.setInt(OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 500); + conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10); + conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 10); + ContainerManager containerManager = createContainerManager(conf); + // Create 1 container with 100 blocks + createToDeleteBlocks(containerManager, conf, 1, 100, 1, chunksDir); + + BlockDeletingServiceTestImpl service = + new BlockDeletingServiceTestImpl(containerManager, 1000, conf); + service.start(); + GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000); + + // Run some deleting tasks and verify there are threads running + service.runDeletingTasks(); + GenericTestUtils.waitFor(() -> service.getThreadCount() > 0, 100, 1000); + + // Wait for 1 or 2 intervals + Thread.sleep(1000); + + // Shutdown service and verify all threads are stopped + service.shutdown(); + GenericTestUtils.waitFor(() -> service.getThreadCount() == 0, 100, 1000); + } + + @Test(timeout = 30000) + public void testContainerThrottle() throws Exception { + // Properties : + // - Number of containers : 2 + // - Number of blocks per container : 1 + // - Number of chunks per block : 10 + // - Container limit per interval : 1 + // - Block limit per container : 1 + // + // Each time only 1 container can be processed, so each time + // 1 block from 1 container can be deleted. + Configuration conf = new OzoneConfiguration(); + // Process 1 container per interval + conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 1); + conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 1); + ContainerManager containerManager = createContainerManager(conf); + createToDeleteBlocks(containerManager, conf, 2, 1, 10, chunksDir); + + BlockDeletingServiceTestImpl service = + new BlockDeletingServiceTestImpl(containerManager, 1000, conf); + service.start(); + + try { + GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000); + // 1st interval processes 1 container 1 block and 10 chunks + deleteAndWait(service, 1); + Assert.assertEquals(10, chunksDir.listFiles().length); + } finally { + service.shutdown(); + } + } + + + @Test(timeout = 30000) + public void testBlockThrottle() throws Exception { + // Properties : + // - Number of containers : 5 + // - Number of blocks per container : 3 + // - Number of chunks per block : 1 + // - Container limit per interval : 10 + // - Block limit per container : 2 + // + // Each time containers can be all scanned, but only 2 blocks + // per container can be actually deleted. So it requires 2 waves + // to cleanup all blocks. + Configuration conf = new OzoneConfiguration(); + conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10); + conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2); + ContainerManager containerManager = createContainerManager(conf); + createToDeleteBlocks(containerManager, conf, 5, 3, 1, chunksDir); + + // Make sure chunks are created + Assert.assertEquals(15, chunksDir.listFiles().length); + + BlockDeletingServiceTestImpl service = + new BlockDeletingServiceTestImpl(containerManager, 1000, conf); + service.start(); + + try { + GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000); + // Total blocks = 3 * 5 = 15 + // block per task = 2 + // number of containers = 5 + // each interval will at most runDeletingTasks 5 * 2 = 10 blocks + deleteAndWait(service, 1); + Assert.assertEquals(5, chunksDir.listFiles().length); + + // There is only 5 blocks left to runDeletingTasks + deleteAndWait(service, 2); + Assert.assertEquals(0, chunksDir.listFiles().length); + } finally { + service.shutdown(); + } + } +}