HADOOP-17122: Preserving Directory Attributes in DistCp with Atomic Copy (#2133)
Contributed by Swaminathan Balachandran
This commit is contained in:
parent
f734455e5d
commit
872c2909bd
@ -318,8 +318,10 @@ private void preserveFileAttributesForDirectories(Configuration conf)
|
||||
SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
|
||||
SequenceFile.Reader.file(sourceListing));
|
||||
long totalLen = clusterFS.getFileStatus(sourceListing).getLen();
|
||||
|
||||
Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
|
||||
// For Atomic Copy the Final & Work Path are different & atomic copy has
|
||||
// already moved it to final path.
|
||||
Path targetRoot =
|
||||
new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
|
||||
|
||||
long preservedEntries = 0;
|
||||
try {
|
||||
|
@ -53,6 +53,8 @@
|
||||
import java.util.*;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
||||
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH;
|
||||
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_WORK_PATH;
|
||||
import static org.apache.hadoop.tools.util.TestDistCpUtils.*;
|
||||
|
||||
public class TestCopyCommitter {
|
||||
@ -160,10 +162,10 @@ public void testPreserveStatus() throws IOException {
|
||||
context.setTargetPathExists(false);
|
||||
|
||||
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
|
||||
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
|
||||
Path listingFile = new Path("/tmp1/" + rand.nextLong());
|
||||
listing.buildListing(listingFile, context);
|
||||
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
||||
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
||||
|
||||
committer.commitJob(jobContext);
|
||||
checkDirectoryPermissions(fs, targetBase, sourcePerm);
|
||||
@ -179,6 +181,45 @@ public void testPreserveStatus() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreserveStatusWithAtomicCommit() throws IOException {
|
||||
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
|
||||
JobContext jobContext = new JobContextImpl(
|
||||
taskAttemptContext.getConfiguration(),
|
||||
taskAttemptContext.getTaskAttemptID().getJobID());
|
||||
Configuration conf = jobContext.getConfiguration();
|
||||
String sourceBase;
|
||||
String workBase;
|
||||
String targetBase;
|
||||
FileSystem fs = null;
|
||||
try {
|
||||
OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
|
||||
fs = FileSystem.get(conf);
|
||||
FsPermission sourcePerm = new FsPermission((short) 511);
|
||||
FsPermission initialPerm = new FsPermission((short) 448);
|
||||
sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
|
||||
workBase = TestDistCpUtils.createTestSetup(fs, initialPerm);
|
||||
targetBase = "/tmp1/" + rand.nextLong();
|
||||
final DistCpOptions options = new DistCpOptions.Builder(
|
||||
Collections.singletonList(new Path(sourceBase)), new Path("/out"))
|
||||
.preserve(FileAttribute.PERMISSION).build();
|
||||
options.appendToConf(conf);
|
||||
final DistCpContext context = new DistCpContext(options);
|
||||
context.setTargetPathExists(false);
|
||||
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
|
||||
Path listingFile = new Path("/tmp1/" + rand.nextLong());
|
||||
listing.buildListing(listingFile, context);
|
||||
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
||||
conf.set(CONF_LABEL_TARGET_WORK_PATH, workBase);
|
||||
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
|
||||
committer.commitJob(jobContext);
|
||||
checkDirectoryPermissions(fs, targetBase, sourcePerm);
|
||||
} finally {
|
||||
TestDistCpUtils.delete(fs, "/tmp1");
|
||||
conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteMissing() throws IOException {
|
||||
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
|
||||
@ -207,8 +248,8 @@ public void testDeleteMissing() throws IOException {
|
||||
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
|
||||
listing.buildListing(listingFile, context);
|
||||
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
||||
conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
||||
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
||||
|
||||
committer.commitJob(jobContext);
|
||||
verifyFoldersAreInSync(fs, targetBase, sourceBase);
|
||||
@ -256,8 +297,8 @@ public void testPreserveTimeWithDeleteMiss() throws IOException {
|
||||
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
|
||||
listing.buildListing(listingFile, context);
|
||||
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
||||
conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
||||
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
||||
|
||||
Path sourceListing = new Path(
|
||||
conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
|
||||
@ -320,8 +361,8 @@ public void testDeleteMissingFlatInterleavedFiles() throws IOException {
|
||||
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
|
||||
listing.buildListing(listingFile, context);
|
||||
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
||||
conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
||||
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
||||
|
||||
committer.commitJob(jobContext);
|
||||
verifyFoldersAreInSync(fs, targetBase, sourceBase);
|
||||
@ -353,8 +394,8 @@ public void testAtomicCommitMissingFinal() throws IOException {
|
||||
fs = FileSystem.get(conf);
|
||||
fs.mkdirs(new Path(workPath));
|
||||
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
|
||||
conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath);
|
||||
conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath);
|
||||
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
|
||||
|
||||
assertPathExists(fs, "Work path", new Path(workPath));
|
||||
@ -391,8 +432,8 @@ public void testAtomicCommitExistingFinal() throws IOException {
|
||||
fs.mkdirs(new Path(workPath));
|
||||
fs.mkdirs(new Path(finalPath));
|
||||
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
|
||||
conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath);
|
||||
conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath);
|
||||
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
|
||||
|
||||
assertPathExists(fs, "Work path", new Path(workPath));
|
||||
@ -463,8 +504,8 @@ private void testCommitWithChecksumMismatch(boolean skipCrc)
|
||||
+ String.valueOf(rand.nextLong()));
|
||||
listing.buildListing(listingFile, context);
|
||||
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
||||
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
||||
conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
||||
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
||||
|
||||
OutputCommitter committer = new CopyCommitter(
|
||||
null, taskAttemptContext);
|
||||
|
Loading…
Reference in New Issue
Block a user