diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 042a6272d4..6163d93e43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1115,7 +1115,7 @@ private void initDiskBalancer(FsDatasetSpi data, /** * Shutdown disk balancer. */ - private void shutdownDiskBalancer() { + private void shutdownDiskBalancer() { if (this.diskBalancer != null) { this.diskBalancer.shutdown(); this.diskBalancer = null; @@ -2077,6 +2077,10 @@ public void shutdown() { ipcServer.stop(); } + if (ecWorker != null) { + ecWorker.shutDown(); + } + if(blockPoolManager != null) { try { this.blockPoolManager.shutDownAll(bposArray); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 07d213c55b..63498bc4dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -27,6 +27,8 @@ import org.slf4j.Logger; import java.util.Collection; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -149,7 +151,12 @@ Configuration getConf() { return conf; } - ThreadPoolExecutor getStripedReadPool() { - return stripedReadPool; + CompletionService createReadService() { + return new ExecutorCompletionService<>(stripedReadPool); + } + + public void shutDown() { + stripedReconstructionPool.shutdown(); + stripedReadPool.shutdown(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index 3202121b62..bbffcf5271 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -39,8 +39,6 @@ import java.nio.ByteBuffer; import java.util.BitSet; import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; /** @@ -110,7 +108,7 @@ abstract class StripedReconstructor { // position in striped internal block private long positionInBlock; private StripedReader stripedReader; - private ThreadPoolExecutor stripedReadPool; + private ErasureCodingWorker erasureCodingWorker; private final CachingStrategy cachingStrategy; private long maxTargetLength = 0L; private final BitSet liveBitSet; @@ -122,7 +120,7 @@ abstract class StripedReconstructor { StripedReconstructor(ErasureCodingWorker worker, StripedReconstructionInfo stripedReconInfo) { - this.stripedReadPool = worker.getStripedReadPool(); + this.erasureCodingWorker = worker; this.datanode = worker.getDatanode(); this.conf = worker.getConf(); this.ecPolicy = stripedReconInfo.getEcPolicy(); @@ -225,7 +223,7 @@ CachingStrategy getCachingStrategy() { } CompletionService createReadService() { - return new ExecutorCompletionService<>(stripedReadPool); + return erasureCodingWorker.createReadService(); } ExtendedBlock getBlockGroup() {