HDFS-15010. BlockPoolSlice#addReplicaThreadPool static pool should be initialized by static method. Contributed by Surendra Singh Lilhore.

This commit is contained in:
Surendra Singh Lilhore 2019-11-28 23:49:35 +05:30
parent 46166bd8d1
commit 0384687811
2 changed files with 42 additions and 5 deletions

View File

@ -192,7 +192,7 @@ public int compare(File f1, File f2) {
if (addReplicaThreadPool == null) {
// initialize add replica fork join pool
initializeAddReplicaPool(conf);
initializeAddReplicaPool(conf, (FsDatasetImpl) volume.getDataset());
}
// Make the dfs usage to be saved during shutdown.
shutdownHook = new Runnable() {
@ -208,9 +208,9 @@ public void run() {
SHUTDOWN_HOOK_PRIORITY);
}
private synchronized void initializeAddReplicaPool(Configuration conf) {
private synchronized static void initializeAddReplicaPool(Configuration conf,
FsDatasetImpl dataset) {
if (addReplicaThreadPool == null) {
FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset();
int numberOfBlockPoolSlice = dataset.getVolumeCount()
* dataset.getBPServiceCount();
int poolsize = Math.max(numberOfBlockPoolSlice,
@ -1050,4 +1050,15 @@ protected void compute() {
public static int getAddReplicaForkPoolSize() {
return addReplicaThreadPool.getPoolSize();
}
@VisibleForTesting
public ForkJoinPool getAddReplicaThreadPool() {
return addReplicaThreadPool;
}
@VisibleForTesting
public static void reInitializeAddReplicaThreadPool() {
addReplicaThreadPool.shutdown();
addReplicaThreadPool = null;
}
}

View File

@ -27,7 +27,9 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@ -48,6 +50,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
@ -328,6 +331,7 @@ public void testDfsReservedPercentageForDifferentStorageTypes()
@Test(timeout = 60000)
public void testAddRplicaProcessorForAddingReplicaInMap() throws Exception {
BlockPoolSlice.reInitializeAddReplicaThreadPool();
Configuration cnf = new Configuration();
int poolSize = 5;
cnf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
@ -374,7 +378,29 @@ public void run() {
vol.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
assertTrue("Failed to add all the replica to map", volumeMap.replicas(bpid)
.size() == 1000);
assertTrue("Fork pool size should be " + poolSize,
BlockPoolSlice.getAddReplicaForkPoolSize() == poolSize);
assertEquals("Fork pool should be initialize with configured pool size",
poolSize, BlockPoolSlice.getAddReplicaForkPoolSize());
}
@Test(timeout = 60000)
public void testInstanceOfAddReplicaThreadPool() throws Exception {
// Start cluster with multiple namespace
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(
new HdfsConfiguration())
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.numDataNodes(1).build()) {
cluster.waitActive();
FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
.getFSDataset();
FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences()
.get(0);
ForkJoinPool threadPool1 = vol.getBlockPoolSlice(
cluster.getNamesystem(0).getBlockPoolId()).getAddReplicaThreadPool();
ForkJoinPool threadPool2 = vol.getBlockPoolSlice(
cluster.getNamesystem(1).getBlockPoolId()).getAddReplicaThreadPool();
assertEquals(
"Thread pool instance should be same in all the BlockPoolSlice",
threadPool1, threadPool2);
}
}
}