diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 5fe31ce4ae..4b5a255dd4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -192,7 +192,7 @@ public int compare(File f1, File f2) { if (addReplicaThreadPool == null) { // initialize add replica fork join pool - initializeAddReplicaPool(conf); + initializeAddReplicaPool(conf, (FsDatasetImpl) volume.getDataset()); } // Make the dfs usage to be saved during shutdown. shutdownHook = new Runnable() { @@ -208,9 +208,9 @@ public void run() { SHUTDOWN_HOOK_PRIORITY); } - private synchronized void initializeAddReplicaPool(Configuration conf) { + private synchronized static void initializeAddReplicaPool(Configuration conf, + FsDatasetImpl dataset) { if (addReplicaThreadPool == null) { - FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset(); int numberOfBlockPoolSlice = dataset.getVolumeCount() * dataset.getBPServiceCount(); int poolsize = Math.max(numberOfBlockPoolSlice, @@ -1050,4 +1050,15 @@ protected void compute() { public static int getAddReplicaForkPoolSize() { return addReplicaThreadPool.getPoolSize(); } + + @VisibleForTesting + public ForkJoinPool getAddReplicaThreadPool() { + return addReplicaThreadPool; + } + + @VisibleForTesting + public static void reInitializeAddReplicaThreadPool() { + addReplicaThreadPool.shutdown(); + addReplicaThreadPool = null; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index 581a7a8d01..b22742676d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -27,7 +27,9 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.datanode.BlockScanner; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; @@ -48,6 +50,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; @@ -328,6 +331,7 @@ public void testDfsReservedPercentageForDifferentStorageTypes() @Test(timeout = 60000) public void testAddRplicaProcessorForAddingReplicaInMap() throws Exception { + BlockPoolSlice.reInitializeAddReplicaThreadPool(); Configuration cnf = new Configuration(); int poolSize = 5; cnf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); @@ -374,7 +378,29 @@ public void run() { vol.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap); assertTrue("Failed to add all the replica to map", volumeMap.replicas(bpid) .size() == 1000); - assertTrue("Fork pool size should be " + poolSize, - BlockPoolSlice.getAddReplicaForkPoolSize() == poolSize); + assertEquals("Fork pool should be initialize with configured pool size", + poolSize, BlockPoolSlice.getAddReplicaForkPoolSize()); + } + + @Test(timeout = 60000) + public void testInstanceOfAddReplicaThreadPool() throws Exception { + // Start cluster with multiple namespace + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder( + new HdfsConfiguration()) + .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) + .numDataNodes(1).build()) { + cluster.waitActive(); + FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0) + .getFSDataset(); + FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences() + .get(0); + ForkJoinPool threadPool1 = vol.getBlockPoolSlice( + cluster.getNamesystem(0).getBlockPoolId()).getAddReplicaThreadPool(); + ForkJoinPool threadPool2 = vol.getBlockPoolSlice( + cluster.getNamesystem(1).getBlockPoolId()).getAddReplicaThreadPool(); + assertEquals( + "Thread pool instance should be same in all the BlockPoolSlice", + threadPool1, threadPool2); + } } } \ No newline at end of file