HADOOP-18117. Add an option to preserve root directory permissions (#3970)

This commit is contained in:
Mohanad Elsafty 2022-02-18 19:12:50 +08:00 committed by GitHub
parent de526e166c
commit a4f459097b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 115 additions and 7 deletions

View File

@ -86,6 +86,8 @@ private DistCpConstants() {
public static final String CONF_LABEL_SPLIT_RATIO = public static final String CONF_LABEL_SPLIT_RATIO =
"distcp.dynamic.split.ratio"; "distcp.dynamic.split.ratio";
public static final String CONF_LABEL_DIRECT_WRITE = "distcp.direct.write"; public static final String CONF_LABEL_DIRECT_WRITE = "distcp.direct.write";
public static final String CONF_LABEL_UPDATE_ROOT =
"distcp.update.root.attributes";
/* Total bytes to be copied. Updated by copylisting. Unfiltered count */ /* Total bytes to be copied. Updated by copylisting. Unfiltered count */
public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected"; public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";

View File

@ -175,6 +175,10 @@ public boolean shouldUseIterator() {
return options.shouldUseIterator(); return options.shouldUseIterator();
} }
public boolean shouldUpdateRoot() {
return options.shouldUpdateRoot();
}
public final boolean splitLargeFile() { public final boolean splitLargeFile() {
return options.getBlocksPerChunk() > 0; return options.getBlocksPerChunk() > 0;
} }

View File

@ -244,8 +244,12 @@ public enum DistCpOptionSwitch {
USE_ITERATOR(DistCpConstants.CONF_LABEL_USE_ITERATOR, USE_ITERATOR(DistCpConstants.CONF_LABEL_USE_ITERATOR,
new Option("useiterator", false, new Option("useiterator", false,
"Use single threaded list status iterator to build " "Use single threaded list status iterator to build "
+ "the listing to save the memory utilisation at the client")); + "the listing to save the memory utilisation at the client")),
UPDATE_ROOT(DistCpConstants.CONF_LABEL_UPDATE_ROOT,
new Option("updateRoot", false,
"Update root directory attributes "
+ "(eg permissions, ownership ...)"));
public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct"; public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct";
private final String confLabel; private final String confLabel;

View File

@ -162,6 +162,8 @@ public final class DistCpOptions {
private final boolean useIterator; private final boolean useIterator;
private final boolean updateRoot;
/** /**
* File attributes for preserve. * File attributes for preserve.
* *
@ -228,6 +230,8 @@ private DistCpOptions(Builder builder) {
this.directWrite = builder.directWrite; this.directWrite = builder.directWrite;
this.useIterator = builder.useIterator; this.useIterator = builder.useIterator;
this.updateRoot = builder.updateRoot;
} }
public Path getSourceFileListing() { public Path getSourceFileListing() {
@ -374,6 +378,10 @@ public boolean shouldUseIterator() {
return useIterator; return useIterator;
} }
public boolean shouldUpdateRoot() {
return updateRoot;
}
/** /**
* Add options to configuration. These will be used in the Mapper/committer * Add options to configuration. These will be used in the Mapper/committer
* *
@ -427,6 +435,9 @@ public void appendToConf(Configuration conf) {
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.USE_ITERATOR, DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.USE_ITERATOR,
String.valueOf(useIterator)); String.valueOf(useIterator));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.UPDATE_ROOT,
String.valueOf(updateRoot));
} }
/** /**
@ -465,6 +476,7 @@ public String toString() {
", verboseLog=" + verboseLog + ", verboseLog=" + verboseLog +
", directWrite=" + directWrite + ", directWrite=" + directWrite +
", useiterator=" + useIterator + ", useiterator=" + useIterator +
", updateRoot=" + updateRoot +
'}'; '}';
} }
@ -518,6 +530,8 @@ public static class Builder {
private boolean useIterator = false; private boolean useIterator = false;
private boolean updateRoot = false;
public Builder(List<Path> sourcePaths, Path targetPath) { public Builder(List<Path> sourcePaths, Path targetPath) {
Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(), Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(),
"Source paths should not be null or empty!"); "Source paths should not be null or empty!");
@ -780,6 +794,11 @@ public Builder withUseIterator(boolean useItr) {
this.useIterator = useItr; this.useIterator = useItr;
return this; return this;
} }
public Builder withUpdateRoot(boolean updateRootAttrs) {
this.updateRoot = updateRootAttrs;
return this;
}
} }
} }

View File

@ -117,7 +117,9 @@ public static DistCpOptions parse(String[] args)
.withDirectWrite( .withDirectWrite(
command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch())) command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch()))
.withUseIterator( .withUseIterator(
command.hasOption(DistCpOptionSwitch.USE_ITERATOR.getSwitch())); command.hasOption(DistCpOptionSwitch.USE_ITERATOR.getSwitch()))
.withUpdateRoot(
command.hasOption(DistCpOptionSwitch.UPDATE_ROOT.getSwitch()));
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) { if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
String[] snapshots = getVals(command, String[] snapshots = getVals(command,

View File

@ -616,10 +616,12 @@ private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
DistCpContext context) throws IOException { DistCpContext context) throws IOException {
boolean syncOrOverwrite = context.shouldSyncFolder() || boolean syncOrOverwrite = context.shouldSyncFolder() ||
context.shouldOverwrite(); context.shouldOverwrite();
boolean skipRootPath = syncOrOverwrite && !context.shouldUpdateRoot();
for (CopyListingFileStatus fs : fileStatus) { for (CopyListingFileStatus fs : fileStatus) {
if (fs.getPath().equals(sourcePathRoot) && if (fs.getPath().equals(sourcePathRoot) &&
fs.isDirectory() && syncOrOverwrite) { fs.isDirectory() && skipRootPath) {
// Skip the root-paths when syncOrOverwrite // Skip the root-paths when skipRootPath (syncOrOverwrite and
// update root directory is not a must).
LOG.debug("Skip {}", fs.getPath()); LOG.debug("Skip {}", fs.getPath());
return; return;
} }

View File

@ -75,6 +75,7 @@ public class CopyCommitter extends FileOutputCommitter {
private boolean ignoreFailures = false; private boolean ignoreFailures = false;
private boolean skipCrc = false; private boolean skipCrc = false;
private int blocksPerChunk = 0; private int blocksPerChunk = 0;
private boolean updateRoot = false;
/** /**
* Create a output committer * Create a output committer
@ -100,6 +101,8 @@ public void commitJob(JobContext jobContext) throws IOException {
Configuration conf = jobContext.getConfiguration(); Configuration conf = jobContext.getConfiguration();
syncFolder = conf.getBoolean(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, false); syncFolder = conf.getBoolean(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, false);
overwrite = conf.getBoolean(DistCpConstants.CONF_LABEL_OVERWRITE, false); overwrite = conf.getBoolean(DistCpConstants.CONF_LABEL_OVERWRITE, false);
updateRoot =
conf.getBoolean(CONF_LABEL_UPDATE_ROOT, false);
targetPathExists = conf.getBoolean( targetPathExists = conf.getBoolean(
DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true); DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true);
ignoreFailures = conf.getBoolean( ignoreFailures = conf.getBoolean(
@ -336,9 +339,12 @@ private void preserveFileAttributesForDirectories(Configuration conf)
Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath); Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
// //
// Skip the root folder when syncOrOverwrite is true. // Skip the root folder when skipRoot is true.
// //
if (targetRoot.equals(targetFile) && syncOrOverwrite) continue; boolean skipRoot = syncOrOverwrite && !updateRoot;
if (targetRoot.equals(targetFile) && skipRoot) {
continue;
}
FileSystem targetFS = targetFile.getFileSystem(conf); FileSystem targetFS = targetFile.getFileSystem(conf);
DistCpUtils.preserve(targetFS, targetFile, srcFileStatus, attributes, DistCpUtils.preserve(targetFS, targetFile, srcFileStatus, attributes,

View File

@ -363,6 +363,7 @@ Command Line Options
| `-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. | | `-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. |
| `-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store | | `-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store |
| `-useiterator` | Uses single threaded listStatusIterator to build listing | Useful for saving memory at the client side. Using this option will ignore the numListstatusThreads option | | `-useiterator` | Uses single threaded listStatusIterator to build listing | Useful for saving memory at the client side. Using this option will ignore the numListstatusThreads option |
| `-updateRoot` | Update root directory attributes (eg permissions, ownership ...) | Useful if you need to enforce root directory attributes update when using distcp |
Architecture of DistCp Architecture of DistCp
---------------------- ----------------------

View File

@ -289,7 +289,7 @@ public void testToString() {
"atomicWorkPath=null, logPath=null, sourceFileListing=abc, " + "atomicWorkPath=null, logPath=null, sourceFileListing=abc, " +
"sourcePaths=null, targetPath=xyz, filtersFile='null', " + "sourcePaths=null, targetPath=xyz, filtersFile='null', " +
"blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " + "blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " +
"directWrite=false, useiterator=false}"; "directWrite=false, useiterator=false, updateRoot=false}";
String optionString = option.toString(); String optionString = option.toString();
Assert.assertEquals(val, optionString); Assert.assertEquals(val, optionString);
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
@ -563,4 +563,15 @@ public void testAppendToConf() {
"otherwise it may not be fetched properly", "otherwise it may not be fetched properly",
expectedValForEmptyConfigKey, config.get("")); expectedValForEmptyConfigKey, config.get(""));
} }
@Test
public void testUpdateRoot() {
final DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(
new Path("hdfs://localhost:8020/source")),
new Path("hdfs://localhost:8020/target/"))
.withUpdateRoot(true)
.build();
Assert.assertTrue(options.shouldUpdateRoot());
}
} }

View File

@ -21,6 +21,7 @@
import static org.apache.hadoop.test.GenericTestUtils.getMethodName; import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -44,6 +45,8 @@
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.util.DistCpTestUtils;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -551,4 +554,44 @@ public void testSourceRoot() throws Exception {
String[] args2 = new String[]{rootStr, tgtStr2}; String[] args2 = new String[]{rootStr, tgtStr2};
Assert.assertThat(ToolRunner.run(conf, new DistCp(), args2), is(0)); Assert.assertThat(ToolRunner.run(conf, new DistCp(), args2), is(0));
} }
@Test
public void testUpdateRoot() throws Exception {
FileSystem fs = cluster.getFileSystem();
Path source = new Path("/src");
Path dest1 = new Path("/dest1");
Path dest2 = new Path("/dest2");
fs.delete(source, true);
fs.delete(dest1, true);
fs.delete(dest2, true);
// Create a source dir
fs.mkdirs(source);
fs.setOwner(source, "userA", "groupA");
fs.setTimes(source, new Random().nextLong(), new Random().nextLong());
GenericTestUtils.createFiles(fs, source, 3, 5, 5);
// should not preserve attrs
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
dest1.toString(), "-p -update", conf);
FileStatus srcStatus = fs.getFileStatus(source);
FileStatus destStatus1 = fs.getFileStatus(dest1);
assertNotEquals(srcStatus.getOwner(), destStatus1.getOwner());
assertNotEquals(srcStatus.getModificationTime(),
destStatus1.getModificationTime());
// should preserve attrs
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
dest2.toString(), "-p -update -updateRoot",
conf);
FileStatus destStatus2 = fs.getFileStatus(dest2);
assertEquals(srcStatus.getOwner(), destStatus2.getOwner());
assertEquals(srcStatus.getModificationTime(),
destStatus2.getModificationTime());
}
} }

View File

@ -804,4 +804,18 @@ public void testExclusionsOption() {
"hdfs://localhost:8020/target/"}); "hdfs://localhost:8020/target/"});
assertThat(options.getFiltersFile()).isEqualTo("/tmp/filters.txt"); assertThat(options.getFiltersFile()).isEqualTo("/tmp/filters.txt");
} }
@Test
public void testParseUpdateRoot() {
DistCpOptions options = OptionsParser.parse(new String[] {
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertFalse(options.shouldUpdateRoot());
options = OptionsParser.parse(new String[] {
"-updateRoot",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertTrue(options.shouldUpdateRoot());
}
} }