From 5689355783de005ebc604f4403dc5129a286bfca Mon Sep 17 00:00:00 2001
From: Yiqun Lin
Date: Tue, 2 Oct 2018 09:43:14 +0800
Subject: [PATCH] HDFS-13768. Adding replicas to volume map makes DataNode start slowly. Contributed by Surendra Singh Lilhore.

---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java      |   3 +
 .../hadoop/hdfs/server/datanode/DataNode.java      |   2 +-
 .../fsdataset/impl/BlockPoolSlice.java             | 177 ++++++++++++++++--
 .../fsdataset/impl/FsDatasetImpl.java              |  14 ++
 .../fsdataset/impl/FsDatasetUtil.java              |  30 +--
 .../datanode/fsdataset/impl/FsVolumeList.java      |   5 +-
 .../datanode/fsdataset/impl/ReplicaMap.java        |  25 +++
 .../src/main/resources/hdfs-default.xml            |   9 +
 .../fsdataset/impl/TestFsVolumeList.java           |  64 ++++++-
 9 files changed, 300 insertions(+), 29 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a7e7b9bf87..d8024dcc67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -365,6 +365,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_CONTENT_SUMMARY_SLEEP_MICROSEC_DEFAULT = 500;
   public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
   public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
+  public static final String
+      DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY =
+      "dfs.datanode.volumes.replica-add.threadpool.size";
   public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index c9803958ad..270e30b24e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1695,7 +1695,7 @@ BPOfferService getBPOfferService(String bpid){
     return blockPoolManager.get(bpid);
   }
 
-  int getBpOsCount() {
+  public int getBpOsCount() {
     return blockPoolManager.getAllNamenodeThreads().size();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 2adfb6bb52..b9b581fedc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -28,8 +28,19 @@
 import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
 import java.util.Scanner;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -52,8 +63,8 @@
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
-
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
@@ -96,6 +107,17 @@ class BlockPoolSlice {
   private final int maxDataLength;
   private final FileIoProvider fileIoProvider;
 
+  private static ForkJoinPool addReplicaThreadPool = null;
+  private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
+      .getRuntime().availableProcessors();
+  private static final Comparator<File> FILE_COMPARATOR =
+      new Comparator<File>() {
+        @Override
+        public int compare(File f1, File f2) {
+          return f1.getName().compareTo(f2.getName());
+        }
+      };
+
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final GetSpaceUsed dfsUsage;
 
@@ -161,13 +183,15 @@ class BlockPoolSlice {
         .setConf(conf)
         .setInitialUsed(loadDfsUsed())
         .build();
-
+    // Initialize the add-replica fork-join pool.
+    initializeAddReplicaPool(conf);
     // Make the dfs usage to be saved during shutdown.
     shutdownHook = new Runnable() {
       @Override
       public void run() {
         if (!dfsUsedSaved) {
           saveDfsUsed();
+          addReplicaThreadPool.shutdownNow();
         }
       }
     };
@@ -175,6 +199,21 @@ public void run() {
         SHUTDOWN_HOOK_PRIORITY);
   }
 
+  private synchronized void initializeAddReplicaPool(Configuration conf) {
+    if (addReplicaThreadPool == null) {
+      FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset();
+      int numberOfBlockPoolSlice = dataset.getVolumeCount()
+          * dataset.getBPServiceCount();
+      int poolsize = Math.max(numberOfBlockPoolSlice,
+          VOLUMES_REPLICA_ADD_THREADPOOL_SIZE);
+      // The default pool size is the max of (number of volumes * number of
+      // BP services) and the number of processors.
+      addReplicaThreadPool = new ForkJoinPool(conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
+          poolsize));
+    }
+  }
+
   File getDirectory() {
     return currentDir.getParentFile();
   }
@@ -374,10 +413,55 @@ void getVolumeMap(ReplicaMap volumeMap,
     boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
     if (!success) {
+      List<IOException> exceptions = Collections
+          .synchronizedList(new ArrayList<IOException>());
+      Queue<RecursiveAction> subTaskQueue =
+          new ConcurrentLinkedQueue<RecursiveAction>();
+
       // add finalized replicas
-      addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
+      AddReplicaProcessor task = new AddReplicaProcessor(volumeMap,
+          finalizedDir, lazyWriteReplicaMap, true, exceptions, subTaskQueue);
+      ForkJoinTask<Void> finalizedTask = addReplicaThreadPool.submit(task);
+
       // add rbw replicas
-      addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
+      task = new AddReplicaProcessor(volumeMap, rbwDir, lazyWriteReplicaMap,
+          false, exceptions, subTaskQueue);
+      ForkJoinTask<Void> rbwTask = addReplicaThreadPool.submit(task);
+
+      try {
+        finalizedTask.get();
+        rbwTask.get();
+      } catch (InterruptedException | ExecutionException e) {
+        exceptions.add(new IOException(
+            "Failed to start sub tasks to add replicas to the replica map: "
+                + e.getMessage()));
+      }
+
+      // wait for all the tasks to finish.
+      waitForSubTaskToFinish(subTaskQueue, exceptions);
     }
   }
 
+  /**
+   * Wait till all the recursive tasks for adding replicas to the volume map
+   * have completed.
+   *
+   * @param subTaskQueue
+   *          {@link AddReplicaProcessor} task list.
+   * @param exceptions
+   *          exceptions occurred in sub tasks.
+   * @throws IOException
+   *           thrown if one or more sub tasks fail.
+   */
+  private void waitForSubTaskToFinish(Queue<RecursiveAction> subTaskQueue,
+      List<IOException> exceptions) throws IOException {
+    while (!subTaskQueue.isEmpty()) {
+      RecursiveAction task = subTaskQueue.poll();
+      if (task != null) {
+        task.join();
+      }
+    }
+    if (!exceptions.isEmpty()) {
+      throw MultipleIOException.createIOException(exceptions);
+    }
+  }
@@ -526,10 +610,10 @@ private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
       }
     }
 
-    ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
-    if (oldReplica == null) {
-      volumeMap.add(bpid, newReplica);
-    } else {
+    ReplicaInfo tmpReplicaInfo = volumeMap.addAndGet(bpid, newReplica);
+    ReplicaInfo oldReplica = (tmpReplicaInfo == newReplica) ? null
+        : tmpReplicaInfo;
+    if (oldReplica != null) {
       // We have multiple replicas of the same block so decide which one
       // to keep.
       newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
@@ -558,15 +642,23 @@ private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
    * storage.
    * @param isFinalized true if the directory has finalized replicas;
    *          false if the directory has rbw replicas
+   * @param exceptions list of exceptions that need to be returned to the
+   *          parent thread.
+   * @param subTaskQueue queue of sub tasks.
    */
   void addToReplicasMap(ReplicaMap volumeMap, File dir,
-      final RamDiskReplicaTracker lazyWriteReplicaMap,
-      boolean isFinalized)
+      final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+      List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue)
       throws IOException {
     File[] files = fileIoProvider.listFiles(volume, dir);
-    for (File file : files) {
+    Arrays.sort(files, FILE_COMPARATOR);
+    for (int i = 0; i < files.length; i++) {
+      File file = files[i];
       if (file.isDirectory()) {
-        addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
+        // Launch a new sub task.
+        AddReplicaProcessor subTask = new AddReplicaProcessor(volumeMap, file,
+            lazyWriteReplicaMap, isFinalized, exceptions, subTaskQueue);
+        subTask.fork();
+        subTaskQueue.add(subTask);
       }
 
       if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
@@ -581,7 +673,7 @@ void addToReplicasMap(ReplicaMap volumeMap, File dir,
       }
 
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
-          files, file);
+          files, file, i);
       long blockId = Block.filename2id(file.getName());
       Block block = new Block(blockId, file.length(), genStamp);
       addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
@@ -886,4 +978,63 @@ void decrNumBlocks() {
   public long getNumOfBlocks() {
     return numOfBlocks.get();
   }
+
+  /**
+   * Recursive action for adding replicas to the map.
+   */
+  class AddReplicaProcessor extends RecursiveAction {
+
+    private ReplicaMap volumeMap;
+    private File dir;
+    private RamDiskReplicaTracker lazyWriteReplicaMap;
+    private boolean isFinalized;
+    private List<IOException> exceptions;
+    private Queue<RecursiveAction> subTaskQueue;
+
+    /**
+     * @param volumeMap
+     *          the replicas map
+     * @param dir
+     *          an input directory
+     * @param lazyWriteReplicaMap
+     *          map of replicas on transient storage
+     * @param isFinalized
+     *          true if the directory has finalized replicas; false if the
+     *          directory has rbw replicas
+     * @param exceptions
+     *          list of exceptions that need to be returned to the parent
+     *          thread
+     * @param subTaskQueue
+     *          queue of sub tasks
+     */
+    AddReplicaProcessor(ReplicaMap volumeMap, File dir,
+        RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+        List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue) {
+      this.volumeMap = volumeMap;
+      this.dir = dir;
+      this.lazyWriteReplicaMap = lazyWriteReplicaMap;
+      this.isFinalized = isFinalized;
+      this.exceptions = exceptions;
+      this.subTaskQueue = subTaskQueue;
+    }
+
+    @Override
+    protected void compute() {
+      try {
+        addToReplicasMap(volumeMap, dir, lazyWriteReplicaMap, isFinalized,
+            exceptions, subTaskQueue);
+      } catch (IOException e) {
+        LOG.warn("Caught exception while adding replicas from " + volume
+            + " in subtask. Will throw later.", e);
+        exceptions.add(e);
+      }
+    }
+  }
+
+  /**
+   * Return the size of the fork-join pool used for adding replicas to the map.
+   */
+  @VisibleForTesting
+  public static int getAddReplicaForkPoolSize() {
+    return addReplicaThreadPool.getPoolSize();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 27196c2772..027a0bf681 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3307,6 +3307,20 @@ public void setTimer(Timer newTimer) {
     this.timer = newTimer;
   }
 
+  /**
+   * Return the number of BP services.
+   */
+  public int getBPServiceCount() {
+    return datanode.getBpOsCount();
+  }
+
+  /**
+   * Return the number of volumes.
+   */
+  public int getVolumeCount() {
+    return volumes.getVolumes().size();
+  }
+
   void stopAllDataxceiverThreads(FsVolumeImpl volume) {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (String blockPoolId : volumeMap.getBlockPoolList()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 9f115a04f0..8a3b237e41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -117,21 +117,27 @@ public static FileDescriptor openAndSeek(File file, long offset)
   }
 
   /**
-   * Find the meta-file for the specified block file
-   * and then return the generation stamp from the name of the meta-file.
+   * Find the meta-file for the specified block file and then return the
+   * generation stamp from the name of the meta-file. Generally the meta file
+   * will be the next file in the name-sorted array of files.
+   *
+   * @param listdir
+   *          array of files sorted by name.
+   * @param blockFile
+   *          block file for which the generation stamp is needed.
+   * @param index
+   *          index of the block file in the array.
+   * @return generation stamp for the block file.
    */
-  static long getGenerationStampFromFile(File[] listdir, File blockFile)
-      throws IOException {
+  static long getGenerationStampFromFile(File[] listdir, File blockFile,
+      int index) {
     String blockName = blockFile.getName();
-    for (int j = 0; j < listdir.length; j++) {
-      String path = listdir[j].getName();
-      if (!path.startsWith(blockName)) {
-        continue;
+    if ((index + 1) < listdir.length) {
+      // Check if the next file in the list is the meta file.
+      String metaFile = listdir[index + 1].getName();
+      if (metaFile.startsWith(blockName)) {
+        return Block.getGenerationStamp(metaFile);
       }
-      if (blockFile.getCanonicalPath().equals(listdir[j].getCanonicalPath())) {
-        continue;
-      }
-      return Block.getGenerationStamp(listdir[j].getName());
     }
     FsDatasetImpl.LOG.warn("Block " + blockFile + " does not have a metafile!");
     return HdfsConstants.GRANDFATHER_GENERATION_STAMP;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index a1804ae477..a0fcb54c69 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -226,8 +226,9 @@ public void run() {
       throw exceptions.get(0);
     }
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
-    FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
-        + totalTimeTaken + "ms");
+    FsDatasetImpl.LOG
+        .info("Total time to add all replicas to map for block pool " + bpid
+            + ": " + totalTimeTaken + "ms");
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index d4fb69b1f0..c630b951a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -135,6 +135,31 @@ ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
     }
   }
 
+  /**
+   * Add a replica's meta information into the map; if it already exists,
+   * return the old replicaInfo.
+   */
+  ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
+    checkBlockPool(bpid);
+    checkBlock(replicaInfo);
+    try (AutoCloseableLock l = lock.acquire()) {
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set == null) {
+        // Add an entry for the block pool if it does not exist already.
+        set = new FoldedTreeSet<>();
+        map.put(bpid, set);
+      }
+      ReplicaInfo oldReplicaInfo = set.get(replicaInfo.getBlockId(),
+          LONG_AND_BLOCK_COMPARATOR);
+      if (oldReplicaInfo != null) {
+        return oldReplicaInfo;
+      } else {
+        set.add(replicaInfo);
+      }
+      return replicaInfo;
+    }
+  }
+
   /**
    * Add all entries from the given replica map into the local replica map.
    */
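Note on addAndGet(): once replicas for one block pool are loaded by several AddReplicaProcessor sub tasks at once, the old two-step get()-then-add() sequence in addReplicaToReplicasMap() becomes a check-then-act race, because each call takes the lock separately. The following standalone sketch is not part of the patch and uses a plain ConcurrentMap as a stand-in for the FoldedTreeSet-backed ReplicaMap; it only contrasts the two patterns.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class AddAndGetDemo {
      private static final ConcurrentMap<Long, String> MAP =
          new ConcurrentHashMap<>();

      // Racy two-step pattern, analogous to the old get()/add() sequence:
      // two loader threads can both observe null and both insert their copy.
      static String racyAdd(long blockId, String replica) {
        String old = MAP.get(blockId);
        if (old == null) {
          MAP.put(blockId, replica);
        }
        return old;
      }

      // Atomic pattern, analogous to the new ReplicaMap.addAndGet(): lookup
      // and insert happen in one step, so exactly one caller wins and the
      // others get the existing replica back to resolve the duplicate against.
      static String atomicAdd(long blockId, String replica) {
        String existing = MAP.putIfAbsent(blockId, replica);
        return existing == null ? replica : existing;
      }

      public static void main(String[] args) {
        System.out.println(atomicAdd(1L, "replica-1")); // replica-1
        System.out.println(atomicAdd(1L, "replica-2")); // still replica-1
      }
    }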
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1573582c91..2ee8399d2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1281,6 +1281,15 @@
+<property>
+  <name>dfs.datanode.volumes.replica-add.threadpool.size</name>
+  <value></value>
+  <description>Specifies the maximum number of threads to use for
+  adding blocks to the volume map. The default value is the max of
+  (number of volumes * number of BP services, number of processors).
+  </description>
+</property>
+
 <property>
   <name>dfs.image.compress</name>
   <value>false</value>
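The new property ships with an empty value in hdfs-default.xml, so the pool size falls back to the computed default described above. As a purely illustrative sketch (not part of the patch; the value 8 is an arbitrary example), the key can also be pinned programmatically, the same way the new unit test below does, while operators would normally set the property in hdfs-site.xml.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ReplicaAddPoolSizeExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Cap the fork-join pool used to load replicas into the volume map.
        conf.setInt(
            DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
            8);
        // Prints 8; without the override the key is unset and the DataNode
        // computes the default at BlockPoolSlice construction time.
        System.out.println(conf.getInt(
            DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
            -1));
      }
    }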
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index f53c21c049..581a7a8d01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -18,11 +18,16 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import com.google.common.base.Supplier;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -30,6 +35,7 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,12 +46,16 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -315,4 +325,56 @@ public void testDfsReservedPercentageForDifferentStorageTypes()
         .build();
     assertEquals(600, volume4.getReserved());
   }
-}
+
+  @Test(timeout = 60000)
+  public void testAddRplicaProcessorForAddingReplicaInMap() throws Exception {
+    Configuration cnf = new Configuration();
+    int poolSize = 5;
+    cnf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    cnf.setInt(
+        DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
+        poolSize);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(cnf).numDataNodes(1)
+        .storagesPerDatanode(1).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    // Generate data blocks.
+    ExecutorService pool = Executors.newFixedThreadPool(10);
+    List<Future<?>> futureList = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          for (int j = 0; j < 10; j++) {
+            try {
+              DFSTestUtil.createFile(fs, new Path("File_" + getName() + j), 10,
+                  (short) 1, 0);
+            } catch (IllegalArgumentException | IOException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      };
+      thread.setName("FileWriter" + i);
+      futureList.add(pool.submit(thread));
+    }
+    // Wait for data generation.
+    for (Future<?> f : futureList) {
+      f.get();
+    }
+    fs.close();
+    FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
+        .getFSDataset();
+    ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+    RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
+        .getInstance(conf, fsDataset);
+    FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    // It will create BlockPoolSlice.AddReplicaProcessor tasks and launch them
+    // in the ForkJoinPool recursively.
+    vol.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
+    assertTrue("Failed to add all the replicas to the map",
+        volumeMap.replicas(bpid).size() == 1000);
+    assertTrue("Fork pool size should be " + poolSize,
+        BlockPoolSlice.getAddReplicaForkPoolSize() == poolSize);
+  }
+}
\ No newline at end of file
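For readers who want to see the fork/join control flow of this change in isolation, here is a minimal, self-contained sketch (not taken from the patch) of the same pattern used by AddReplicaProcessor and waitForSubTaskToFinish: every subdirectory becomes a forked RecursiveAction that is also remembered in a shared queue, the caller waits for the root task first, and then joins whatever remains in the queue, just as getVolumeMap() does for the finalized and rbw directories.

    import java.io.File;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;

    public class ParallelDirScan {

      /** Mirrors AddReplicaProcessor: fork one subtask per subdirectory. */
      static class ScanTask extends RecursiveAction {
        private final File dir;
        private final Queue<RecursiveAction> subTasks;

        ScanTask(File dir, Queue<RecursiveAction> subTasks) {
          this.dir = dir;
          this.subTasks = subTasks;
        }

        @Override
        protected void compute() {
          File[] files = dir.listFiles();
          if (files == null) {
            return;
          }
          for (File f : files) {
            if (f.isDirectory()) {
              ScanTask sub = new ScanTask(f, subTasks);
              sub.fork();        // run asynchronously on the pool
              subTasks.add(sub); // remember it so the caller can join it later
            } else {
              // The patch adds the replica to the ReplicaMap at this point;
              // the sketch just prints the path.
              System.out.println(f.getPath());
            }
          }
        }
      }

      public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        Queue<RecursiveAction> subTasks = new ConcurrentLinkedQueue<>();
        File rootDir = new File(args.length > 0 ? args[0] : ".");
        ScanTask root = new ScanTask(rootDir, subTasks);
        pool.submit(root).get();       // wait for the root scan, like getVolumeMap()
        while (!subTasks.isEmpty()) {  // drain the queue, like waitForSubTaskToFinish()
          RecursiveAction task = subTasks.poll();
          if (task != null) {
            task.join();
          }
        }
        pool.shutdown();
      }
    }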