HADOOP-16440. Distcp can not preserve timestamp with -delete option. Contributed by ludun.
This commit is contained in:
parent
0b45293abb
commit
e60f5e2572
@ -109,13 +109,6 @@ public void commitJob(JobContext jobContext) throws IOException {
|
|||||||
|
|
||||||
cleanupTempFiles(jobContext);
|
cleanupTempFiles(jobContext);
|
||||||
|
|
||||||
String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
|
|
||||||
final boolean preserveRawXattrs =
|
|
||||||
conf.getBoolean(DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
|
|
||||||
if ((attributes != null && !attributes.isEmpty()) || preserveRawXattrs) {
|
|
||||||
preserveFileAttributesForDirectories(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) {
|
if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) {
|
||||||
deleteMissing(conf);
|
deleteMissing(conf);
|
||||||
@ -125,6 +118,13 @@ public void commitJob(JobContext jobContext) throws IOException {
|
|||||||
// save missing information to a directory
|
// save missing information to a directory
|
||||||
trackMissing(conf);
|
trackMissing(conf);
|
||||||
}
|
}
|
||||||
|
// for HDFS-14621, should preserve status after -delete
|
||||||
|
String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
|
||||||
|
final boolean preserveRawXattrs = conf.getBoolean(
|
||||||
|
DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
|
||||||
|
if ((attributes != null && !attributes.isEmpty()) || preserveRawXattrs) {
|
||||||
|
preserveFileAttributesForDirectories(conf);
|
||||||
|
}
|
||||||
taskAttemptContext.setStatus("Commit Successful");
|
taskAttemptContext.setStatus("Commit Successful");
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
@ -26,11 +26,15 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.*;
|
import org.apache.hadoop.mapreduce.*;
|
||||||
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||||
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||||
import org.apache.hadoop.tools.CopyListing;
|
import org.apache.hadoop.tools.CopyListing;
|
||||||
|
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||||
import org.apache.hadoop.tools.DistCpConstants;
|
import org.apache.hadoop.tools.DistCpConstants;
|
||||||
import org.apache.hadoop.tools.DistCpContext;
|
import org.apache.hadoop.tools.DistCpContext;
|
||||||
import org.apache.hadoop.tools.DistCpOptions;
|
import org.apache.hadoop.tools.DistCpOptions;
|
||||||
@ -204,6 +208,61 @@ public void testDeleteMissing() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for HDFS-14621, should preserve times after -delete
|
||||||
|
@Test
|
||||||
|
public void testPreserveTimeWithDeleteMiss() throws IOException {
|
||||||
|
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
|
||||||
|
JobContext jobContext = new JobContextImpl(
|
||||||
|
taskAttemptContext.getConfiguration(),
|
||||||
|
taskAttemptContext.getTaskAttemptID().getJobID());
|
||||||
|
Configuration conf = jobContext.getConfiguration();
|
||||||
|
|
||||||
|
FileSystem fs = null;
|
||||||
|
try {
|
||||||
|
OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
|
||||||
|
fs = FileSystem.get(conf);
|
||||||
|
String sourceBase = TestDistCpUtils.createTestSetup(
|
||||||
|
fs, FsPermission.getDefault());
|
||||||
|
String targetBase = TestDistCpUtils.createTestSetup(
|
||||||
|
fs, FsPermission.getDefault());
|
||||||
|
String targetBaseAdd = TestDistCpUtils.createTestSetup(
|
||||||
|
fs, FsPermission.getDefault());
|
||||||
|
fs.rename(new Path(targetBaseAdd), new Path(targetBase));
|
||||||
|
|
||||||
|
final DistCpOptions options = new DistCpOptions.Builder(
|
||||||
|
Collections.singletonList(new Path(sourceBase)), new Path("/out"))
|
||||||
|
.withSyncFolder(true).withDeleteMissing(true)
|
||||||
|
.preserve(FileAttribute.TIMES).build();
|
||||||
|
options.appendToConf(conf);
|
||||||
|
final DistCpContext context = new DistCpContext(options);
|
||||||
|
|
||||||
|
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
|
||||||
|
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
|
||||||
|
listing.buildListing(listingFile, context);
|
||||||
|
|
||||||
|
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
||||||
|
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
||||||
|
|
||||||
|
Path sourceListing = new Path(
|
||||||
|
conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
|
||||||
|
SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
|
||||||
|
SequenceFile.Reader.file(sourceListing));
|
||||||
|
Path targetRoot = new Path(targetBase);
|
||||||
|
|
||||||
|
committer.commitJob(jobContext);
|
||||||
|
checkDirectoryTimes(fs, sourceReader, targetRoot);
|
||||||
|
|
||||||
|
//Test for idempotent commit
|
||||||
|
committer.commitJob(jobContext);
|
||||||
|
checkDirectoryTimes(fs, sourceReader, targetRoot);
|
||||||
|
} finally {
|
||||||
|
TestDistCpUtils.delete(fs, "/tmp1");
|
||||||
|
conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
|
||||||
|
conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteMissingFlatInterleavedFiles() throws IOException {
|
public void testDeleteMissingFlatInterleavedFiles() throws IOException {
|
||||||
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
|
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
|
||||||
@ -364,6 +423,27 @@ private void checkDirectoryPermissions(FileSystem fs, String targetBase,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkDirectoryTimes(
|
||||||
|
FileSystem fs, SequenceFile.Reader sourceReader, Path targetRoot)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
|
||||||
|
Text srcRelPath = new Text();
|
||||||
|
|
||||||
|
// Iterate over every source path that was copied.
|
||||||
|
while (sourceReader.next(srcRelPath, srcFileStatus)) {
|
||||||
|
Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
|
||||||
|
FileStatus targetStatus = fs.getFileStatus(targetFile);
|
||||||
|
Assert.assertEquals(srcFileStatus.getModificationTime(),
|
||||||
|
targetStatus.getModificationTime());
|
||||||
|
Assert.assertEquals(srcFileStatus.getAccessTime(),
|
||||||
|
targetStatus.getAccessTime());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeStream(sourceReader);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class NullInputFormat extends InputFormat {
|
private static class NullInputFormat extends InputFormat {
|
||||||
@Override
|
@Override
|
||||||
public List getSplits(JobContext context)
|
public List getSplits(JobContext context)
|
||||||
|
Loading…
Reference in New Issue
Block a user