diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 6212e65e01..22bb9056d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -28,8 +28,9 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -204,15 +205,20 @@ public static void updateBlocksMap(INodeFile file, BlockManager bm) {
     private final FSDirectory dir;
     private final FSNamesystem fsn;
     private final FSImageFormatProtobuf.Loader parent;
-    private ReentrantLock cacheNameMapLock;
-    private ReentrantLock blockMapLock;
+
+    // Update blocks map by a single thread asynchronously.
+    private ExecutorService blocksMapUpdateExecutor;
+    // Update name cache by a single thread asynchronously.
+    private ExecutorService nameCacheUpdateExecutor;
 
     Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
       this.fsn = fsn;
       this.dir = fsn.dir;
       this.parent = parent;
-      cacheNameMapLock = new ReentrantLock(true);
-      blockMapLock = new ReentrantLock(true);
+      // Note: these executors must be single-thread executors, as they
+      // are used to modify structures which are not thread safe.
+      blocksMapUpdateExecutor = Executors.newSingleThreadExecutor();
+      nameCacheUpdateExecutor = Executors.newSingleThreadExecutor();
     }
 
     void loadINodeDirectorySectionInParallel(ExecutorService service,
@@ -263,7 +269,6 @@ void loadINodeDirectorySectionInParallel(ExecutorService service,
     void loadINodeDirectorySection(InputStream in) throws IOException {
       final List<INodeReference> refList = parent.getLoaderContext()
           .getRefList();
-      ArrayList<INode> inodeList = new ArrayList<>();
       while (true) {
         INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
             .parseDelimitedFrom(in);
@@ -274,15 +279,7 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
         INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
           INode child = dir.getInode(id);
-          if (addToParent(p, child)) {
-            if (child.isFile()) {
-              inodeList.add(child);
-            }
-            if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
-              addToCacheAndBlockMap(inodeList);
-              inodeList.clear();
-            }
-          } else {
+          if (!addToParent(p, child)) {
            LOG.warn("Failed to add the inode {} to the directory {}",
                child.getId(), p.getId());
           }
@@ -290,40 +287,79 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
 
         for (int refId : e.getRefChildrenList()) {
           INodeReference ref = refList.get(refId);
-          if (addToParent(p, ref)) {
-            if (ref.isFile()) {
-              inodeList.add(ref);
-            }
-            if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
-              addToCacheAndBlockMap(inodeList);
-              inodeList.clear();
-            }
-          } else {
+          if (!addToParent(p, ref)) {
             LOG.warn("Failed to add the inode reference {} to the directory {}",
                 ref.getId(), p.getId());
           }
         }
       }
-      addToCacheAndBlockMap(inodeList);
     }
 
-    private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
-      try {
-        cacheNameMapLock.lock();
-        for (INode i : inodeList) {
-          dir.cacheName(i);
-        }
-      } finally {
-        cacheNameMapLock.unlock();
+    private void fillUpInodeList(ArrayList<INode> inodeList, INode inode) {
+      if (inode.isFile()) {
+        inodeList.add(inode);
       }
+      if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
+        addToCacheAndBlockMap(inodeList);
+        inodeList.clear();
+      }
+    }
 
-      try {
-        blockMapLock.lock();
-        for (INode i : inodeList) {
-          updateBlocksMap(i.asFile(), fsn.getBlockManager());
+    private void addToCacheAndBlockMap(final ArrayList<INode> inodeList) {
+      final ArrayList<INode> inodes = new ArrayList<>(inodeList);
+      nameCacheUpdateExecutor.submit(
+          new Runnable() {
+            @Override
+            public void run() {
+              addToCacheInternal(inodes);
+            }
+          });
+      blocksMapUpdateExecutor.submit(
+          new Runnable() {
+            @Override
+            public void run() {
+              updateBlockMapInternal(inodes);
+            }
+          });
+    }
+
+    // Update name cache; not thread safe, single-threaded use only.
+    private void addToCacheInternal(ArrayList<INode> inodeList) {
+      for (INode i : inodeList) {
+        dir.cacheName(i);
+      }
+    }
+
+    // Update blocks map; not thread safe, single-threaded use only.
+    private void updateBlockMapInternal(ArrayList<INode> inodeList) {
+      for (INode i : inodeList) {
+        updateBlocksMap(i.asFile(), fsn.getBlockManager());
+      }
+    }
+
+    void waitBlocksMapAndNameCacheUpdateFinished() throws IOException {
+      long start = System.currentTimeMillis();
+      waitExecutorTerminated(blocksMapUpdateExecutor);
+      waitExecutorTerminated(nameCacheUpdateExecutor);
+      LOG.info("Completed updating blocks map and name cache, total waiting "
+          + "duration {}ms.", (System.currentTimeMillis() - start));
+    }
+
+    private void waitExecutorTerminated(ExecutorService executorService)
+        throws IOException {
+      executorService.shutdown();
+      long start = System.currentTimeMillis();
+      while (!executorService.isTerminated()) {
+        try {
+          executorService.awaitTermination(1, TimeUnit.SECONDS);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Waiting for executor service to terminate, duration {}ms.",
+                (System.currentTimeMillis() - start));
+          }
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted while waiting for executor to terminate.", e);
+          throw new IOException(e);
         }
-      } finally {
-        blockMapLock.unlock();
       }
     }
 
@@ -340,6 +376,7 @@ private int loadINodesInSection(InputStream in, Counter counter)
       // As the input stream is a LimitInputStream, the reading will stop when
       // EOF is encountered at the end of the stream.
       int cntr = 0;
+      ArrayList<INode> inodeList = new ArrayList<>();
       while (true) {
         INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
         if (p == null) {
@@ -354,12 +391,16 @@ private int loadINodesInSection(InputStream in, Counter counter)
           synchronized(this) {
             dir.addToInodeMap(n);
           }
+          fillUpInodeList(inodeList, n);
         }
         cntr++;
         if (counter != null) {
           counter.increment();
         }
       }
+      if (inodeList.size() > 0) {
+        addToCacheAndBlockMap(inodeList);
+      }
       return cntr;
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
index be21d1f80f..d34da5f6be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
@@ -447,6 +447,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
           } else {
             inodeLoader.loadINodeDirectorySection(in);
           }
+          inodeLoader.waitBlocksMapAndNameCacheUpdateFinished();
           break;
         case FILES_UNDERCONSTRUCTION:
           inodeLoader.loadFilesUnderConstructionSection(in);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index 793a749be2..39a0f15c8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -36,6 +36,7 @@
 import java.util.EnumSet;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -49,6 +50,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.NativeCodeLoader;
@@ -1152,4 +1154,63 @@ private void ensureSubSectionsAlignWithParent(ArrayList<FileSummary.Section> subSec,
     // The first sub-section and parent section should have the same offset
     assertEquals(parent.getOffset(), subSec.get(0).getOffset());
   }
+
+  @Test
+  public void testUpdateBlocksMapAndNameCacheAsync() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    FSDirectory fsdir = cluster.getNameNode().namesystem.getFSDirectory();
+    File workingDir = GenericTestUtils.getTestDir();
+
+    File preRestartTree = new File(workingDir, "preRestartTree");
+    File postRestartTree = new File(workingDir, "postRestartTree");
+
+    Path baseDir = new Path("/user/foo");
+    fs.mkdirs(baseDir);
+    fs.allowSnapshot(baseDir);
+    for (int i = 0; i < 5; i++) {
+      Path dir = new Path(baseDir, Integer.toString(i));
+      fs.mkdirs(dir);
+      for (int j = 0; j < 5; j++) {
+        Path file = new Path(dir, Integer.toString(j));
+        FSDataOutputStream os = fs.create(file);
+        os.write((byte) j);
+        os.close();
+      }
+      fs.createSnapshot(baseDir, "snap_" + i);
+      fs.rename(new Path(dir, "0"), new Path(dir, "renamed"));
+    }
+    SnapshotTestHelper.dumpTree2File(fsdir, preRestartTree);
+
+    // checkpoint
+    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    fs.saveNamespace();
+    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+    cluster.restartNameNode();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fsdir = cluster.getNameNode().namesystem.getFSDirectory();
+
+    // Ensure all the files created above exist and their blocks are correct.
+    for (int i = 0; i < 5; i++) {
+      Path dir = new Path(baseDir, Integer.toString(i));
+      assertTrue(fs.getFileStatus(dir).isDirectory());
+      for (int j = 0; j < 5; j++) {
+        Path file = new Path(dir, Integer.toString(j));
+        if (j == 0) {
+          file = new Path(dir, "renamed");
+        }
+        FSDataInputStream in = fs.open(file);
+        int n = in.readByte();
+        assertEquals(j, n);
+        in.close();
+      }
+    }
+    SnapshotTestHelper.dumpTree2File(fsdir, postRestartTree);
+    SnapshotTestHelper.compareDumpedTreeInFile(
+        preRestartTree, postRestartTree, true);
+  }
 }
\ No newline at end of file
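For reference, the concurrency pattern the patch relies on is small enough to show in isolation: a loader thread collects work into batches and hands each batch to a single-thread executor, so the structure that is not thread safe is only ever modified from that one executor thread; shutdown() followed by awaitTermination() then gives the caller a point at which every queued update is known to have been applied, which is the role of waitBlocksMapAndNameCacheUpdateFinished() above. The sketch below is illustrative only and is not HDFS code; the class name, field, and batch size are made up for the example.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Standalone sketch (assumed names, not part of the patch) of the pattern used
// by FSImageFormatPBINode.Loader: batch items on the loader thread, submit each
// batch to a single-thread executor that owns the non-thread-safe structure,
// then wait with shutdown() + awaitTermination().
public class SingleThreadAsyncUpdateSketch {
  // Stand-in for DIRECTORY_ENTRY_BATCH_SIZE in the patch.
  private static final int BATCH_SIZE = 1000;

  // Not thread safe; modified only from updateExecutor's single thread.
  private final List<Long> index = new ArrayList<>();

  private final ExecutorService updateExecutor =
      Executors.newSingleThreadExecutor();

  // Called from the loader thread.
  void load(long[] ids) throws InterruptedException {
    ArrayList<Long> batch = new ArrayList<>();
    for (long id : ids) {
      batch.add(id);
      if (batch.size() >= BATCH_SIZE) {
        submitBatch(batch);
        batch.clear(); // the batch list is reused; see the copy below
      }
    }
    if (!batch.isEmpty()) {
      submitBatch(batch);
    }
    // Wait until every queued batch has been applied before moving on,
    // mirroring waitBlocksMapAndNameCacheUpdateFinished().
    updateExecutor.shutdown();
    while (!updateExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
      // still waiting; the patch logs progress here at debug level
    }
  }

  private void submitBatch(List<Long> batch) {
    // Defensive copy, as in addToCacheAndBlockMap(): the caller clears and
    // reuses its own list after this call returns.
    final ArrayList<Long> copy = new ArrayList<>(batch);
    updateExecutor.submit(new Runnable() {
      @Override
      public void run() {
        index.addAll(copy); // runs only on the executor thread
      }
    });
  }

  public static void main(String[] args) throws InterruptedException {
    SingleThreadAsyncUpdateSketch sketch = new SingleThreadAsyncUpdateSketch();
    long[] ids = new long[5000];
    for (int i = 0; i < ids.length; i++) {
      ids[i] = i;
    }
    sketch.load(ids);
    // Safe to read here: awaitTermination() established a happens-before edge.
    System.out.println("indexed " + sketch.index.size() + " ids");
  }
}

A multi-thread pool would not be a safe drop-in replacement here, which is what the constructor comment in the patch ("these executors must be SingleThreadExecutor") is warning about: correctness depends on the name cache and blocks map each being touched by exactly one thread.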