HDFS-15493. Update block map and name cache in parallel while loading fsimage. Contributed by Chengwei Wang

This commit is contained in:
S O'Donnell 2020-08-12 09:02:47 +01:00
parent 141c62584b
commit 10716040a8
3 changed files with 142 additions and 39 deletions

View File

@ -28,8 +28,9 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -204,15 +205,20 @@ public static void updateBlocksMap(INodeFile file, BlockManager bm) {
private final FSDirectory dir;
private final FSNamesystem fsn;
private final FSImageFormatProtobuf.Loader parent;
private ReentrantLock cacheNameMapLock;
private ReentrantLock blockMapLock;
// Update blocks map by single thread asynchronously
private ExecutorService blocksMapUpdateExecutor;
// update name cache by single thread asynchronously.
private ExecutorService nameCacheUpdateExecutor;
Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
this.fsn = fsn;
this.dir = fsn.dir;
this.parent = parent;
cacheNameMapLock = new ReentrantLock(true);
blockMapLock = new ReentrantLock(true);
// Note: these executors must be SingleThreadExecutor, as they
// are used to modify structures which are not thread safe.
blocksMapUpdateExecutor = Executors.newSingleThreadExecutor();
nameCacheUpdateExecutor = Executors.newSingleThreadExecutor();
}
void loadINodeDirectorySectionInParallel(ExecutorService service,
@ -263,7 +269,6 @@ void loadINodeDirectorySectionInParallel(ExecutorService service,
void loadINodeDirectorySection(InputStream in) throws IOException {
final List<INodeReference> refList = parent.getLoaderContext()
.getRefList();
ArrayList<INode> inodeList = new ArrayList<>();
while (true) {
INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
.parseDelimitedFrom(in);
@ -274,15 +279,7 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
for (long id : e.getChildrenList()) {
INode child = dir.getInode(id);
if (addToParent(p, child)) {
if (child.isFile()) {
inodeList.add(child);
}
if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
addToCacheAndBlockMap(inodeList);
inodeList.clear();
}
} else {
if (!addToParent(p, child)) {
LOG.warn("Failed to add the inode {} to the directory {}",
child.getId(), p.getId());
}
@ -290,40 +287,79 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
for (int refId : e.getRefChildrenList()) {
INodeReference ref = refList.get(refId);
if (addToParent(p, ref)) {
if (ref.isFile()) {
inodeList.add(ref);
}
if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
addToCacheAndBlockMap(inodeList);
inodeList.clear();
}
} else {
if (!addToParent(p, ref)) {
LOG.warn("Failed to add the inode reference {} to the directory {}",
ref.getId(), p.getId());
}
}
}
addToCacheAndBlockMap(inodeList);
}
private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
try {
cacheNameMapLock.lock();
private void fillUpInodeList(ArrayList<INode> inodeList, INode inode) {
if (inode.isFile()) {
inodeList.add(inode);
}
if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
addToCacheAndBlockMap(inodeList);
inodeList.clear();
}
}
private void addToCacheAndBlockMap(final ArrayList<INode> inodeList) {
final ArrayList<INode> inodes = new ArrayList<>(inodeList);
nameCacheUpdateExecutor.submit(
new Runnable() {
@Override
public void run() {
addToCacheInternal(inodes);
}
});
blocksMapUpdateExecutor.submit(
new Runnable() {
@Override
public void run() {
updateBlockMapInternal(inodes);
}
});
}
// update name cache with non-thread safe
private void addToCacheInternal(ArrayList<INode> inodeList) {
for (INode i : inodeList) {
dir.cacheName(i);
}
} finally {
cacheNameMapLock.unlock();
}
try {
blockMapLock.lock();
// update blocks map with non-thread safe
private void updateBlockMapInternal(ArrayList<INode> inodeList) {
for (INode i : inodeList) {
updateBlocksMap(i.asFile(), fsn.getBlockManager());
}
} finally {
blockMapLock.unlock();
}
void waitBlocksMapAndNameCacheUpdateFinished() throws IOException {
long start = System.currentTimeMillis();
waitExecutorTerminated(blocksMapUpdateExecutor);
waitExecutorTerminated(nameCacheUpdateExecutor);
LOG.info("Completed update blocks map and name cache, total waiting "
+ "duration {}ms.", (System.currentTimeMillis() - start));
}
private void waitExecutorTerminated(ExecutorService executorService)
throws IOException {
executorService.shutdown();
long start = System.currentTimeMillis();
while (!executorService.isTerminated()) {
try {
executorService.awaitTermination(1, TimeUnit.SECONDS);
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting to executor service terminated duration {}ms.",
(System.currentTimeMillis() - start));
}
} catch (InterruptedException e) {
LOG.error("Interrupted waiting for executor terminated.", e);
throw new IOException(e);
}
}
}
@ -340,6 +376,7 @@ private int loadINodesInSection(InputStream in, Counter counter)
// As the input stream is a LimitInputStream, the reading will stop when
// EOF is encountered at the end of the stream.
int cntr = 0;
ArrayList<INode> inodeList = new ArrayList<>();
while (true) {
INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
if (p == null) {
@ -354,12 +391,16 @@ private int loadINodesInSection(InputStream in, Counter counter)
synchronized(this) {
dir.addToInodeMap(n);
}
fillUpInodeList(inodeList, n);
}
cntr++;
if (counter != null) {
counter.increment();
}
}
if (inodeList.size() > 0){
addToCacheAndBlockMap(inodeList);
}
return cntr;
}

View File

@ -447,6 +447,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
} else {
inodeLoader.loadINodeDirectorySection(in);
}
inodeLoader.waitBlocksMapAndNameCacheUpdateFinished();
break;
case FILES_UNDERCONSTRUCTION:
inodeLoader.loadFilesUnderConstructionSection(in);

View File

@ -36,6 +36,7 @@
import java.util.EnumSet;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.Block;
@ -49,6 +50,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.NativeCodeLoader;
@ -1152,4 +1154,63 @@ private void ensureSubSectionsAlignWithParent(ArrayList<Section> subSec,
// The first sub-section and parent section should have the same offset
assertEquals(parent.getOffset(), subSec.get(0).getOffset());
}
@Test
public void testUpdateBlocksMapAndNameCacheAsync() throws IOException {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
FSDirectory fsdir = cluster.getNameNode().namesystem.getFSDirectory();
File workingDir = GenericTestUtils.getTestDir();
File preRestartTree = new File(workingDir, "preRestartTree");
File postRestartTree = new File(workingDir, "postRestartTree");
Path baseDir = new Path("/user/foo");
fs.mkdirs(baseDir);
fs.allowSnapshot(baseDir);
for (int i = 0; i < 5; i++) {
Path dir = new Path(baseDir, Integer.toString(i));
fs.mkdirs(dir);
for (int j = 0; j < 5; j++) {
Path file = new Path(dir, Integer.toString(j));
FSDataOutputStream os = fs.create(file);
os.write((byte) j);
os.close();
}
fs.createSnapshot(baseDir, "snap_"+i);
fs.rename(new Path(dir, "0"), new Path(dir, "renamed"));
}
SnapshotTestHelper.dumpTree2File(fsdir, preRestartTree);
// checkpoint
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.restartNameNode();
cluster.waitActive();
fs = cluster.getFileSystem();
fsdir = cluster.getNameNode().namesystem.getFSDirectory();
// Ensure all the files created above exist, and blocks is correct.
for (int i = 0; i < 5; i++) {
Path dir = new Path(baseDir, Integer.toString(i));
assertTrue(fs.getFileStatus(dir).isDirectory());
for (int j = 0; j < 5; j++) {
Path file = new Path(dir, Integer.toString(j));
if (j == 0) {
file = new Path(dir, "renamed");
}
FSDataInputStream in = fs.open(file);
int n = in.readByte();
assertEquals(j, n);
in.close();
}
}
SnapshotTestHelper.dumpTree2File(fsdir, postRestartTree);
SnapshotTestHelper.compareDumpedTreeInFile(
preRestartTree, postRestartTree, true);
}
}