diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 9bd124627f..95806defc6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -883,6 +883,22 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize"; public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024; + public static final String DFS_IMAGE_PARALLEL_LOAD_KEY = + "dfs.image.parallel.load"; + public static final boolean DFS_IMAGE_PARALLEL_LOAD_DEFAULT = true; + + public static final String DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY = + "dfs.image.parallel.target.sections"; + public static final int DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT = 12; + + public static final String DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY = + "dfs.image.parallel.inode.threshold"; + public static final int DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT = 1000000; + + public static final String DFS_IMAGE_PARALLEL_THREADS_KEY = + "dfs.image.parallel.threads"; + public static final int DFS_IMAGE_PARALLEL_THREADS_DEFAULT = 4; + // Edit Log segment transfer timeout public static final String DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY = "dfs.edit.log.transfer.timeout"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index cfba091976..cea18b7f00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -985,7 +985,8 @@ void saveFSImage(SaveNamespaceContext context, StorageDirectory sd, File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid); File dstFile = NNStorage.getStorageFile(sd, dstType, txid); - FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context); + FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context, + conf); FSImageCompression compression = FSImageCompression.createCompression(conf); long numErrors = saver.save(newFile, compression); if (numErrors > 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index 6825a5c485..d84e8c5b1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -25,6 +25,11 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +95,8 @@ public final class FSImageFormatPBINode { private static final Logger LOG = LoggerFactory.getLogger(FSImageFormatPBINode.class); + 
private static final int DIRECTORY_ENTRY_BATCH_SIZE = 1000; + // the loader must decode all fields referencing serial number based fields // via to methods with the string table. public final static class Loader { @@ -197,16 +204,66 @@ public static void updateBlocksMap(INodeFile file, BlockManager bm) { private final FSDirectory dir; private final FSNamesystem fsn; private final FSImageFormatProtobuf.Loader parent; + private ReentrantLock cacheNameMapLock; + private ReentrantLock blockMapLock; Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) { this.fsn = fsn; this.dir = fsn.dir; this.parent = parent; + cacheNameMapLock = new ReentrantLock(true); + blockMapLock = new ReentrantLock(true); + } + + void loadINodeDirectorySectionInParallel(ExecutorService service, + ArrayList sections, String compressionCodec) + throws IOException { + LOG.info("Loading the INodeDirectory section in parallel with {} sub-" + + "sections", sections.size()); + CountDownLatch latch = new CountDownLatch(sections.size()); + final CopyOnWriteArrayList exceptions = + new CopyOnWriteArrayList<>(); + for (FileSummary.Section s : sections) { + service.submit(() -> { + InputStream ins = null; + try { + ins = parent.getInputStreamForSection(s, + compressionCodec); + loadINodeDirectorySection(ins); + } catch (Exception e) { + LOG.error("An exception occurred loading INodeDirectories in " + + "parallel", e); + exceptions.add(new IOException(e)); + } finally { + latch.countDown(); + try { + if (ins != null) { + ins.close(); + } + } catch (IOException ioe) { + LOG.warn("Failed to close the input stream, ignoring", ioe); + } + } + }); + } + try { + latch.await(); + } catch (InterruptedException e) { + LOG.error("Interrupted waiting for countdown latch", e); + throw new IOException(e); + } + if (exceptions.size() != 0) { + LOG.error("{} exceptions occurred loading INodeDirectories", + exceptions.size()); + throw exceptions.get(0); + } + LOG.info("Completed loading all INodeDirectory sub-sections"); } void loadINodeDirectorySection(InputStream in) throws IOException { final List refList = parent.getLoaderContext() .getRefList(); + ArrayList inodeList = new ArrayList<>(); while (true) { INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry .parseDelimitedFrom(in); @@ -217,33 +274,159 @@ void loadINodeDirectorySection(InputStream in) throws IOException { INodeDirectory p = dir.getInode(e.getParent()).asDirectory(); for (long id : e.getChildrenList()) { INode child = dir.getInode(id); - addToParent(p, child); + if (addToParent(p, child)) { + if (child.isFile()) { + inodeList.add(child); + } + if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) { + addToCacheAndBlockMap(inodeList); + inodeList.clear(); + } + } else { + LOG.warn("Failed to add the inode {} to the directory {}", + child.getId(), p.getId()); + } } + for (int refId : e.getRefChildrenList()) { INodeReference ref = refList.get(refId); - addToParent(p, ref); + if (addToParent(p, ref)) { + if (ref.isFile()) { + inodeList.add(ref); + } + if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) { + addToCacheAndBlockMap(inodeList); + inodeList.clear(); + } + } else { + LOG.warn("Failed to add the inode reference {} to the directory {}", + ref.getId(), p.getId()); + } } } + addToCacheAndBlockMap(inodeList); + } + + private void addToCacheAndBlockMap(ArrayList inodeList) { + try { + cacheNameMapLock.lock(); + for (INode i : inodeList) { + dir.cacheName(i); + } + } finally { + cacheNameMapLock.unlock(); + } + + try { + blockMapLock.lock(); + for (INode i 
: inodeList) { + updateBlocksMap(i.asFile(), fsn.getBlockManager()); + } + } finally { + blockMapLock.unlock(); + } } void loadINodeSection(InputStream in, StartupProgress prog, Step currentStep) throws IOException { + loadINodeSectionHeader(in, prog, currentStep); + Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep); + int totalLoaded = loadINodesInSection(in, counter); + LOG.info("Successfully loaded {} inodes", totalLoaded); + } + + private int loadINodesInSection(InputStream in, Counter counter) + throws IOException { + // As the input stream is a LimitInputStream, the reading will stop when + // EOF is encountered at the end of the stream. + int cntr = 0; + while (true) { + INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in); + if (p == null) { + break; + } + if (p.getId() == INodeId.ROOT_INODE_ID) { + synchronized(this) { + loadRootINode(p); + } + } else { + INode n = loadINode(p); + synchronized(this) { + dir.addToInodeMap(n); + } + } + cntr++; + if (counter != null) { + counter.increment(); + } + } + return cntr; + } + + + private long loadINodeSectionHeader(InputStream in, StartupProgress prog, + Step currentStep) throws IOException { INodeSection s = INodeSection.parseDelimitedFrom(in); fsn.dir.resetLastInodeId(s.getLastInodeId()); long numInodes = s.getNumInodes(); LOG.info("Loading " + numInodes + " INodes."); prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes); - Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep); - for (int i = 0; i < numInodes; ++i) { - INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in); - if (p.getId() == INodeId.ROOT_INODE_ID) { - loadRootINode(p); - } else { - INode n = loadINode(p); - dir.addToInodeMap(n); + return numInodes; + } + + void loadINodeSectionInParallel(ExecutorService service, + ArrayList sections, + String compressionCodec, StartupProgress prog, + Step currentStep) throws IOException { + LOG.info("Loading the INode section in parallel with {} sub-sections", + sections.size()); + long expectedInodes = 0; + CountDownLatch latch = new CountDownLatch(sections.size()); + AtomicInteger totalLoaded = new AtomicInteger(0); + final CopyOnWriteArrayList exceptions = + new CopyOnWriteArrayList<>(); + + for (int i=0; i < sections.size(); i++) { + FileSummary.Section s = sections.get(i); + InputStream ins = parent.getInputStreamForSection(s, compressionCodec); + if (i == 0) { + // The first inode section has a header which must be processed first + expectedInodes = loadINodeSectionHeader(ins, prog, currentStep); } - counter.increment(); + service.submit(() -> { + try { + totalLoaded.addAndGet(loadINodesInSection(ins, null)); + prog.setCount(Phase.LOADING_FSIMAGE, currentStep, + totalLoaded.get()); + } catch (Exception e) { + LOG.error("An exception occurred loading INodes in parallel", e); + exceptions.add(new IOException(e)); + } finally { + latch.countDown(); + try { + ins.close(); + } catch (IOException ioe) { + LOG.warn("Failed to close the input stream, ignoring", ioe); + } + } + }); } + try { + latch.await(); + } catch (InterruptedException e) { + LOG.info("Interrupted waiting for countdown latch"); + } + if (exceptions.size() != 0) { + LOG.error("{} exceptions occurred loading INodes", exceptions.size()); + throw exceptions.get(0); + } + if (totalLoaded.get() != expectedInodes) { + throw new IOException("Expected to load "+expectedInodes+" in " + + "parallel, but loaded "+totalLoaded.get()+". 
The image may " + + "be corrupt."); + } + LOG.info("Completed loading all INode sections. Loaded {} inodes.", + totalLoaded.get()); } /** @@ -261,22 +444,18 @@ void loadFilesUnderConstructionSection(InputStream in) throws IOException { } } - private void addToParent(INodeDirectory parent, INode child) { - if (parent == dir.rootDir && FSDirectory.isReservedName(child)) { + private boolean addToParent(INodeDirectory parentDir, INode child) { + if (parentDir == dir.rootDir && FSDirectory.isReservedName(child)) { throw new HadoopIllegalArgumentException("File name \"" + child.getLocalName() + "\" is reserved. Please " + " change the name of the existing file or directory to another " + "name before upgrading to this release."); } // NOTE: This does not update space counts for parents - if (!parent.addChildAtLoading(child)) { - return; - } - dir.cacheName(child); - - if (child.isFile()) { - updateBlocksMap(child.asFile(), fsn.getBlockManager()); + if (!parentDir.addChildAtLoading(child)) { + return false; } + return true; } private INode loadINode(INodeSection.INode n) { @@ -527,6 +706,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException { final ArrayList refList = parent.getSaverContext() .getRefList(); int i = 0; + int outputInodes = 0; while (iter.hasNext()) { INodeWithAdditionalFields n = iter.next(); if (!n.isDirectory()) { @@ -558,6 +738,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException { refList.add(inode.asReference()); b.addRefChildren(refList.size() - 1); } + outputInodes++; } INodeDirectorySection.DirEntry e = b.build(); e.writeDelimitedTo(out); @@ -567,9 +748,15 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException { if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { context.checkCancelled(); } + if (outputInodes >= parent.getInodesPerSubSection()) { + outputInodes = 0; + parent.commitSubSection(summary, + FSImageFormatProtobuf.SectionName.INODE_DIR_SUB); + } } - parent.commitSection(summary, - FSImageFormatProtobuf.SectionName.INODE_DIR); + parent.commitSectionAndSubSection(summary, + FSImageFormatProtobuf.SectionName.INODE_DIR, + FSImageFormatProtobuf.SectionName.INODE_DIR_SUB); } void serializeINodeSection(OutputStream out) throws IOException { @@ -589,8 +776,14 @@ void serializeINodeSection(OutputStream out) throws IOException { if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { context.checkCancelled(); } + if (i % parent.getInodesPerSubSection() == 0) { + parent.commitSubSection(summary, + FSImageFormatProtobuf.SectionName.INODE_SUB); + } } - parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE); + parent.commitSectionAndSubSection(summary, + FSImageFormatProtobuf.SectionName.INODE, + FSImageFormatProtobuf.SectionName.INODE_SUB); } void serializeFilesUCSection(OutputStream out) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java index b887a1438e..3144d4b17c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java @@ -40,7 +40,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.Iterator; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.io.compress.CompressionOutputStream; @@ -150,6 +154,8 @@ public static final class Loader implements FSImageFormat.AbstractLoader { */ private final boolean requireSameLayoutVersion; + private File filename; + Loader(Configuration conf, FSNamesystem fsn, boolean requireSameLayoutVersion) { this.conf = conf; @@ -229,6 +235,7 @@ public String toString() { } void load(File file) throws IOException { + filename = file; long start = Time.monotonicNow(); DigestThread dt = new DigestThread(file); dt.start(); @@ -250,6 +257,96 @@ void load(File file) throws IOException { } } + /** + * Given a FSImage FileSummary.section, return a LimitInput stream set to + * the starting position of the section and limited to the section length. + * @param section The FileSummary.Section containing the offset and length + * @param compressionCodec The compression codec in use, if any + * @return An InputStream for the given section + * @throws IOException + */ + public InputStream getInputStreamForSection(FileSummary.Section section, + String compressionCodec) + throws IOException { + FileInputStream fin = new FileInputStream(filename); + FileChannel channel = fin.getChannel(); + channel.position(section.getOffset()); + InputStream in = new BufferedInputStream(new LimitInputStream(fin, + section.getLength())); + + in = FSImageUtil.wrapInputStreamForCompression(conf, + compressionCodec, in); + return in; + } + + /** + * Takes an ArrayList of Section's and removes all Section's whose + * name ends in _SUB, indicating they are sub-sections. The original + * array list is modified and a new list of the removed Section's is + * returned. + * @param sections Array List containing all Sections and Sub Sections + * in the image. + * @return ArrayList of the sections removed, or an empty list if none are + * removed. + */ + private ArrayList getAndRemoveSubSections( + ArrayList sections) { + ArrayList subSections = new ArrayList<>(); + Iterator iter = sections.iterator(); + while (iter.hasNext()) { + FileSummary.Section s = iter.next(); + String name = s.getName(); + if (name.matches(".*_SUB$")) { + subSections.add(s); + iter.remove(); + } + } + return subSections; + } + + /** + * Given an ArrayList of Section's, return all Section's with the given + * name, or an empty list if none are found. + * @param sections ArrayList of the Section's to search though + * @param name The name of the Sections to search for + * @return ArrayList of the sections matching the given name + */ + private ArrayList getSubSectionsOfName( + ArrayList sections, SectionName name) { + ArrayList subSec = new ArrayList<>(); + for (FileSummary.Section s : sections) { + String n = s.getName(); + SectionName sectionName = SectionName.fromString(n); + if (sectionName == name) { + subSec.add(s); + } + } + return subSec; + } + + /** + * Checks the number of threads configured for parallel loading and + * return an ExecutorService with configured number of threads. 
If the + * thread count is set to less than 1, it will be reset to the default + * value + * @return ExecutorServie with the correct number of threads + */ + private ExecutorService getParallelExecutorService() { + int threads = conf.getInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, + DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT); + if (threads < 1) { + LOG.warn("Parallel is enabled and {} is set to {}. Setting to the " + + "default value {}", DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, + threads, DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT); + threads = DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT; + } + ExecutorService executorService = Executors.newFixedThreadPool( + threads); + LOG.info("The fsimage will be loaded in parallel using {} threads", + threads); + return executorService; + } + private void loadInternal(RandomAccessFile raFile, FileInputStream fin) throws IOException { if (!FSImageUtil.checkFileFormat(raFile)) { @@ -294,6 +391,14 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) { * a particular step to be started for once. */ Step currentStep = null; + boolean loadInParallel = enableParallelSaveAndLoad(conf); + + ExecutorService executorService = null; + ArrayList subSections = + getAndRemoveSubSections(sections); + if (loadInParallel) { + executorService = getParallelExecutorService(); + } for (FileSummary.Section s : sections) { channel.position(s.getOffset()); @@ -308,6 +413,8 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) { if (sectionName == null) { throw new IOException("Unrecognized section " + n); } + + ArrayList stageSubSections; switch (sectionName) { case NS_INFO: loadNameSystemSection(in); @@ -318,14 +425,28 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) { case INODE: { currentStep = new Step(StepType.INODES); prog.beginStep(Phase.LOADING_FSIMAGE, currentStep); - inodeLoader.loadINodeSection(in, prog, currentStep); + stageSubSections = getSubSectionsOfName( + subSections, SectionName.INODE_SUB); + if (loadInParallel && (stageSubSections.size() > 0)) { + inodeLoader.loadINodeSectionInParallel(executorService, + stageSubSections, summary.getCodec(), prog, currentStep); + } else { + inodeLoader.loadINodeSection(in, prog, currentStep); + } } break; case INODE_REFERENCE: snapshotLoader.loadINodeReferenceSection(in); break; case INODE_DIR: - inodeLoader.loadINodeDirectorySection(in); + stageSubSections = getSubSectionsOfName( + subSections, SectionName.INODE_DIR_SUB); + if (loadInParallel && stageSubSections.size() > 0) { + inodeLoader.loadINodeDirectorySectionInParallel(executorService, + stageSubSections, summary.getCodec()); + } else { + inodeLoader.loadINodeDirectorySection(in); + } break; case FILES_UNDERCONSTRUCTION: inodeLoader.loadFilesUnderConstructionSection(in); @@ -362,6 +483,9 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) { break; } } + if (executorService != null) { + executorService.shutdown(); + } } private void loadNameSystemSection(InputStream in) throws IOException { @@ -450,12 +574,34 @@ private void loadErasureCodingSection(InputStream in) } } + private static boolean enableParallelSaveAndLoad(Configuration conf) { + boolean loadInParallel = + conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, + DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT); + boolean compressionEnabled = conf.getBoolean( + DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, + DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT); + + if (loadInParallel) { + if (compressionEnabled) { + 
LOG.warn("Parallel Image loading and saving is not supported when {}" + + " is set to true. Parallel will be disabled.", + DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY); + loadInParallel = false; + } + } + return loadInParallel; + } + public static final class Saver { public static final int CHECK_CANCEL_INTERVAL = 4096; + private boolean writeSubSections = false; + private int inodesPerSubSection = Integer.MAX_VALUE; private final SaveNamespaceContext context; private final SaverContext saverContext; private long currentOffset = FSImageUtil.MAGIC_HEADER.length; + private long subSectionOffset = currentOffset; private MD5Hash savedDigest; private FileChannel fileChannel; @@ -463,10 +609,12 @@ public static final class Saver { private OutputStream sectionOutputStream; private CompressionCodec codec; private OutputStream underlyingOutputStream; + private Configuration conf; - Saver(SaveNamespaceContext context) { + Saver(SaveNamespaceContext context, Configuration conf) { this.context = context; this.saverContext = new SaverContext(); + this.conf = conf; } public MD5Hash getSavedDigest() { @@ -481,6 +629,29 @@ public SaverContext getSaverContext() { return saverContext; } + public int getInodesPerSubSection() { + return inodesPerSubSection; + } + + public boolean shouldWriteSubSections() { + return writeSubSections; + } + + /** + * Commit the length and offset of a fsimage section to the summary index, + * including the sub section, which will be committed before the section is + * committed. + * @param summary The image summary object + * @param name The name of the section to commit + * @param subSectionName The name of the sub-section to commit + * @throws IOException + */ + public void commitSectionAndSubSection(FileSummary.Builder summary, + SectionName name, SectionName subSectionName) throws IOException { + commitSubSection(summary, subSectionName); + commitSection(summary, name); + } + public void commitSection(FileSummary.Builder summary, SectionName name) throws IOException { long oldOffset = currentOffset; @@ -495,6 +666,35 @@ public void commitSection(FileSummary.Builder summary, SectionName name) summary.addSections(FileSummary.Section.newBuilder().setName(name.name) .setLength(length).setOffset(currentOffset)); currentOffset += length; + subSectionOffset = currentOffset; + } + + /** + * Commit the length and offset of a fsimage sub-section to the summary + * index. + * @param summary The image summary object + * @param name The name of the sub-section to commit + * @throws IOException + */ + public void commitSubSection(FileSummary.Builder summary, SectionName name) + throws IOException { + if (!writeSubSections) { + return; + } + + LOG.debug("Saving a subsection for {}", name.toString()); + // The output stream must be flushed before the length is obtained + // as the flush can move the length forward. + sectionOutputStream.flush(); + long length = fileChannel.position() - subSectionOffset; + if (length == 0) { + LOG.warn("The requested section for {} is empty. It will not be " + + "output to the image", name.toString()); + return; + } + summary.addSections(FileSummary.Section.newBuilder().setName(name.name) + .setLength(length).setOffset(subSectionOffset)); + subSectionOffset += length; } private void flushSectionOutputStream() throws IOException { @@ -509,6 +709,7 @@ private void flushSectionOutputStream() throws IOException { * @throws IOException on fatal error. 
*/ long save(File file, FSImageCompression compression) throws IOException { + enableSubSectionsIfRequired(); FileOutputStream fout = new FileOutputStream(file); fileChannel = fout.getChannel(); try { @@ -525,6 +726,47 @@ long save(File file, FSImageCompression compression) throws IOException { } } + private void enableSubSectionsIfRequired() { + boolean parallelEnabled = enableParallelSaveAndLoad(conf); + int inodeThreshold = conf.getInt( + DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, + DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT); + int targetSections = conf.getInt( + DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, + DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT); + + if (parallelEnabled) { + if (targetSections <= 0) { + LOG.warn("{} is set to {}. It must be greater than zero. Setting to" + + " default of {}", + DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, + targetSections, + DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT); + targetSections = + DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT; + } + if (inodeThreshold <= 0) { + LOG.warn("{} is set to {}. It must be greater than zero. Setting to" + + " default of {}", + DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, + inodeThreshold, + DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT); + inodeThreshold = + DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT; + } + int inodeCount = context.getSourceNamesystem().dir.getInodeMapSize(); + // Only enable parallel sections if there are enough inodes + if (inodeCount >= inodeThreshold) { + writeSubSections = true; + // Calculate the inodes per section rounded up to the nearest int + inodesPerSubSection = (inodeCount + targetSections - 1) / + targetSections; + } + } else { + writeSubSections = false; + } + } + private static void saveFileSummary(OutputStream out, FileSummary summary) throws IOException { summary.writeDelimitedTo(out); @@ -737,11 +979,15 @@ public enum SectionName { EXTENDED_ACL("EXTENDED_ACL"), ERASURE_CODING("ERASURE_CODING"), INODE("INODE"), + INODE_SUB("INODE_SUB"), INODE_REFERENCE("INODE_REFERENCE"), + INODE_REFERENCE_SUB("INODE_REFERENCE_SUB"), SNAPSHOT("SNAPSHOT"), INODE_DIR("INODE_DIR"), + INODE_DIR_SUB("INODE_DIR_SUB"), FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"), SNAPSHOT_DIFF("SNAPSHOT_DIFF"), + SNAPSHOT_DIFF_SUB("SNAPSHOT_DIFF_SUB"), SECRET_MANAGER("SECRET_MANAGER"), CACHE_MANAGER("CACHE_MANAGER"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java index 2157554cd6..cd5051dd39 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java @@ -529,9 +529,14 @@ public void serializeSnapshotDiffSection(OutputStream out) if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { context.checkCancelled(); } + if (i % parent.getInodesPerSubSection() == 0) { + parent.commitSubSection(headers, + FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB); + } } - parent.commitSection(headers, - FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF); + parent.commitSectionAndSubSection(headers, + FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF, + 
FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB); } private void serializeFileDiffList(INodeFile file, OutputStream out) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 79811aad7b..74c4f40938 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1385,6 +1385,57 @@ + + dfs.image.parallel.load + true + + If true, write sub-section entries to the fsimage index so it can + be loaded in parallel. Also controls whether parallel loading + will be used for an image previously created with sub-sections. + If the image contains sub-sections and this is set to false, + parallel loading will not be used. + Parallel loading is not compatible with image compression, + so if dfs.image.compress is set to true this setting will be + ignored and no parallel loading will occur. + + + + + dfs.image.parallel.target.sections + 12 + + Controls the number of sub-sections that will be written to + fsimage for each section. This should be larger than + dfs.image.parallel.threads, otherwise all threads will not be + used when loading. Ideally, have at least twice the number + of target sections as threads, so each thread must load more + than one section to avoid one long running section affecting + the load time. + + + + + dfs.image.parallel.inode.threshold + 1000000 + + If the image contains less inodes than this setting, then + do not write sub-sections and hence disable parallel loading. + This is because small images load very quickly in serial and + parallel loading is not needed. + + + + + dfs.image.parallel.threads + 4 + + The number of threads to use when dfs.image.parallel.load is + enabled. This setting should be less than + dfs.image.parallel.target.sections. The optimal number of + threads will depend on the hardware and environment. + + + dfs.edit.log.transfer.timeout 30000 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java index 985ab35ba1..c82d317d88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java @@ -606,4 +606,27 @@ public static long getStorageTxId(NameNode node, URI storageUri) getStorageDirectory(storageUri); return NNStorage.readTransactionIdFile(sDir); } + + /** + * Returns the summary section from the latest fsimage stored on the cluster. + * This is effectively the image index which contains the offset of each + * section and subsection. 
+ * @param cluster The cluster to load the image from + * @return The FileSummary section of the fsimage + * @throws IOException + */ + public static FsImageProto.FileSummary getLatestImageSummary( + MiniDFSCluster cluster) throws IOException { + RandomAccessFile raFile = null; + try { + File image = FSImageTestUtil.findLatestImageFile(FSImageTestUtil + .getFSImage(cluster.getNameNode()).getStorage().getStorageDir(0)); + raFile = new RandomAccessFile(image, "r"); + return FSImageUtil.loadSummary(raFile); + } finally { + if (raFile != null) { + raFile.close(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java index 0beb7582e9..793a749be2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java @@ -32,8 +32,10 @@ import java.io.ByteArrayOutputStream; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.EnumSet; +import com.google.common.collect.Lists; import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.Block; @@ -72,6 +74,8 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection; +import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary.Section; +import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName; import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; @@ -1000,4 +1004,152 @@ private boolean isPolicyEnabledInFsImage(ErasureCodingPolicy testPolicy) { } throw new AssertionError("Policy is not found!"); } -} + + private ArrayList
<Section> getSubSectionsOfName(ArrayList<Section> sections, + FSImageFormatProtobuf.SectionName name) { + ArrayList<Section>
subSec = new ArrayList<>(); + for (Section s : sections) { + if (s.getName().equals(name.toString())) { + subSec.add(s); + } + } + return subSec; + } + + private MiniDFSCluster createAndLoadParallelFSImage(Configuration conf) + throws IOException { + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true"); + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1"); + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4"); + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4"); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + + // Create 10 directories, each containing 5 files + String baseDir = "/abc/def"; + for (int i=0; i<10; i++) { + Path dir = new Path(baseDir+"/"+i); + for (int j=0; j<5; j++) { + Path f = new Path(dir, Integer.toString(j)); + FSDataOutputStream os = fs.create(f); + os.write(1); + os.close(); + } + } + + // checkpoint + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + fs.saveNamespace(); + fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + + cluster.restartNameNode(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + + // Ensure all the files created above exist, proving they were loaded + // correctly + for (int i=0; i<10; i++) { + Path dir = new Path(baseDir+"/"+i); + assertTrue(fs.getFileStatus(dir).isDirectory()); + for (int j=0; j<5; j++) { + Path f = new Path(dir, Integer.toString(j)); + assertTrue(fs.exists(f)); + } + } + return cluster; + } + + @Test + public void testParallelSaveAndLoad() throws IOException { + Configuration conf = new Configuration(); + + MiniDFSCluster cluster = null; + try { + cluster = createAndLoadParallelFSImage(conf); + + // Obtain the image summary section to check the sub-sections + // are being correctly created when the image is saved. + FsImageProto.FileSummary summary = FSImageTestUtil. + getLatestImageSummary(cluster); + ArrayList
<Section> sections = Lists.newArrayList( + summary.getSectionsList()); + + ArrayList<Section> inodeSubSections = + getSubSectionsOfName(sections, SectionName.INODE_SUB); + ArrayList<Section>
dirSubSections = + getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB); + Section inodeSection = + getSubSectionsOfName(sections, SectionName.INODE).get(0); + Section dirSection = getSubSectionsOfName(sections, + SectionName.INODE_DIR).get(0); + + // Expect 4 sub-sections for inodes and directories as target Sections + // is 4 + assertEquals(4, inodeSubSections.size()); + assertEquals(4, dirSubSections.size()); + + // Expect the sub-section offset and lengths do not overlap and cover a + // continuous range of the file. They should also line up with the parent + ensureSubSectionsAlignWithParent(inodeSubSections, inodeSection); + ensureSubSectionsAlignWithParent(dirSubSections, dirSection); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testNoParallelSectionsWithCompressionEnabled() + throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true); + conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY, + "org.apache.hadoop.io.compress.GzipCodec"); + + MiniDFSCluster cluster = null; + try { + cluster = createAndLoadParallelFSImage(conf); + + // Obtain the image summary section to check the sub-sections + // are being correctly created when the image is saved. + FsImageProto.FileSummary summary = FSImageTestUtil. + getLatestImageSummary(cluster); + ArrayList
<Section> sections = Lists.newArrayList( + summary.getSectionsList()); + + ArrayList<Section> inodeSubSections = + getSubSectionsOfName(sections, SectionName.INODE_SUB); + ArrayList<Section>
dirSubSections = + getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB); + + // As compression is enabled, there should be no sub-sections in the + // image header + assertEquals(0, inodeSubSections.size()); + assertEquals(0, dirSubSections.size()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private void ensureSubSectionsAlignWithParent(ArrayList
<Section> subSec, + Section parent) { + // For each sub-section, check its offset + length == the next section + // offset + for (int i=0; i<subSec.size()-1; i++) { + Section s = subSec.get(i); + assertEquals(subSec.get(i+1).getOffset(), + s.getOffset() + s.getLength()); + } + // The first sub-section should start at the parent section offset and + // the last sub-section should end where the parent section ends + Section first = subSec.get(0); + Section last = subSec.get(subSec.size() - 1); + assertEquals(parent.getOffset(), first.getOffset()); + assertEquals(parent.getOffset() + parent.getLength(), + last.getOffset() + last.getLength()); + } +}
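
Usage note (illustrative only): the behaviour added by this patch is controlled entirely by the four dfs.image.parallel.* keys defined above. As a sketch, an operator who wanted to keep parallel loading enabled but use more loader threads and sub-sections than the defaults might add something like the following to hdfs-site.xml; the values 24 and 8 are example choices, not recommendations, and simply respect the documented guidance that target.sections should be at least twice the thread count:

  <property>
    <name>dfs.image.parallel.load</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.image.parallel.target.sections</name>
    <value>24</value>
  </property>
  <property>
    <name>dfs.image.parallel.threads</name>
    <value>8</value>
  </property>

When these keys are unset, the patch defaults apply (parallel load enabled, 12 target sections, 1,000,000 inode threshold, 4 threads), and the feature is ignored entirely when dfs.image.compress is true.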