HDFS-16145. CopyListing fails with FNF exception with snapshot diff. (#3234)
parent b4a524722a
commit dac10fcc20
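The failure addressed here (CopyListing hitting a FileNotFoundException while building a listing from a snapshot diff) involves runs that combine a snapshot diff with a path filter. A representative invocation would be the following, where the namenode, paths, snapshot names and filters file are illustrative; -update, -diff and -filters are the relevant DistCp options:

    hadoop distcp -update -diff s1 s2 -filters /path/to/filters.txt hdfs://nn/src hdfs://nn/target

The filters file excludes the location that a renamed directory lands on, for example a trash path.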
DistCpSync.java
@@ -17,6 +17,7 @@
 */
package org.apache.hadoop.tools;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -37,6 +38,7 @@
import java.util.EnumMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Collections;
/**
 * This class provides the basic functionality to sync two FileSystems based on
@@ -59,6 +61,9 @@ class DistCpSync {
  //
  private EnumMap<SnapshotDiffReport.DiffType, List<DiffInfo>> diffMap;
  private DiffInfo[] renameDiffs;
  // entries which are marked deleted because of a rename to an excluded target
  // path
  private List<DiffInfo> deletedByExclusionDiffs;
  private CopyFilter copyFilter;

  DistCpSync(DistCpContext context, Configuration conf) {
@@ -68,6 +73,11 @@ class DistCpSync {
    this.copyFilter.initialize();
  }

  @VisibleForTesting
  public void setCopyFilter(CopyFilter copyFilter) {
    this.copyFilter = copyFilter;
  }

  private boolean isRdiff() {
    return context.shouldUseRdiff();
  }
@@ -222,7 +232,7 @@ private boolean getAllDiffs() throws IOException {
        SnapshotDiffReport.DiffType.values()) {
      diffMap.put(type, new ArrayList<DiffInfo>());
    }

    deletedByExclusionDiffs = null;
    for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) {
      // If the entry is the snapshot root, usually an item like "M\t."
      // in the diff report. We don't need to handle it and cannot handle it,
@@ -250,8 +260,13 @@ private boolean getAllDiffs() throws IOException {
          list.add(new DiffInfo(source, target, dt));
        } else {
          list = diffMap.get(SnapshotDiffReport.DiffType.DELETE);
-         list.add(new DiffInfo(source, target,
-             SnapshotDiffReport.DiffType.DELETE));
+         DiffInfo info = new DiffInfo(source, target,
+             SnapshotDiffReport.DiffType.DELETE);
+         list.add(info);
+         if (deletedByExclusionDiffs == null) {
+           deletedByExclusionDiffs = new ArrayList<>();
+         }
+         deletedByExclusionDiffs.add(info);
        }
      } else if (copyFilter.shouldCopy(relativeTarget)) {
        list = diffMap.get(SnapshotDiffReport.DiffType.CREATE);
@@ -260,6 +275,9 @@ private boolean getAllDiffs() throws IOException {
          }
        }
      }
      if (deletedByExclusionDiffs != null) {
        Collections.sort(deletedByExclusionDiffs, DiffInfo.sourceComparator);
      }
      return true;
    } catch (IOException e) {
      DistCp.LOG.warn("Failed to compute snapshot diff on " + ssDir, e);
@@ -515,6 +533,33 @@ private DiffInfo getRenameItem(DiffInfo diff, DiffInfo[] renameDiffArray) {
    return null;
  }

  /**
   * Checks if a dir, or one of its parents, is marked deleted as part of a
   * dir rename to a path which is excluded by the filter.
   * @return true if it's marked deleted
   */
  private boolean isParentOrSelfMarkedDeleted(DiffInfo diff,
      List<DiffInfo> deletedDirDiffArray) {
    for (DiffInfo item : deletedDirDiffArray) {
      if (item.getSource().equals(diff.getSource())) {
        // The same path string may appear in:
        // 1. both deleted and modified snapshot diff entries.
        // 2. both deleted and created snapshot diff entries.
        // Case 1 is about the same file/directory, whereas case 2
        // is about two different files/directories.
        // We are finding case 1 here, thus we check against DiffType.MODIFY.
        if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) {
          return true;
        }
      } else if (isParentOf(item.getSource(), diff.getSource())) {
        // If the deleted entry is the parent of the diff entry, then both
        // MODIFY and CREATE diff entries should be handled.
        return true;
      }
    }
    return false;
  }

  /**
   * For a given sourcePath, get its real path if it or its parent was renamed.
   *
@@ -567,6 +612,19 @@ public ArrayList<DiffInfo> prepareDiffListForCopyListing() {
        renameDiffsList.toArray(new DiffInfo[renameDiffsList.size()]);
    Arrays.sort(renameDiffArray, DiffInfo.sourceComparator);
    for (DiffInfo diff : modifyAndCreateDiffs) {
      // Handle the case where files/dirs are created after a snapshot is
      // taken and the parent dir is then moved to a location which is
      // excluded by the filters. For example, files/dirs created inside a
      // dir in an HDFS encryption zone: when that dir gets deleted, it is
      // moved to a trash directory which is inside the encryption zone
      // itself. If the trash path is excluded by the filters, the dir will
      // be marked for DELETE at the target location. All subsequent creates
      // under such dirs should be ignored, as should the modify operation
      // on the dir itself.
      if (deletedByExclusionDiffs != null && isParentOrSelfMarkedDeleted(diff,
          deletedByExclusionDiffs)) {
        continue;
      }
      DiffInfo renameItem = getRenameItem(diff, renameDiffArray);
      if (renameItem == null) {
        diff.setTarget(diff.getSource());
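To picture the case that deletedByExclusionDiffs and isParentOrSelfMarkedDeleted handle, the snapshot diff report for the scenario exercised by the tests below looks roughly like this (paths and the exact set of entries are illustrative):

    Difference between snapshot s1 and snapshot s2 under directory /src:
    M       .
    M       ./encz-mock/datedir
    R       ./encz-mock/datedir -> ./trash/datedir
    +       ./encz-mock/datedir

Because the rename target matches the exclusion filter, the rename is recorded as a DELETE of ./encz-mock/datedir and remembered in deletedByExclusionDiffs. The MODIFY entry with the same path refers to the old, renamed-away directory (case 1) and is skipped, while the CREATE entry with the same path refers to a different, newly created directory (case 2) and is kept; creates and modifies underneath the deleted path are dropped by the isParentOf check.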
TestDistCpSync.java
@@ -53,6 +53,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class TestDistCpSync {
  private MiniDFSCluster cluster;
@@ -292,6 +293,175 @@ public void testSync() throws Exception {
    verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false);
  }

  /**
   * Test sync when a directory is deleted and a new directory with the same
   * name is created between the two snapshots.
   */
  @Test
  public void testSync1() throws Exception {
    Path srcpath = new Path(source, "encz-mock");
    dfs.mkdirs(srcpath);
    dfs.mkdirs(new Path(source, "encz-mock/datedir"));
    enableAndCreateFirstSnapshot();

    // before sync, make some further changes on source
    DFSTestUtil.createFile(dfs, new Path(source, "encz-mock/datedir/file1"),
        BLOCK_SIZE, DATA_NUM, 0);
    dfs.delete(new Path(source, "encz-mock/datedir"), true);
    dfs.mkdirs(new Path(source, "encz-mock/datedir"));
    DFSTestUtil.createFile(dfs, new Path(source, "encz-mock/datedir/file2"),
        BLOCK_SIZE, DATA_NUM, 0);
    dfs.createSnapshot(source, "s2");
    Assert.assertTrue(dfs.exists(new Path(source, "encz-mock/datedir/file2")));

    SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
    System.out.println(report);

    DistCpSync distCpSync = new DistCpSync(context, conf);

    // do the sync
    Assert.assertTrue(distCpSync.sync());
    // make sure the source path has been updated to the snapshot path
    final Path spath = new Path(source,
        HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
    Assert.assertEquals(spath, context.getSourcePaths().get(0));

    // build copy listing
    final Path listingPath = new Path("/tmp/META/fileList.seq");
    CopyListing listing =
        new SimpleCopyListing(conf, new Credentials(), distCpSync);
    listing.buildListing(listingPath, context);

    Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
    CopyMapper copyMapper = new CopyMapper();
    StubContext stubContext = new StubContext(conf, null, 0);
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context mapContext =
        stubContext.getContext();
    copyMapper.setup(mapContext);
    for (Map.Entry<Text, CopyListingFileStatus> entry : copyListing
        .entrySet()) {
      copyMapper.map(entry.getKey(), entry.getValue(), mapContext);
    }
    Assert.assertTrue(dfs.exists(new Path(target, "encz-mock/datedir/file2")));
    // verify the source and target now have the same structure
    verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false);
  }

  /**
   * Test sync when a directory is renamed away (into a trash dir) and a new
   * directory with the same name is created between the two snapshots.
   */
  @Test
  public void testSyncNew() throws Exception {
    Path srcpath = new Path(source, "encz-mock");
    dfs.mkdirs(srcpath);
    dfs.mkdirs(new Path(source, "encz-mock/datedir"));
    dfs.mkdirs(new Path(source, "trash"));
    enableAndCreateFirstSnapshot();

    // before sync, make some further changes on source
    DFSTestUtil.createFile(dfs, new Path(source, "encz-mock/datedir/file1"),
        BLOCK_SIZE, DATA_NUM, 0);
    dfs.rename(new Path(source, "encz-mock/datedir"),
        new Path(source, "trash"));
    dfs.mkdirs(new Path(source, "encz-mock/datedir"));
    DFSTestUtil.createFile(dfs, new Path(source, "encz-mock/datedir/file2"),
        BLOCK_SIZE, DATA_NUM, 0);
    dfs.createSnapshot(source, "s2");
    Assert.assertTrue(dfs.exists(new Path(source, "encz-mock/datedir/file2")));

    SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
    System.out.println(report);

    DistCpSync distCpSync = new DistCpSync(context, conf);

    // do the sync
    Assert.assertTrue(distCpSync.sync());
    // make sure the source path has been updated to the snapshot path
    final Path spath = new Path(source,
        HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
    Assert.assertEquals(spath, context.getSourcePaths().get(0));

    // build copy listing
    final Path listingPath = new Path("/tmp/META/fileList.seq");
    CopyListing listing =
        new SimpleCopyListing(conf, new Credentials(), distCpSync);
    listing.buildListing(listingPath, context);

    Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
    CopyMapper copyMapper = new CopyMapper();
    StubContext stubContext = new StubContext(conf, null, 0);
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context mapContext =
        stubContext.getContext();
    copyMapper.setup(mapContext);
    for (Map.Entry<Text, CopyListingFileStatus> entry : copyListing
        .entrySet()) {
      copyMapper.map(entry.getKey(), entry.getValue(), mapContext);
    }
    Assert.assertTrue(dfs.exists(new Path(target, "encz-mock/datedir/file2")));
    Assert.assertTrue(dfs.exists(new Path(target, "trash/datedir/file1")));
    // verify the source and target now have the same structure
    verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false);
  }

  /**
   * Test sync with a filter that excludes the rename target (trash), so the
   * renamed-away directory is treated as deleted on the target.
   */
  @Test
  public void testSyncWithFilters() throws Exception {
    Path srcpath = new Path(source, "encz-mock");
    dfs.mkdirs(srcpath);
    dfs.mkdirs(new Path(source, "encz-mock/datedir"));
    dfs.mkdirs(new Path(source, "trash"));
    enableAndCreateFirstSnapshot();

    // before sync, make some further changes on source
    DFSTestUtil.createFile(dfs, new Path(source, "encz-mock/datedir/file1"),
        BLOCK_SIZE, DATA_NUM, 0);
    dfs.rename(new Path(source, "encz-mock/datedir"),
        new Path(source, "trash"));
    dfs.mkdirs(new Path(source, "encz-mock/datedir"));
    DFSTestUtil.createFile(dfs, new Path(source, "encz-mock/datedir/file2"),
        BLOCK_SIZE, DATA_NUM, 0);
    dfs.createSnapshot(source, "s2");
    Assert.assertTrue(dfs.exists(new Path(source, "encz-mock/datedir/file2")));

    SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
    System.out.println(report);
    List<Pattern> filters = new ArrayList<>();
    filters.add(Pattern.compile(".*trash.*"));
    RegexCopyFilter regexCopyFilter = new RegexCopyFilter("fakeFile");
    regexCopyFilter.setFilters(filters);

    DistCpSync distCpSync = new DistCpSync(context, conf);
    distCpSync.setCopyFilter(regexCopyFilter);

    // do the sync
    Assert.assertTrue(distCpSync.sync());
    // make sure the source path has been updated to the snapshot path
    final Path spath = new Path(source,
        HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
    Assert.assertEquals(spath, context.getSourcePaths().get(0));

    // build copy listing
    final Path listingPath = new Path("/tmp/META/fileList.seq");
    CopyListing listing =
        new SimpleCopyListing(conf, new Credentials(), distCpSync);
    listing.buildListing(listingPath, context);

    Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
    CopyMapper copyMapper = new CopyMapper();
    StubContext stubContext = new StubContext(conf, null, 0);
    Mapper<Text, CopyListingFileStatus, Text, Text>.Context mapContext =
        stubContext.getContext();
    copyMapper.setup(mapContext);
    for (Map.Entry<Text, CopyListingFileStatus> entry : copyListing
        .entrySet()) {
      copyMapper.map(entry.getKey(), entry.getValue(), mapContext);
    }
    Assert.assertTrue(dfs.exists(new Path(target, "encz-mock/datedir/file2")));
    Assert.assertFalse(dfs.exists(new Path(target, "encz-mock/datedir/file1")));
    Assert.assertFalse(dfs.exists(new Path(target, "trash/datedir/file1")));
  }
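In a real DistCp run the exclusion used above would come from a filters file passed with -filters, containing one Java regular expression per line; the test instead injects the patterns directly through RegexCopyFilter#setFilters and the new DistCpSync#setCopyFilter hook, so the "fakeFile" constructor argument is only a placeholder. An illustrative filters file for this scenario would contain a single line:

    .*trash.*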

  private Map<Text, CopyListingFileStatus> getListing(Path listingPath)
      throws Exception {
    SequenceFile.Reader reader = new SequenceFile.Reader(conf,