HDFS-15493. Update block map and name cache in parallel while loading fsimage. Contributed by Chengwei Wang
(cherry picked from commit 10716040a8)

parent 1354400e7c
commit 42154f04e3
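The diff below drops the two ReentrantLocks and instead batches loaded file inodes, hands each batch to two single-thread executors (one for the name cache, one for the blocks map), and drains both executors before the image load continues. As a minimal, self-contained sketch of that pattern (not the Hadoop code itself), the class and member names here — AsyncBatchUpdater, BATCH_SIZE, the HashSet standing in for the name cache — are illustrative only:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncBatchUpdater {
  private static final int BATCH_SIZE = 1000;            // stand-in for DIRECTORY_ENTRY_BATCH_SIZE
  // Non-thread-safe structure; only the executor's single thread ever touches it.
  private final Set<String> nameCache = new HashSet<>();
  private final ExecutorService cacheExecutor = Executors.newSingleThreadExecutor();
  private final List<String> batch = new ArrayList<>();

  // Called by the loading thread; flushes a full batch to the executor.
  void add(String name) {
    batch.add(name);
    if (batch.size() >= BATCH_SIZE) {
      flush();
    }
  }

  void flush() {
    final List<String> copy = new ArrayList<>(batch);     // snapshot, since 'batch' is reused
    batch.clear();
    cacheExecutor.submit(() -> {
      nameCache.addAll(copy);                             // single writer thread, no lock needed
    });
  }

  // Mirrors waitBlocksMapAndNameCacheUpdateFinished(): stop accepting work, wait for the queue to drain.
  void awaitFinished() throws InterruptedException {
    flush();
    cacheExecutor.shutdown();
    while (!cacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
      // still draining queued batches
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AsyncBatchUpdater updater = new AsyncBatchUpdater();
    for (int i = 0; i < 5000; i++) {
      updater.add("inode-" + i);
    }
    updater.awaitFinished();
    System.out.println("cached names: " + updater.nameCache.size());
  }
}

The actual patch submits to two such executors and copies each batch before submitting, exactly because the loader thread keeps reusing the same inodeList.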
FSImageFormatPBINode.java
@@ -28,8 +28,9 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -204,15 +205,20 @@ public static void updateBlocksMap(INodeFile file, BlockManager bm) {
     private final FSDirectory dir;
     private final FSNamesystem fsn;
     private final FSImageFormatProtobuf.Loader parent;
-    private ReentrantLock cacheNameMapLock;
-    private ReentrantLock blockMapLock;
+
+    // Update blocks map by single thread asynchronously
+    private ExecutorService blocksMapUpdateExecutor;
+    // update name cache by single thread asynchronously.
+    private ExecutorService nameCacheUpdateExecutor;
 
     Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
       this.fsn = fsn;
       this.dir = fsn.dir;
       this.parent = parent;
-      cacheNameMapLock = new ReentrantLock(true);
-      blockMapLock = new ReentrantLock(true);
+      // Note: these executors must be SingleThreadExecutor, as they
+      // are used to modify structures which are not thread safe.
+      blocksMapUpdateExecutor = Executors.newSingleThreadExecutor();
+      nameCacheUpdateExecutor = Executors.newSingleThreadExecutor();
     }
 
     void loadINodeDirectorySectionInParallel(ExecutorService service,
@@ -263,7 +269,6 @@ void loadINodeDirectorySectionInParallel(ExecutorService service,
     void loadINodeDirectorySection(InputStream in) throws IOException {
       final List<INodeReference> refList = parent.getLoaderContext()
           .getRefList();
-      ArrayList<INode> inodeList = new ArrayList<>();
       while (true) {
         INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
             .parseDelimitedFrom(in);
@@ -274,15 +279,7 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
         INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
           INode child = dir.getInode(id);
-          if (addToParent(p, child)) {
-            if (child.isFile()) {
-              inodeList.add(child);
-            }
-            if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
-              addToCacheAndBlockMap(inodeList);
-              inodeList.clear();
-            }
-          } else {
+          if (!addToParent(p, child)) {
             LOG.warn("Failed to add the inode {} to the directory {}",
                 child.getId(), p.getId());
           }
@@ -290,40 +287,79 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
 
         for (int refId : e.getRefChildrenList()) {
           INodeReference ref = refList.get(refId);
-          if (addToParent(p, ref)) {
-            if (ref.isFile()) {
-              inodeList.add(ref);
-            }
-            if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
-              addToCacheAndBlockMap(inodeList);
-              inodeList.clear();
-            }
-          } else {
+          if (!addToParent(p, ref)) {
             LOG.warn("Failed to add the inode reference {} to the directory {}",
                 ref.getId(), p.getId());
           }
         }
       }
-      addToCacheAndBlockMap(inodeList);
     }
 
-    private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
-      try {
-        cacheNameMapLock.lock();
-        for (INode i : inodeList) {
-          dir.cacheName(i);
-        }
-      } finally {
-        cacheNameMapLock.unlock();
-      }
-
-      try {
-        blockMapLock.lock();
-        for (INode i : inodeList) {
-          updateBlocksMap(i.asFile(), fsn.getBlockManager());
-        }
-      } finally {
-        blockMapLock.unlock();
-      }
+    private void fillUpInodeList(ArrayList<INode> inodeList, INode inode) {
+      if (inode.isFile()) {
+        inodeList.add(inode);
+      }
+      if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
+        addToCacheAndBlockMap(inodeList);
+        inodeList.clear();
+      }
+    }
+
+    private void addToCacheAndBlockMap(final ArrayList<INode> inodeList) {
+      final ArrayList<INode> inodes = new ArrayList<>(inodeList);
+      nameCacheUpdateExecutor.submit(
+          new Runnable() {
+            @Override
+            public void run() {
+              addToCacheInternal(inodes);
+            }
+          });
+      blocksMapUpdateExecutor.submit(
+          new Runnable() {
+            @Override
+            public void run() {
+              updateBlockMapInternal(inodes);
+            }
+          });
+    }
+
+    // update name cache with non-thread safe
+    private void addToCacheInternal(ArrayList<INode> inodeList) {
+      for (INode i : inodeList) {
+        dir.cacheName(i);
+      }
+    }
+
+    // update blocks map with non-thread safe
+    private void updateBlockMapInternal(ArrayList<INode> inodeList) {
+      for (INode i : inodeList) {
+        updateBlocksMap(i.asFile(), fsn.getBlockManager());
+      }
+    }
+
+    void waitBlocksMapAndNameCacheUpdateFinished() throws IOException {
+      long start = System.currentTimeMillis();
+      waitExecutorTerminated(blocksMapUpdateExecutor);
+      waitExecutorTerminated(nameCacheUpdateExecutor);
+      LOG.info("Completed update blocks map and name cache, total waiting "
+          + "duration {}ms.", (System.currentTimeMillis() - start));
+    }
+
+    private void waitExecutorTerminated(ExecutorService executorService)
+        throws IOException {
+      executorService.shutdown();
+      long start = System.currentTimeMillis();
+      while (!executorService.isTerminated()) {
+        try {
+          executorService.awaitTermination(1, TimeUnit.SECONDS);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Waiting to executor service terminated duration {}ms.",
+                (System.currentTimeMillis() - start));
+          }
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted waiting for executor terminated.", e);
+          throw new IOException(e);
+        }
+      }
     }
 
@@ -340,6 +376,7 @@ private int loadINodesInSection(InputStream in, Counter counter)
       // As the input stream is a LimitInputStream, the reading will stop when
       // EOF is encountered at the end of the stream.
       int cntr = 0;
+      ArrayList<INode> inodeList = new ArrayList<>();
       while (true) {
         INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
         if (p == null) {
@@ -354,12 +391,16 @@ private int loadINodesInSection(InputStream in, Counter counter)
           synchronized(this) {
             dir.addToInodeMap(n);
           }
+          fillUpInodeList(inodeList, n);
         }
         cntr++;
         if (counter != null) {
          counter.increment();
        }
      }
+      if (inodeList.size() > 0){
+        addToCacheAndBlockMap(inodeList);
+      }
      return cntr;
    }
 
FSImageFormatProtobuf.java
@@ -447,6 +447,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
         } else {
           inodeLoader.loadINodeDirectorySection(in);
         }
+        inodeLoader.waitBlocksMapAndNameCacheUpdateFinished();
         break;
       case FILES_UNDERCONSTRUCTION:
         inodeLoader.loadFilesUnderConstructionSection(in);
TestFSImage.java
@@ -36,6 +36,7 @@
 import java.util.EnumSet;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -49,6 +50,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.NativeCodeLoader;
@@ -1152,4 +1154,63 @@ private void ensureSubSectionsAlignWithParent(ArrayList<Section> subSec,
     // The first sub-section and parent section should have the same offset
     assertEquals(parent.getOffset(), subSec.get(0).getOffset());
   }
+
+  @Test
+  public void testUpdateBlocksMapAndNameCacheAsync() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    FSDirectory fsdir = cluster.getNameNode().namesystem.getFSDirectory();
+    File workingDir = GenericTestUtils.getTestDir();
+
+    File preRestartTree = new File(workingDir, "preRestartTree");
+    File postRestartTree = new File(workingDir, "postRestartTree");
+
+    Path baseDir = new Path("/user/foo");
+    fs.mkdirs(baseDir);
+    fs.allowSnapshot(baseDir);
+    for (int i = 0; i < 5; i++) {
+      Path dir = new Path(baseDir, Integer.toString(i));
+      fs.mkdirs(dir);
+      for (int j = 0; j < 5; j++) {
+        Path file = new Path(dir, Integer.toString(j));
+        FSDataOutputStream os = fs.create(file);
+        os.write((byte) j);
+        os.close();
+      }
+      fs.createSnapshot(baseDir, "snap_"+i);
+      fs.rename(new Path(dir, "0"), new Path(dir, "renamed"));
+    }
+    SnapshotTestHelper.dumpTree2File(fsdir, preRestartTree);
+
+    // checkpoint
+    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    fs.saveNamespace();
+    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+    cluster.restartNameNode();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fsdir = cluster.getNameNode().namesystem.getFSDirectory();
+
+    // Ensure all the files created above exist, and blocks is correct.
+    for (int i = 0; i < 5; i++) {
+      Path dir = new Path(baseDir, Integer.toString(i));
+      assertTrue(fs.getFileStatus(dir).isDirectory());
+      for (int j = 0; j < 5; j++) {
+        Path file = new Path(dir, Integer.toString(j));
+        if (j == 0) {
+          file = new Path(dir, "renamed");
+        }
+        FSDataInputStream in = fs.open(file);
+        int n = in.readByte();
+        assertEquals(j, n);
+        in.close();
+      }
+    }
+    SnapshotTestHelper.dumpTree2File(fsdir, postRestartTree);
+    SnapshotTestHelper.compareDumpedTreeInFile(
+        preRestartTree, postRestartTree, true);
+  }
 }