HDFS-14617. Improve fsimage load time by writing sub-sections to the fsimage index (#1028). Contributed by Stephen O'Donnell.

Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
Stephen O'Donnell 2019-08-23 01:09:57 +01:00 committed by Wei-Chiu Chuang
parent 93daf69f90
commit b67812ea21
9 changed files with 719 additions and 31 deletions

View File

@ -883,6 +883,22 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize"; public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024; public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
public static final String DFS_IMAGE_PARALLEL_LOAD_KEY =
"dfs.image.parallel.load";
public static final boolean DFS_IMAGE_PARALLEL_LOAD_DEFAULT = true;
public static final String DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY =
"dfs.image.parallel.target.sections";
public static final int DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT = 12;
public static final String DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY =
"dfs.image.parallel.inode.threshold";
public static final int DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT = 1000000;
public static final String DFS_IMAGE_PARALLEL_THREADS_KEY =
"dfs.image.parallel.threads";
public static final int DFS_IMAGE_PARALLEL_THREADS_DEFAULT = 4;
// Edit Log segment transfer timeout // Edit Log segment transfer timeout
public static final String DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY = public static final String DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY =
"dfs.edit.log.transfer.timeout"; "dfs.edit.log.transfer.timeout";

View File

@ -985,7 +985,8 @@ void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid); File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
File dstFile = NNStorage.getStorageFile(sd, dstType, txid); File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context); FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context,
conf);
FSImageCompression compression = FSImageCompression.createCompression(conf); FSImageCompression compression = FSImageCompression.createCompression(conf);
long numErrors = saver.save(newFile, compression); long numErrors = saver.save(newFile, compression);
if (numErrors > 0) { if (numErrors > 0) {

View File

@ -25,6 +25,11 @@
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -90,6 +95,8 @@ public final class FSImageFormatPBINode {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(FSImageFormatPBINode.class); LoggerFactory.getLogger(FSImageFormatPBINode.class);
private static final int DIRECTORY_ENTRY_BATCH_SIZE = 1000;
// the loader must decode all fields referencing serial number based fields // the loader must decode all fields referencing serial number based fields
// via to<Item> methods with the string table. // via to<Item> methods with the string table.
public final static class Loader { public final static class Loader {
@ -197,16 +204,66 @@ public static void updateBlocksMap(INodeFile file, BlockManager bm) {
private final FSDirectory dir; private final FSDirectory dir;
private final FSNamesystem fsn; private final FSNamesystem fsn;
private final FSImageFormatProtobuf.Loader parent; private final FSImageFormatProtobuf.Loader parent;
private ReentrantLock cacheNameMapLock;
private ReentrantLock blockMapLock;
Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) { Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
this.fsn = fsn; this.fsn = fsn;
this.dir = fsn.dir; this.dir = fsn.dir;
this.parent = parent; this.parent = parent;
cacheNameMapLock = new ReentrantLock(true);
blockMapLock = new ReentrantLock(true);
}
void loadINodeDirectorySectionInParallel(ExecutorService service,
ArrayList<FileSummary.Section> sections, String compressionCodec)
throws IOException {
LOG.info("Loading the INodeDirectory section in parallel with {} sub-" +
"sections", sections.size());
CountDownLatch latch = new CountDownLatch(sections.size());
final CopyOnWriteArrayList<IOException> exceptions =
new CopyOnWriteArrayList<>();
for (FileSummary.Section s : sections) {
service.submit(() -> {
InputStream ins = null;
try {
ins = parent.getInputStreamForSection(s,
compressionCodec);
loadINodeDirectorySection(ins);
} catch (Exception e) {
LOG.error("An exception occurred loading INodeDirectories in " +
"parallel", e);
exceptions.add(new IOException(e));
} finally {
latch.countDown();
try {
if (ins != null) {
ins.close();
}
} catch (IOException ioe) {
LOG.warn("Failed to close the input stream, ignoring", ioe);
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
LOG.error("Interrupted waiting for countdown latch", e);
throw new IOException(e);
}
if (exceptions.size() != 0) {
LOG.error("{} exceptions occurred loading INodeDirectories",
exceptions.size());
throw exceptions.get(0);
}
LOG.info("Completed loading all INodeDirectory sub-sections");
} }
void loadINodeDirectorySection(InputStream in) throws IOException { void loadINodeDirectorySection(InputStream in) throws IOException {
final List<INodeReference> refList = parent.getLoaderContext() final List<INodeReference> refList = parent.getLoaderContext()
.getRefList(); .getRefList();
ArrayList<INode> inodeList = new ArrayList<>();
while (true) { while (true) {
INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
.parseDelimitedFrom(in); .parseDelimitedFrom(in);
@ -217,33 +274,159 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
INodeDirectory p = dir.getInode(e.getParent()).asDirectory(); INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
for (long id : e.getChildrenList()) { for (long id : e.getChildrenList()) {
INode child = dir.getInode(id); INode child = dir.getInode(id);
addToParent(p, child); if (addToParent(p, child)) {
if (child.isFile()) {
inodeList.add(child);
}
if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
addToCacheAndBlockMap(inodeList);
inodeList.clear();
}
} else {
LOG.warn("Failed to add the inode {} to the directory {}",
child.getId(), p.getId());
}
} }
for (int refId : e.getRefChildrenList()) { for (int refId : e.getRefChildrenList()) {
INodeReference ref = refList.get(refId); INodeReference ref = refList.get(refId);
addToParent(p, ref); if (addToParent(p, ref)) {
if (ref.isFile()) {
inodeList.add(ref);
}
if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
addToCacheAndBlockMap(inodeList);
inodeList.clear();
}
} else {
LOG.warn("Failed to add the inode reference {} to the directory {}",
ref.getId(), p.getId());
}
} }
} }
addToCacheAndBlockMap(inodeList);
}
private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
try {
cacheNameMapLock.lock();
for (INode i : inodeList) {
dir.cacheName(i);
}
} finally {
cacheNameMapLock.unlock();
}
try {
blockMapLock.lock();
for (INode i : inodeList) {
updateBlocksMap(i.asFile(), fsn.getBlockManager());
}
} finally {
blockMapLock.unlock();
}
} }
void loadINodeSection(InputStream in, StartupProgress prog, void loadINodeSection(InputStream in, StartupProgress prog,
Step currentStep) throws IOException { Step currentStep) throws IOException {
loadINodeSectionHeader(in, prog, currentStep);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
int totalLoaded = loadINodesInSection(in, counter);
LOG.info("Successfully loaded {} inodes", totalLoaded);
}
private int loadINodesInSection(InputStream in, Counter counter)
throws IOException {
// As the input stream is a LimitInputStream, the reading will stop when
// EOF is encountered at the end of the stream.
int cntr = 0;
while (true) {
INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
if (p == null) {
break;
}
if (p.getId() == INodeId.ROOT_INODE_ID) {
synchronized(this) {
loadRootINode(p);
}
} else {
INode n = loadINode(p);
synchronized(this) {
dir.addToInodeMap(n);
}
}
cntr++;
if (counter != null) {
counter.increment();
}
}
return cntr;
}
private long loadINodeSectionHeader(InputStream in, StartupProgress prog,
Step currentStep) throws IOException {
INodeSection s = INodeSection.parseDelimitedFrom(in); INodeSection s = INodeSection.parseDelimitedFrom(in);
fsn.dir.resetLastInodeId(s.getLastInodeId()); fsn.dir.resetLastInodeId(s.getLastInodeId());
long numInodes = s.getNumInodes(); long numInodes = s.getNumInodes();
LOG.info("Loading " + numInodes + " INodes."); LOG.info("Loading " + numInodes + " INodes.");
prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes); prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep); return numInodes;
for (int i = 0; i < numInodes; ++i) { }
INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
if (p.getId() == INodeId.ROOT_INODE_ID) { void loadINodeSectionInParallel(ExecutorService service,
loadRootINode(p); ArrayList<FileSummary.Section> sections,
} else { String compressionCodec, StartupProgress prog,
INode n = loadINode(p); Step currentStep) throws IOException {
dir.addToInodeMap(n); LOG.info("Loading the INode section in parallel with {} sub-sections",
sections.size());
long expectedInodes = 0;
CountDownLatch latch = new CountDownLatch(sections.size());
AtomicInteger totalLoaded = new AtomicInteger(0);
final CopyOnWriteArrayList<IOException> exceptions =
new CopyOnWriteArrayList<>();
for (int i=0; i < sections.size(); i++) {
FileSummary.Section s = sections.get(i);
InputStream ins = parent.getInputStreamForSection(s, compressionCodec);
if (i == 0) {
// The first inode section has a header which must be processed first
expectedInodes = loadINodeSectionHeader(ins, prog, currentStep);
} }
counter.increment(); service.submit(() -> {
try {
totalLoaded.addAndGet(loadINodesInSection(ins, null));
prog.setCount(Phase.LOADING_FSIMAGE, currentStep,
totalLoaded.get());
} catch (Exception e) {
LOG.error("An exception occurred loading INodes in parallel", e);
exceptions.add(new IOException(e));
} finally {
latch.countDown();
try {
ins.close();
} catch (IOException ioe) {
LOG.warn("Failed to close the input stream, ignoring", ioe);
}
}
});
} }
try {
latch.await();
} catch (InterruptedException e) {
LOG.info("Interrupted waiting for countdown latch");
}
if (exceptions.size() != 0) {
LOG.error("{} exceptions occurred loading INodes", exceptions.size());
throw exceptions.get(0);
}
if (totalLoaded.get() != expectedInodes) {
throw new IOException("Expected to load "+expectedInodes+" in " +
"parallel, but loaded "+totalLoaded.get()+". The image may " +
"be corrupt.");
}
LOG.info("Completed loading all INode sections. Loaded {} inodes.",
totalLoaded.get());
} }
/** /**
@ -261,22 +444,18 @@ void loadFilesUnderConstructionSection(InputStream in) throws IOException {
} }
} }
private void addToParent(INodeDirectory parent, INode child) { private boolean addToParent(INodeDirectory parentDir, INode child) {
if (parent == dir.rootDir && FSDirectory.isReservedName(child)) { if (parentDir == dir.rootDir && FSDirectory.isReservedName(child)) {
throw new HadoopIllegalArgumentException("File name \"" throw new HadoopIllegalArgumentException("File name \""
+ child.getLocalName() + "\" is reserved. Please " + child.getLocalName() + "\" is reserved. Please "
+ " change the name of the existing file or directory to another " + " change the name of the existing file or directory to another "
+ "name before upgrading to this release."); + "name before upgrading to this release.");
} }
// NOTE: This does not update space counts for parents // NOTE: This does not update space counts for parents
if (!parent.addChildAtLoading(child)) { if (!parentDir.addChildAtLoading(child)) {
return; return false;
}
dir.cacheName(child);
if (child.isFile()) {
updateBlocksMap(child.asFile(), fsn.getBlockManager());
} }
return true;
} }
private INode loadINode(INodeSection.INode n) { private INode loadINode(INodeSection.INode n) {
@ -527,6 +706,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
final ArrayList<INodeReference> refList = parent.getSaverContext() final ArrayList<INodeReference> refList = parent.getSaverContext()
.getRefList(); .getRefList();
int i = 0; int i = 0;
int outputInodes = 0;
while (iter.hasNext()) { while (iter.hasNext()) {
INodeWithAdditionalFields n = iter.next(); INodeWithAdditionalFields n = iter.next();
if (!n.isDirectory()) { if (!n.isDirectory()) {
@ -558,6 +738,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
refList.add(inode.asReference()); refList.add(inode.asReference());
b.addRefChildren(refList.size() - 1); b.addRefChildren(refList.size() - 1);
} }
outputInodes++;
} }
INodeDirectorySection.DirEntry e = b.build(); INodeDirectorySection.DirEntry e = b.build();
e.writeDelimitedTo(out); e.writeDelimitedTo(out);
@ -567,9 +748,15 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
context.checkCancelled(); context.checkCancelled();
} }
if (outputInodes >= parent.getInodesPerSubSection()) {
outputInodes = 0;
parent.commitSubSection(summary,
FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
}
} }
parent.commitSection(summary, parent.commitSectionAndSubSection(summary,
FSImageFormatProtobuf.SectionName.INODE_DIR); FSImageFormatProtobuf.SectionName.INODE_DIR,
FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
} }
void serializeINodeSection(OutputStream out) throws IOException { void serializeINodeSection(OutputStream out) throws IOException {
@ -589,8 +776,14 @@ void serializeINodeSection(OutputStream out) throws IOException {
if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
context.checkCancelled(); context.checkCancelled();
} }
if (i % parent.getInodesPerSubSection() == 0) {
parent.commitSubSection(summary,
FSImageFormatProtobuf.SectionName.INODE_SUB);
}
} }
parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE); parent.commitSectionAndSubSection(summary,
FSImageFormatProtobuf.SectionName.INODE,
FSImageFormatProtobuf.SectionName.INODE_SUB);
} }
void serializeFilesUCSection(OutputStream out) throws IOException { void serializeFilesUCSection(OutputStream out) throws IOException {

View File

@ -40,7 +40,11 @@
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.CompressionOutputStream;
@ -150,6 +154,8 @@ public static final class Loader implements FSImageFormat.AbstractLoader {
*/ */
private final boolean requireSameLayoutVersion; private final boolean requireSameLayoutVersion;
private File filename;
Loader(Configuration conf, FSNamesystem fsn, Loader(Configuration conf, FSNamesystem fsn,
boolean requireSameLayoutVersion) { boolean requireSameLayoutVersion) {
this.conf = conf; this.conf = conf;
@ -229,6 +235,7 @@ public String toString() {
} }
void load(File file) throws IOException { void load(File file) throws IOException {
filename = file;
long start = Time.monotonicNow(); long start = Time.monotonicNow();
DigestThread dt = new DigestThread(file); DigestThread dt = new DigestThread(file);
dt.start(); dt.start();
@ -250,6 +257,96 @@ void load(File file) throws IOException {
} }
} }
/**
* Given a FSImage FileSummary.section, return a LimitInput stream set to
* the starting position of the section and limited to the section length.
* @param section The FileSummary.Section containing the offset and length
* @param compressionCodec The compression codec in use, if any
* @return An InputStream for the given section
* @throws IOException
*/
public InputStream getInputStreamForSection(FileSummary.Section section,
String compressionCodec)
throws IOException {
FileInputStream fin = new FileInputStream(filename);
FileChannel channel = fin.getChannel();
channel.position(section.getOffset());
InputStream in = new BufferedInputStream(new LimitInputStream(fin,
section.getLength()));
in = FSImageUtil.wrapInputStreamForCompression(conf,
compressionCodec, in);
return in;
}
/**
* Takes an ArrayList of Section's and removes all Section's whose
* name ends in _SUB, indicating they are sub-sections. The original
* array list is modified and a new list of the removed Section's is
* returned.
* @param sections Array List containing all Sections and Sub Sections
* in the image.
* @return ArrayList of the sections removed, or an empty list if none are
* removed.
*/
private ArrayList<FileSummary.Section> getAndRemoveSubSections(
ArrayList<FileSummary.Section> sections) {
ArrayList<FileSummary.Section> subSections = new ArrayList<>();
Iterator<FileSummary.Section> iter = sections.iterator();
while (iter.hasNext()) {
FileSummary.Section s = iter.next();
String name = s.getName();
if (name.matches(".*_SUB$")) {
subSections.add(s);
iter.remove();
}
}
return subSections;
}
/**
* Given an ArrayList of Section's, return all Section's with the given
* name, or an empty list if none are found.
* @param sections ArrayList of the Section's to search though
* @param name The name of the Sections to search for
* @return ArrayList of the sections matching the given name
*/
private ArrayList<FileSummary.Section> getSubSectionsOfName(
ArrayList<FileSummary.Section> sections, SectionName name) {
ArrayList<FileSummary.Section> subSec = new ArrayList<>();
for (FileSummary.Section s : sections) {
String n = s.getName();
SectionName sectionName = SectionName.fromString(n);
if (sectionName == name) {
subSec.add(s);
}
}
return subSec;
}
/**
* Checks the number of threads configured for parallel loading and
* return an ExecutorService with configured number of threads. If the
* thread count is set to less than 1, it will be reset to the default
* value
* @return ExecutorServie with the correct number of threads
*/
private ExecutorService getParallelExecutorService() {
int threads = conf.getInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
if (threads < 1) {
LOG.warn("Parallel is enabled and {} is set to {}. Setting to the " +
"default value {}", DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
threads, DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
threads = DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT;
}
ExecutorService executorService = Executors.newFixedThreadPool(
threads);
LOG.info("The fsimage will be loaded in parallel using {} threads",
threads);
return executorService;
}
private void loadInternal(RandomAccessFile raFile, FileInputStream fin) private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
throws IOException { throws IOException {
if (!FSImageUtil.checkFileFormat(raFile)) { if (!FSImageUtil.checkFileFormat(raFile)) {
@ -294,6 +391,14 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
* a particular step to be started for once. * a particular step to be started for once.
*/ */
Step currentStep = null; Step currentStep = null;
boolean loadInParallel = enableParallelSaveAndLoad(conf);
ExecutorService executorService = null;
ArrayList<FileSummary.Section> subSections =
getAndRemoveSubSections(sections);
if (loadInParallel) {
executorService = getParallelExecutorService();
}
for (FileSummary.Section s : sections) { for (FileSummary.Section s : sections) {
channel.position(s.getOffset()); channel.position(s.getOffset());
@ -308,6 +413,8 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
if (sectionName == null) { if (sectionName == null) {
throw new IOException("Unrecognized section " + n); throw new IOException("Unrecognized section " + n);
} }
ArrayList<FileSummary.Section> stageSubSections;
switch (sectionName) { switch (sectionName) {
case NS_INFO: case NS_INFO:
loadNameSystemSection(in); loadNameSystemSection(in);
@ -318,14 +425,28 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
case INODE: { case INODE: {
currentStep = new Step(StepType.INODES); currentStep = new Step(StepType.INODES);
prog.beginStep(Phase.LOADING_FSIMAGE, currentStep); prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
inodeLoader.loadINodeSection(in, prog, currentStep); stageSubSections = getSubSectionsOfName(
subSections, SectionName.INODE_SUB);
if (loadInParallel && (stageSubSections.size() > 0)) {
inodeLoader.loadINodeSectionInParallel(executorService,
stageSubSections, summary.getCodec(), prog, currentStep);
} else {
inodeLoader.loadINodeSection(in, prog, currentStep);
}
} }
break; break;
case INODE_REFERENCE: case INODE_REFERENCE:
snapshotLoader.loadINodeReferenceSection(in); snapshotLoader.loadINodeReferenceSection(in);
break; break;
case INODE_DIR: case INODE_DIR:
inodeLoader.loadINodeDirectorySection(in); stageSubSections = getSubSectionsOfName(
subSections, SectionName.INODE_DIR_SUB);
if (loadInParallel && stageSubSections.size() > 0) {
inodeLoader.loadINodeDirectorySectionInParallel(executorService,
stageSubSections, summary.getCodec());
} else {
inodeLoader.loadINodeDirectorySection(in);
}
break; break;
case FILES_UNDERCONSTRUCTION: case FILES_UNDERCONSTRUCTION:
inodeLoader.loadFilesUnderConstructionSection(in); inodeLoader.loadFilesUnderConstructionSection(in);
@ -362,6 +483,9 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
break; break;
} }
} }
if (executorService != null) {
executorService.shutdown();
}
} }
private void loadNameSystemSection(InputStream in) throws IOException { private void loadNameSystemSection(InputStream in) throws IOException {
@ -450,12 +574,34 @@ private void loadErasureCodingSection(InputStream in)
} }
} }
private static boolean enableParallelSaveAndLoad(Configuration conf) {
boolean loadInParallel =
conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
boolean compressionEnabled = conf.getBoolean(
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
if (loadInParallel) {
if (compressionEnabled) {
LOG.warn("Parallel Image loading and saving is not supported when {}" +
" is set to true. Parallel will be disabled.",
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY);
loadInParallel = false;
}
}
return loadInParallel;
}
public static final class Saver { public static final class Saver {
public static final int CHECK_CANCEL_INTERVAL = 4096; public static final int CHECK_CANCEL_INTERVAL = 4096;
private boolean writeSubSections = false;
private int inodesPerSubSection = Integer.MAX_VALUE;
private final SaveNamespaceContext context; private final SaveNamespaceContext context;
private final SaverContext saverContext; private final SaverContext saverContext;
private long currentOffset = FSImageUtil.MAGIC_HEADER.length; private long currentOffset = FSImageUtil.MAGIC_HEADER.length;
private long subSectionOffset = currentOffset;
private MD5Hash savedDigest; private MD5Hash savedDigest;
private FileChannel fileChannel; private FileChannel fileChannel;
@ -463,10 +609,12 @@ public static final class Saver {
private OutputStream sectionOutputStream; private OutputStream sectionOutputStream;
private CompressionCodec codec; private CompressionCodec codec;
private OutputStream underlyingOutputStream; private OutputStream underlyingOutputStream;
private Configuration conf;
Saver(SaveNamespaceContext context) { Saver(SaveNamespaceContext context, Configuration conf) {
this.context = context; this.context = context;
this.saverContext = new SaverContext(); this.saverContext = new SaverContext();
this.conf = conf;
} }
public MD5Hash getSavedDigest() { public MD5Hash getSavedDigest() {
@ -481,6 +629,29 @@ public SaverContext getSaverContext() {
return saverContext; return saverContext;
} }
public int getInodesPerSubSection() {
return inodesPerSubSection;
}
public boolean shouldWriteSubSections() {
return writeSubSections;
}
/**
* Commit the length and offset of a fsimage section to the summary index,
* including the sub section, which will be committed before the section is
* committed.
* @param summary The image summary object
* @param name The name of the section to commit
* @param subSectionName The name of the sub-section to commit
* @throws IOException
*/
public void commitSectionAndSubSection(FileSummary.Builder summary,
SectionName name, SectionName subSectionName) throws IOException {
commitSubSection(summary, subSectionName);
commitSection(summary, name);
}
public void commitSection(FileSummary.Builder summary, SectionName name) public void commitSection(FileSummary.Builder summary, SectionName name)
throws IOException { throws IOException {
long oldOffset = currentOffset; long oldOffset = currentOffset;
@ -495,6 +666,35 @@ public void commitSection(FileSummary.Builder summary, SectionName name)
summary.addSections(FileSummary.Section.newBuilder().setName(name.name) summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
.setLength(length).setOffset(currentOffset)); .setLength(length).setOffset(currentOffset));
currentOffset += length; currentOffset += length;
subSectionOffset = currentOffset;
}
/**
* Commit the length and offset of a fsimage sub-section to the summary
* index.
* @param summary The image summary object
* @param name The name of the sub-section to commit
* @throws IOException
*/
public void commitSubSection(FileSummary.Builder summary, SectionName name)
throws IOException {
if (!writeSubSections) {
return;
}
LOG.debug("Saving a subsection for {}", name.toString());
// The output stream must be flushed before the length is obtained
// as the flush can move the length forward.
sectionOutputStream.flush();
long length = fileChannel.position() - subSectionOffset;
if (length == 0) {
LOG.warn("The requested section for {} is empty. It will not be " +
"output to the image", name.toString());
return;
}
summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
.setLength(length).setOffset(subSectionOffset));
subSectionOffset += length;
} }
private void flushSectionOutputStream() throws IOException { private void flushSectionOutputStream() throws IOException {
@ -509,6 +709,7 @@ private void flushSectionOutputStream() throws IOException {
* @throws IOException on fatal error. * @throws IOException on fatal error.
*/ */
long save(File file, FSImageCompression compression) throws IOException { long save(File file, FSImageCompression compression) throws IOException {
enableSubSectionsIfRequired();
FileOutputStream fout = new FileOutputStream(file); FileOutputStream fout = new FileOutputStream(file);
fileChannel = fout.getChannel(); fileChannel = fout.getChannel();
try { try {
@ -525,6 +726,47 @@ long save(File file, FSImageCompression compression) throws IOException {
} }
} }
private void enableSubSectionsIfRequired() {
boolean parallelEnabled = enableParallelSaveAndLoad(conf);
int inodeThreshold = conf.getInt(
DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
int targetSections = conf.getInt(
DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
if (parallelEnabled) {
if (targetSections <= 0) {
LOG.warn("{} is set to {}. It must be greater than zero. Setting to" +
" default of {}",
DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
targetSections,
DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
targetSections =
DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT;
}
if (inodeThreshold <= 0) {
LOG.warn("{} is set to {}. It must be greater than zero. Setting to" +
" default of {}",
DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
inodeThreshold,
DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
inodeThreshold =
DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT;
}
int inodeCount = context.getSourceNamesystem().dir.getInodeMapSize();
// Only enable parallel sections if there are enough inodes
if (inodeCount >= inodeThreshold) {
writeSubSections = true;
// Calculate the inodes per section rounded up to the nearest int
inodesPerSubSection = (inodeCount + targetSections - 1) /
targetSections;
}
} else {
writeSubSections = false;
}
}
private static void saveFileSummary(OutputStream out, FileSummary summary) private static void saveFileSummary(OutputStream out, FileSummary summary)
throws IOException { throws IOException {
summary.writeDelimitedTo(out); summary.writeDelimitedTo(out);
@ -737,11 +979,15 @@ public enum SectionName {
EXTENDED_ACL("EXTENDED_ACL"), EXTENDED_ACL("EXTENDED_ACL"),
ERASURE_CODING("ERASURE_CODING"), ERASURE_CODING("ERASURE_CODING"),
INODE("INODE"), INODE("INODE"),
INODE_SUB("INODE_SUB"),
INODE_REFERENCE("INODE_REFERENCE"), INODE_REFERENCE("INODE_REFERENCE"),
INODE_REFERENCE_SUB("INODE_REFERENCE_SUB"),
SNAPSHOT("SNAPSHOT"), SNAPSHOT("SNAPSHOT"),
INODE_DIR("INODE_DIR"), INODE_DIR("INODE_DIR"),
INODE_DIR_SUB("INODE_DIR_SUB"),
FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"), FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),
SNAPSHOT_DIFF("SNAPSHOT_DIFF"), SNAPSHOT_DIFF("SNAPSHOT_DIFF"),
SNAPSHOT_DIFF_SUB("SNAPSHOT_DIFF_SUB"),
SECRET_MANAGER("SECRET_MANAGER"), SECRET_MANAGER("SECRET_MANAGER"),
CACHE_MANAGER("CACHE_MANAGER"); CACHE_MANAGER("CACHE_MANAGER");

View File

@ -529,9 +529,14 @@ public void serializeSnapshotDiffSection(OutputStream out)
if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
context.checkCancelled(); context.checkCancelled();
} }
if (i % parent.getInodesPerSubSection() == 0) {
parent.commitSubSection(headers,
FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
}
} }
parent.commitSection(headers, parent.commitSectionAndSubSection(headers,
FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF); FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF,
FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
} }
private void serializeFileDiffList(INodeFile file, OutputStream out) private void serializeFileDiffList(INodeFile file, OutputStream out)

View File

@ -1385,6 +1385,57 @@
</description> </description>
</property> </property>
<property>
<name>dfs.image.parallel.load</name>
<value>true</value>
<description>
If true, write sub-section entries to the fsimage index so it can
be loaded in parallel. Also controls whether parallel loading
will be used for an image previously created with sub-sections.
If the image contains sub-sections and this is set to false,
parallel loading will not be used.
Parallel loading is not compatible with image compression,
so if dfs.image.compress is set to true this setting will be
ignored and no parallel loading will occur.
</description>
</property>
<property>
<name>dfs.image.parallel.target.sections</name>
<value>12</value>
<description>
Controls the number of sub-sections that will be written to
fsimage for each section. This should be larger than
dfs.image.parallel.threads, otherwise all threads will not be
used when loading. Ideally, have at least twice the number
of target sections as threads, so each thread must load more
than one section to avoid one long running section affecting
the load time.
</description>
</property>
<property>
<name>dfs.image.parallel.inode.threshold</name>
<value>1000000</value>
<description>
If the image contains less inodes than this setting, then
do not write sub-sections and hence disable parallel loading.
This is because small images load very quickly in serial and
parallel loading is not needed.
</description>
</property>
<property>
<name>dfs.image.parallel.threads</name>
<value>4</value>
<description>
The number of threads to use when dfs.image.parallel.load is
enabled. This setting should be less than
dfs.image.parallel.target.sections. The optimal number of
threads will depend on the hardware and environment.
</description>
</property>
<property> <property>
<name>dfs.edit.log.transfer.timeout</name> <name>dfs.edit.log.transfer.timeout</name>
<value>30000</value> <value>30000</value>

View File

@ -606,4 +606,27 @@ public static long getStorageTxId(NameNode node, URI storageUri)
getStorageDirectory(storageUri); getStorageDirectory(storageUri);
return NNStorage.readTransactionIdFile(sDir); return NNStorage.readTransactionIdFile(sDir);
} }
/**
* Returns the summary section from the latest fsimage stored on the cluster.
* This is effectively the image index which contains the offset of each
* section and subsection.
* @param cluster The cluster to load the image from
* @return The FileSummary section of the fsimage
* @throws IOException
*/
public static FsImageProto.FileSummary getLatestImageSummary(
MiniDFSCluster cluster) throws IOException {
RandomAccessFile raFile = null;
try {
File image = FSImageTestUtil.findLatestImageFile(FSImageTestUtil
.getFSImage(cluster.getNameNode()).getStorage().getStorageDir(0));
raFile = new RandomAccessFile(image, "r");
return FSImageUtil.loadSummary(raFile);
} finally {
if (raFile != null) {
raFile.close();
}
}
}
} }

View File

@ -32,8 +32,10 @@
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import com.google.common.collect.Lists;
import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -72,6 +74,8 @@
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary.Section;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.PathUtils;
@ -1000,4 +1004,152 @@ private boolean isPolicyEnabledInFsImage(ErasureCodingPolicy testPolicy) {
} }
throw new AssertionError("Policy is not found!"); throw new AssertionError("Policy is not found!");
} }
}
private ArrayList<Section> getSubSectionsOfName(ArrayList<Section> sections,
FSImageFormatProtobuf.SectionName name) {
ArrayList<Section> subSec = new ArrayList<>();
for (Section s : sections) {
if (s.getName().equals(name.toString())) {
subSec.add(s);
}
}
return subSec;
}
private MiniDFSCluster createAndLoadParallelFSImage(Configuration conf)
throws IOException {
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true");
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1");
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4");
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
// Create 10 directories, each containing 5 files
String baseDir = "/abc/def";
for (int i=0; i<10; i++) {
Path dir = new Path(baseDir+"/"+i);
for (int j=0; j<5; j++) {
Path f = new Path(dir, Integer.toString(j));
FSDataOutputStream os = fs.create(f);
os.write(1);
os.close();
}
}
// checkpoint
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.restartNameNode();
cluster.waitActive();
fs = cluster.getFileSystem();
// Ensure all the files created above exist, proving they were loaded
// correctly
for (int i=0; i<10; i++) {
Path dir = new Path(baseDir+"/"+i);
assertTrue(fs.getFileStatus(dir).isDirectory());
for (int j=0; j<5; j++) {
Path f = new Path(dir, Integer.toString(j));
assertTrue(fs.exists(f));
}
}
return cluster;
}
@Test
public void testParallelSaveAndLoad() throws IOException {
Configuration conf = new Configuration();
MiniDFSCluster cluster = null;
try {
cluster = createAndLoadParallelFSImage(conf);
// Obtain the image summary section to check the sub-sections
// are being correctly created when the image is saved.
FsImageProto.FileSummary summary = FSImageTestUtil.
getLatestImageSummary(cluster);
ArrayList<Section> sections = Lists.newArrayList(
summary.getSectionsList());
ArrayList<Section> inodeSubSections =
getSubSectionsOfName(sections, SectionName.INODE_SUB);
ArrayList<Section> dirSubSections =
getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB);
Section inodeSection =
getSubSectionsOfName(sections, SectionName.INODE).get(0);
Section dirSection = getSubSectionsOfName(sections,
SectionName.INODE_DIR).get(0);
// Expect 4 sub-sections for inodes and directories as target Sections
// is 4
assertEquals(4, inodeSubSections.size());
assertEquals(4, dirSubSections.size());
// Expect the sub-section offset and lengths do not overlap and cover a
// continuous range of the file. They should also line up with the parent
ensureSubSectionsAlignWithParent(inodeSubSections, inodeSection);
ensureSubSectionsAlignWithParent(dirSubSections, dirSection);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testNoParallelSectionsWithCompressionEnabled()
throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
"org.apache.hadoop.io.compress.GzipCodec");
MiniDFSCluster cluster = null;
try {
cluster = createAndLoadParallelFSImage(conf);
// Obtain the image summary section to check the sub-sections
// are being correctly created when the image is saved.
FsImageProto.FileSummary summary = FSImageTestUtil.
getLatestImageSummary(cluster);
ArrayList<Section> sections = Lists.newArrayList(
summary.getSectionsList());
ArrayList<Section> inodeSubSections =
getSubSectionsOfName(sections, SectionName.INODE_SUB);
ArrayList<Section> dirSubSections =
getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB);
// As compression is enabled, there should be no sub-sections in the
// image header
assertEquals(0, inodeSubSections.size());
assertEquals(0, dirSubSections.size());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private void ensureSubSectionsAlignWithParent(ArrayList<Section> subSec,
Section parent) {
// For each sub-section, check its offset + length == the next section
// offset
for (int i=0; i<subSec.size()-1; i++) {
Section s = subSec.get(i);
long endOffset = s.getOffset() + s.getLength();
assertEquals(subSec.get(i+1).getOffset(), endOffset);
}
// The last sub-section should align with the parent section
Section lastSubSection = subSec.get(subSec.size()-1);
assertEquals(parent.getLength()+parent.getOffset(),
lastSubSection.getLength() + lastSubSection.getOffset());
// The first sub-section and parent section should have the same offset
assertEquals(parent.getOffset(), subSec.get(0).getOffset());
}
}

View File

@ -143,7 +143,8 @@ private HdfsDataOutputStream appendFileWithoutClosing(Path file, int length)
private File saveFSImageToTempFile() throws IOException { private File saveFSImageToTempFile() throws IOException {
SaveNamespaceContext context = new SaveNamespaceContext(fsn, txid, SaveNamespaceContext context = new SaveNamespaceContext(fsn, txid,
new Canceler()); new Canceler());
FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context); FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context,
conf);
FSImageCompression compression = FSImageCompression.createCompression(conf); FSImageCompression compression = FSImageCompression.createCompression(conf);
File imageFile = getImageFile(testDir, txid); File imageFile = getImageFile(testDir, txid);
fsn.readLock(); fsn.readLock();