diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 1f21871ac7..08fd39f481 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -790,6 +790,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
             outputInodes = 0;
             parent.commitSubSection(summary,
                 FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
+            out = parent.getSectionOutputStream();
           }
         }
         parent.commitSectionAndSubSection(summary,
@@ -817,6 +818,7 @@ void serializeINodeSection(OutputStream out) throws IOException {
         if (i % parent.getInodesPerSubSection() == 0) {
           parent.commitSubSection(summary,
               FSImageFormatProtobuf.SectionName.INODE_SUB);
+          out = parent.getSectionOutputStream();
         }
       }
       parent.commitSectionAndSubSection(summary,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
index 58c24d4377..edacb7eaaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
@@ -584,18 +584,6 @@ private void loadErasureCodingSection(InputStream in)
 
   private static boolean enableParallelSaveAndLoad(Configuration conf) {
     boolean loadInParallel = enableParallelLoad;
-    boolean compressionEnabled = conf.getBoolean(
-        DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
-        DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
-
-    if (loadInParallel) {
-      if (compressionEnabled) {
-        LOG.warn("Parallel Image loading and saving is not supported when {}" +
-            " is set to true. Parallel will be disabled.",
-            DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY);
-        loadInParallel = false;
-      }
-    }
     return loadInParallel;
   }
 
@@ -653,7 +641,11 @@ public int getInodesPerSubSection() {
     return inodesPerSubSection;
   }
 
-  /**
+  public OutputStream getSectionOutputStream() {
+    return sectionOutputStream;
+  }
+
+  /**
    * Commit the length and offset of a fsimage section to the summary index,
    * including the sub section, which will be committed before the section is
    * committed.
@@ -664,14 +656,22 @@ public int getInodesPerSubSection() {
    */
   public void commitSectionAndSubSection(FileSummary.Builder summary,
       SectionName name, SectionName subSectionName) throws IOException {
-    commitSubSection(summary, subSectionName);
-    commitSection(summary, name);
+    commitSubSection(summary, subSectionName, true);
+    commitSection(summary, name, true);
   }
 
   public void commitSection(FileSummary.Builder summary, SectionName name)
-      throws IOException {
+      throws IOException {
+    commitSection(summary, name, false);
+  }
+
+  public void commitSection(FileSummary.Builder summary, SectionName name,
+      boolean afterSubSectionCommit) throws IOException {
     long oldOffset = currentOffset;
-    flushSectionOutputStream();
+    boolean subSectionCommitted = afterSubSectionCommit && writeSubSections;
+    if (!subSectionCommitted) {
+      flushSectionOutputStream();
+    }
 
     if (codec != null) {
       sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
@@ -685,14 +685,20 @@ public void commitSection(FileSummary.Builder summary, SectionName name)
     subSectionOffset = currentOffset;
   }
 
+  public void commitSubSection(FileSummary.Builder summary, SectionName name)
+      throws IOException {
+    this.commitSubSection(summary, name, false);
+  }
+
   /**
    * Commit the length and offset of a fsimage sub-section to the summary
    * index.
    * @param summary The image summary object
    * @param name The name of the sub-section to commit
+   * @param isLast True if this sub-section is the last one in its parent section
    * @throws IOException
    */
-  public void commitSubSection(FileSummary.Builder summary, SectionName name)
+  public void commitSubSection(FileSummary.Builder summary, SectionName name, boolean isLast)
       throws IOException {
     if (!writeSubSections) {
       return;
@@ -701,7 +707,15 @@ public void commitSubSection(FileSummary.Builder summary, SectionName name)
     LOG.debug("Saving a subsection for {}", name.toString());
     // The output stream must be flushed before the length is obtained
    // as the flush can move the length forward.
-    sectionOutputStream.flush();
+    flushSectionOutputStream();
+
+    if (codec == null || isLast) {
+      // To avoid an empty sub-section, do not create a new
+      // CompressionOutputStream if this is the last sub-section of a section.
+      sectionOutputStream = underlyingOutputStream;
+    } else {
+      sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
+    }
     long length = fileChannel.position() - subSectionOffset;
     if (length == 0) {
       LOG.warn("The requested section for {} is empty. It will not be " +
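Why the save path re-creates the compression stream: for a compressed image to be loadable in parallel, every sub-section recorded in the summary index must be independently decompressible, so `commitSubSection()` now finishes the current compressed frame and opens a fresh `CompressionOutputStream` for the next sub-section (which is also why the serializers above re-fetch the stream via `getSectionOutputStream()` after each commit). The sketch below illustrates the idea with plain JDK gzip rather than Hadoop's codec classes; the class and variable names are illustrative, not part of the patch.

```java
import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
 * Illustration (not Hadoop code): write two independently compressed
 * "sub-sections" back to back, record their offsets, then decompress the
 * second one without touching the first. A fresh compression stream per
 * sub-section is what makes each one a self-contained frame that a
 * parallel loader can seek to and inflate on its own.
 */
public class SubSectionFrames {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream image = new ByteArrayOutputStream();
    String[] subSections = {"inodes 0..999", "inodes 1000..1999"};
    long[] offsets = new long[subSections.length + 1];

    for (int i = 0; i < subSections.length; i++) {
      offsets[i] = image.size();
      // A new GZIPOutputStream per sub-section == a new compression frame.
      GZIPOutputStream frame = new GZIPOutputStream(image);
      frame.write(subSections[i].getBytes("UTF-8"));
      frame.finish();  // close out this frame only
    }
    offsets[subSections.length] = image.size();
    byte[] bytes = image.toByteArray();

    // Decompress sub-section 1 in isolation, as a parallel loader would.
    int start = (int) offsets[1];
    int len = (int) (offsets[2] - offsets[1]);
    GZIPInputStream in = new GZIPInputStream(
        new ByteArrayInputStream(bytes, start, len));
    BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"));
    System.out.println(r.readLine());  // prints: inodes 1000..1999
  }
}
```

Seeking straight to `offsets[1]` without inflating sub-section 0 is exactly what allows multiple loader threads to inflate sub-sections concurrently.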
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index a9b2191976..2365a6a266 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@ -530,8 +530,8 @@ public void serializeSnapshotDiffSection(OutputStream out)
           context.checkCancelled();
         }
         if (i % parent.getInodesPerSubSection() == 0) {
-          parent.commitSubSection(headers,
-              FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
+          parent.commitSubSection(headers, FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB);
+          out = parent.getSectionOutputStream();
         }
       }
       parent.commitSectionAndSubSection(headers,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index 2a7a7105fd..cdc067aeb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -1120,7 +1120,7 @@ public void testParallelSaveAndLoad() throws IOException {
   }
 
   @Test
-  public void testNoParallelSectionsWithCompressionEnabled()
+  public void testParallelSaveAndLoadWithCompression()
       throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
@@ -1137,16 +1137,21 @@ public void testNoParallelSectionsWithCompressionEnabled()
         getLatestImageSummary(cluster);
       ArrayList<Section> sections = Lists.newArrayList(
           summary.getSectionsList());
+      Section inodeSection =
+          getSubSectionsOfName(sections, SectionName.INODE).get(0);
+      Section dirSection = getSubSectionsOfName(sections,
+          SectionName.INODE_DIR).get(0);
       ArrayList<Section> inodeSubSections =
           getSubSectionsOfName(sections, SectionName.INODE_SUB);
       ArrayList<Section> dirSubSections =
           getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB);
+      // Compression and parallel save can be enabled at the same time.
+      assertEquals(4, inodeSubSections.size());
+      assertEquals(4, dirSubSections.size());
 
-      // As compression is enabled, there should be no sub-sections in the
-      // image header
-      assertEquals(0, inodeSubSections.size());
-      assertEquals(0, dirSubSections.size());
+      ensureSubSectionsAlignWithParent(inodeSubSections, inodeSection);
+      ensureSubSectionsAlignWithParent(dirSubSections, dirSection);
 
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -1229,4 +1234,4 @@ public void testUpdateBlocksMapAndNameCacheAsync() throws IOException {
     SnapshotTestHelper.compareDumpedTreeInFile(
         preRestartTree, postRestartTree, true);
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
index 1c1bb61721..48a6a2b777 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.SafeModeAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -77,15 +78,18 @@ public class TestFSImageWithSnapshot {
   MiniDFSCluster cluster;
   FSNamesystem fsn;
   DistributedFileSystem hdfs;
+
+  public void createCluster() throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    hdfs = cluster.getFileSystem();
+  }
 
   @Before
   public void setUp() throws Exception {
     conf = new Configuration();
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
-        .build();
-    cluster.waitActive();
-    fsn = cluster.getNamesystem();
-    hdfs = cluster.getFileSystem();
+    createCluster();
   }
 
   @After
@@ -512,6 +516,32 @@ public void testSaveLoadImageAfterSnapshotDeletion()
     hdfs = cluster.getFileSystem();
   }
 
+  /**
+   * Test that a parallel, compressed fsimage can be loaded serially.
+   */
+  @Test
+  public void testLoadParallelCompressedImageSerial() throws Exception {
+    int s = 0;
+    cluster.shutdown();
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    hdfs = cluster.getFileSystem();
+    hdfs.mkdirs(dir);
+    SnapshotTestHelper.createSnapshot(hdfs, dir, "s");
+
+    Path sub1 = new Path(dir, "sub1");
+    Path sub1file1 = new Path(sub1, "sub1file1");
+    Path sub1file2 = new Path(sub1, "sub1file2");
+    DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, (short) 1, seed);
+    DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, (short) 1, seed);
+
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, false);
+    checkImage(s);
+  }
+
   void rename(Path src, Path dst) throws Exception {
     printTree("Before rename " + src + " -> " + dst);
     hdfs.rename(src, dst);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshotParallelAndCompress.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshotParallelAndCompress.java
new file mode 100644
index 0000000000..0a80bc9d54
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshotParallelAndCompress.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.slf4j.event.Level;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.test.GenericTestUtils;
+
+/**
+ * This test extends TestFSImageWithSnapshot to run its tests with both
+ * parallel fsimage save/load and fsimage compression enabled.
+ */
+public class TestFSImageWithSnapshotParallelAndCompress extends TestFSImageWithSnapshot {
+  {
+    SnapshotTestHelper.disableLogs();
+    GenericTestUtils.setLogLevel(INode.LOG, Level.TRACE);
+  }
+
+  @Override
+  public void createCluster() throws IOException {
+
+    // turn on both parallelization and compression
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
+    conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY, GzipCodec.class.getCanonicalName());
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, true);
+    conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, 2);
+    conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, 2);
+    conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, 2);
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    hdfs = cluster.getFileSystem();
+  }
+}
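
For reference, the end state this patch enables is a NameNode configured with image compression and parallel image save/load at the same time (previously, dfs.image.compress=true silently disabled the parallel options in enableParallelSaveAndLoad). Below is a minimal sketch of such a Configuration; it uses only the DFSConfigKeys constants that appear in the diff above, while the class name and the numeric values are illustrative, not mandated by the patch.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.compress.GzipCodec;

/**
 * Illustrative only: a NameNode Configuration with image compression and
 * parallel image handling enabled together. The threshold/section/thread
 * values are examples, not recommendations; sensible values depend on the
 * namespace size and available cores.
 */
public class ParallelCompressedImageConf {
  public static Configuration build() {
    Configuration conf = new Configuration();
    // Compress each image section/sub-section with gzip.
    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
    conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
        GzipCodec.class.getCanonicalName());
    // Write sub-section entries into the summary and load them in parallel.
    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, true);
    conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, 12);
    conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, 1000000);
    conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, 4);
    return conf;
  }
}
```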