HDFS-8036. Use snapshot path as source when using snapshot diff report in DistCp. Contributed by Jing Zhao.
This commit is contained in:
parent
3c7adaaf35
commit
75cb1d42ab
@ -1345,6 +1345,9 @@ Release 2.7.0 - UNRELEASED
|
|||||||
HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck.
|
HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck.
|
||||||
(Anu Engineer and Haohui Mai via wheat9)
|
(Anu Engineer and Haohui Mai via wheat9)
|
||||||
|
|
||||||
|
HDFS-8036. Use snapshot path as source when using snapshot diff report in
|
||||||
|
DistCp. (Jing Zhao via wheat9)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
|
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -86,6 +87,22 @@ static boolean sync(DistCpOptions inputOptions, Configuration conf)
|
|||||||
} finally {
|
} finally {
|
||||||
deleteTargetTmpDir(targetFs, tmpDir);
|
deleteTargetTmpDir(targetFs, tmpDir);
|
||||||
// TODO: since we have tmp directory, we can support "undo" with failures
|
// TODO: since we have tmp directory, we can support "undo" with failures
|
||||||
|
// set the source path using the snapshot path
|
||||||
|
inputOptions.setSourcePaths(Arrays.asList(getSourceSnapshotPath(sourceDir,
|
||||||
|
inputOptions.getToSnapshot())));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getSnapshotName(String name) {
|
||||||
|
return Path.CUR_DIR.equals(name) ? "" : name;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Path getSourceSnapshotPath(Path sourceDir, String snapshotName) {
|
||||||
|
if (Path.CUR_DIR.equals(snapshotName)) {
|
||||||
|
return sourceDir;
|
||||||
|
} else {
|
||||||
|
return new Path(sourceDir,
|
||||||
|
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + snapshotName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,8 +153,10 @@ private static boolean checkNoChange(DistCpOptions inputOptions,
|
|||||||
static DiffInfo[] getDiffs(DistCpOptions inputOptions,
|
static DiffInfo[] getDiffs(DistCpOptions inputOptions,
|
||||||
DistributedFileSystem fs, Path sourceDir, Path targetDir) {
|
DistributedFileSystem fs, Path sourceDir, Path targetDir) {
|
||||||
try {
|
try {
|
||||||
|
final String from = getSnapshotName(inputOptions.getFromSnapshot());
|
||||||
|
final String to = getSnapshotName(inputOptions.getToSnapshot());
|
||||||
SnapshotDiffReport sourceDiff = fs.getSnapshotDiffReport(sourceDir,
|
SnapshotDiffReport sourceDiff = fs.getSnapshotDiffReport(sourceDir,
|
||||||
inputOptions.getFromSnapshot(), inputOptions.getToSnapshot());
|
from, to);
|
||||||
return DiffInfo.getDiffs(sourceDiff, targetDir);
|
return DiffInfo.getDiffs(sourceDiff, targetDir);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e);
|
DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e);
|
||||||
|
@ -90,8 +90,7 @@ public void commitJob(JobContext jobContext) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)
|
if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) {
|
||||||
&& !(conf.getBoolean(DistCpConstants.CONF_LABEL_DIFF, false))) {
|
|
||||||
deleteMissing(conf);
|
deleteMissing(conf);
|
||||||
} else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
|
} else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
|
||||||
commitData(conf);
|
commitData(conf);
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.SequenceFile;
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
@ -97,6 +98,8 @@ public void testFallback() throws Exception {
|
|||||||
dfs.createSnapshot(source, "s2");
|
dfs.createSnapshot(source, "s2");
|
||||||
dfs.createSnapshot(target, "s1");
|
dfs.createSnapshot(target, "s1");
|
||||||
Assert.assertTrue(DistCpSync.sync(options, conf));
|
Assert.assertTrue(DistCpSync.sync(options, conf));
|
||||||
|
// reset source paths in options
|
||||||
|
options.setSourcePaths(Arrays.asList(source));
|
||||||
|
|
||||||
// changes have been made in target
|
// changes have been made in target
|
||||||
final Path subTarget = new Path(target, "sub");
|
final Path subTarget = new Path(target, "sub");
|
||||||
@ -183,9 +186,21 @@ public void testSync() throws Exception {
|
|||||||
changeData(source);
|
changeData(source);
|
||||||
dfs.createSnapshot(source, "s2");
|
dfs.createSnapshot(source, "s2");
|
||||||
|
|
||||||
|
// before sync, make some further changes on source. this should not affect
|
||||||
|
// the later distcp since we're copying (s2-s1) to target
|
||||||
|
final Path toDelete = new Path(source, "foo/d1/foo/f1");
|
||||||
|
dfs.delete(toDelete, true);
|
||||||
|
final Path newdir = new Path(source, "foo/d1/foo/newdir");
|
||||||
|
dfs.mkdirs(newdir);
|
||||||
|
|
||||||
// do the sync
|
// do the sync
|
||||||
Assert.assertTrue(DistCpSync.sync(options, conf));
|
Assert.assertTrue(DistCpSync.sync(options, conf));
|
||||||
|
|
||||||
|
// make sure the source path has been updated to the snapshot path
|
||||||
|
final Path spath = new Path(source,
|
||||||
|
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
|
||||||
|
Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
||||||
|
|
||||||
// build copy listing
|
// build copy listing
|
||||||
final Path listingPath = new Path("/tmp/META/fileList.seq");
|
final Path listingPath = new Path("/tmp/META/fileList.seq");
|
||||||
CopyListing listing = new GlobbedCopyListing(conf, new Credentials());
|
CopyListing listing = new GlobbedCopyListing(conf, new Credentials());
|
||||||
@ -209,7 +224,7 @@ public void testSync() throws Exception {
|
|||||||
.getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
|
.getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
|
||||||
|
|
||||||
// verify the source and target now has the same structure
|
// verify the source and target now has the same structure
|
||||||
verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
|
verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<Text, CopyListingFileStatus> getListing(Path listingPath)
|
private Map<Text, CopyListingFileStatus> getListing(Path listingPath)
|
||||||
@ -248,6 +263,29 @@ private void verifyCopy(FileStatus s, FileStatus t, boolean compareName)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar test with testSync, but the "to" snapshot is specified as "."
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSyncWithCurrent() throws Exception {
|
||||||
|
options.setUseDiff(true, "s1", ".");
|
||||||
|
initData(source);
|
||||||
|
initData(target);
|
||||||
|
dfs.allowSnapshot(source);
|
||||||
|
dfs.allowSnapshot(target);
|
||||||
|
dfs.createSnapshot(source, "s1");
|
||||||
|
dfs.createSnapshot(target, "s1");
|
||||||
|
|
||||||
|
// make changes under source
|
||||||
|
changeData(source);
|
||||||
|
|
||||||
|
// do the sync
|
||||||
|
Assert.assertTrue(DistCpSync.sync(options, conf));
|
||||||
|
// make sure the source path is still unchanged
|
||||||
|
Assert.assertEquals(source, options.getSourcePaths().get(0));
|
||||||
|
}
|
||||||
|
|
||||||
private void initData2(Path dir) throws Exception {
|
private void initData2(Path dir) throws Exception {
|
||||||
final Path test = new Path(dir, "test");
|
final Path test = new Path(dir, "test");
|
||||||
final Path foo = new Path(dir, "foo");
|
final Path foo = new Path(dir, "foo");
|
||||||
|
Loading…
Reference in New Issue
Block a user