diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
index 89033efdd5..58a218e5d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
@@ -35,6 +35,7 @@
 
 import javax.management.ObjectName;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -75,7 +76,7 @@ public class SnapshotManager implements SnapshotStatsMXBean {
   public static final Log LOG = LogFactory.getLog(SnapshotManager.class);
 
   private final FSDirectory fsdir;
-  private final boolean captureOpenFiles;
+  private boolean captureOpenFiles;
   /**
    * If skipCaptureAccessTimeOnlyChange is set to true, if accessTime
    * of a file changed but there is no other modification made to the file,
@@ -121,6 +122,17 @@ public SnapshotManager(final Configuration conf, final FSDirectory fsdir) {
         + snapshotDiffAllowSnapRootDescendant);
   }
 
+  /**
+   * Set the {@code captureOpenFiles} flag. Exposed so that tests can toggle
+   * the setting on a live instance.
+   *
+   * @param captureOpenFiles the new value of the flag
+   */
+  @VisibleForTesting
+  void setCaptureOpenFiles(boolean captureOpenFiles) {
+    this.captureOpenFiles = captureOpenFiles;
+  }
+
   /**
    * @return skipCaptureAccessTimeOnlyChange
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
index 537612ca29..be118a362b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
@@ -843,6 +843,134 @@ public void testOpenFilesSnapChecksumWithTrunkAndAppend() throws Exception {
         hbaseFileCksumBeforeTruncate, hbaseFileCksumS3);
   }
 
+  /**
+   * Create a snapshot of {@code snapRootDir} and return the path of
+   * {@code fileName} under the new snapshot's directory.
+   */
+  private Path createSnapshot(Path snapRootDir, String snapName,
+      String fileName) throws Exception {
+    final Path snapDir = SnapshotTestHelper.createSnapshot(
+        fs, snapRootDir, snapName);
+    return new Path(snapDir, fileName);
+  }
+
+  /**
+   * Assert that every path in {@code filePaths} has length {@code fileSize}.
+   */
+  private void verifyFileSize(long fileSize, Path... filePaths) throws
+      IOException {
+    for (Path filePath : filePaths) {
+      Assert.assertEquals(fileSize, fs.getFileStatus(filePath).getLen());
+    }
+  }
+
+  /**
+   * Verify open files captured in the snapshots across config disable
+   * and enable.
+   */
+  @Test
+  public void testOpenFilesWithMixedConfig() throws Exception {
+    final Path snapRootDir = new Path("/level_0_A");
+    final String flumeFileName = "flume.log";
+    final String snap1Name = "s1";
+    final String snap2Name = "s2";
+    final String snap3Name = "s3";
+    final String snap4Name = "s4";
+    final String snap5Name = "s5";
+    final SnapshotManager snapshotManager = cluster.getNameNode()
+        .getNamesystem().getSnapshotManager();
+
+    // Create files and open streams
+    final Path flumeFile = new Path(snapRootDir, flumeFileName);
+    createFile(flumeFile);
+    final FSDataOutputStream flumeOutputStream = fs.append(flumeFile);
+    try {
+      // 1. Disable capture open files
+      snapshotManager.setCaptureOpenFiles(false);
+
+      // Create Snapshot S1
+      final Path flumeS1Path = createSnapshot(snapRootDir,
+          snap1Name, flumeFileName);
+
+      // Verify Snap S1 file length is the same as the current version's
+      verifyFileSize(FILELEN, flumeS1Path);
+
+      // Write more data to files
+      long flumeFileWrittenDataLength = FILELEN;
+      int newWriteLength = (int) (BLOCKSIZE * 1.5);
+      byte[] buf = new byte[newWriteLength];
+      Random random = new Random();
+      random.nextBytes(buf);
+      flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+
+      // Create Snapshot S2
+      final Path flumeS2Path = createSnapshot(snapRootDir,
+          snap2Name, flumeFileName);
+
+      // Since capture open files was disabled, all snapshot paths
+      // and the current version should have the same file lengths.
+      verifyFileSize(flumeFileWrittenDataLength,
+          flumeFile, flumeS2Path, flumeS1Path);
+
+      // 2. Enable capture open files
+      snapshotManager.setCaptureOpenFiles(true);
+
+      // Write more data to files
+      flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+      long flumeFileLengthAfterS3 = flumeFileWrittenDataLength;
+
+      // Create Snapshot S3
+      final Path flumeS3Path = createSnapshot(snapRootDir,
+          snap3Name, flumeFileName);
+
+      // Since open files captured in the previous snapshots were with config
+      // disabled, their file lengths are now the same as the current version.
+      // With the config turned on, any new data written to the open files
+      // will no longer be reflected in the current version or old snapshot
+      // paths.
+      verifyFileSize(flumeFileWrittenDataLength, flumeFile, flumeS3Path,
+          flumeS2Path, flumeS1Path);
+
+      // Write more data to files
+      flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+
+      // Create Snapshot S4
+      final Path flumeS4Path = createSnapshot(snapRootDir,
+          snap4Name, flumeFileName);
+
+      // Verify S4 has the latest data
+      verifyFileSize(flumeFileWrittenDataLength, flumeFile, flumeS4Path);
+
+      // But, open files captured as of Snapshot S3 and before should
+      // have their old file lengths intact.
+      verifyFileSize(flumeFileLengthAfterS3, flumeS3Path,
+          flumeS2Path, flumeS1Path);
+
+      long flumeFileLengthAfterS4 = flumeFileWrittenDataLength;
+
+      // 3. Disable capture open files
+      snapshotManager.setCaptureOpenFiles(false);
+
+      // Create Snapshot S5
+      final Path flumeS5Path = createSnapshot(snapRootDir,
+          snap5Name, flumeFileName);
+
+      flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
+
+      // Since capture open files was disabled, any snapshots taken after the
+      // config change and the current version should have the same file
+      // lengths for the open files.
+      verifyFileSize(flumeFileWrittenDataLength, flumeFile, flumeS5Path);
+
+      // But, the old snapshots taken before the config disable should
+      // continue to be consistent.
+      verifyFileSize(flumeFileLengthAfterS4, flumeS4Path);
+    } finally {
+      // Release the open append stream even when an assertion fails.
+      flumeOutputStream.close();
+    }
+  }
+
   private void restartNameNode() throws Exception {
     cluster.triggerBlockReports();
     NameNode nameNode = cluster.getNameNode();