MAPREDUCE-5809. Enhance distcp to support preserving HDFS ACLs. Contributed by Chris Nauroth.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1595283 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c9377189a0
commit
11be7334c4
@ -99,6 +99,21 @@ public FileStatus(long length, boolean isdir,
|
||||
assert (isdir && symlink == null) || !isdir;
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy constructor.
|
||||
*
|
||||
* @param other FileStatus to copy
|
||||
*/
|
||||
public FileStatus(FileStatus other) throws IOException {
|
||||
// It's important to call the getters here instead of directly accessing the
|
||||
// members. Subclasses like ViewFsFileStatus can override the getters.
|
||||
this(other.getLen(), other.isDirectory(), other.getReplication(),
|
||||
other.getBlockSize(), other.getModificationTime(), other.getAccessTime(),
|
||||
other.getPermission(), other.getOwner(), other.getGroup(),
|
||||
(other.isSymlink() ? other.getSymlink() : null),
|
||||
other.getPath());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the length of this file, in bytes.
|
||||
* @return the length of this file, in bytes.
|
||||
|
@ -0,0 +1,134 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.permission;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* AclUtil contains utility methods for manipulating ACLs.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
||||
@InterfaceStability.Unstable
|
||||
public final class AclUtil {
|
||||
|
||||
/**
|
||||
* Given permissions and extended ACL entries, returns the full logical ACL.
|
||||
*
|
||||
* @param perm FsPermission containing permissions
|
||||
* @param entries List<AclEntry> containing extended ACL entries
|
||||
* @return List<AclEntry> containing full logical ACL
|
||||
*/
|
||||
public static List<AclEntry> getAclFromPermAndEntries(FsPermission perm,
|
||||
List<AclEntry> entries) {
|
||||
List<AclEntry> acl = Lists.newArrayListWithCapacity(entries.size() + 3);
|
||||
|
||||
// Owner entry implied by owner permission bits.
|
||||
acl.add(new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.USER)
|
||||
.setPermission(perm.getUserAction())
|
||||
.build());
|
||||
|
||||
// All extended access ACL entries.
|
||||
boolean hasAccessAcl = false;
|
||||
Iterator<AclEntry> entryIter = entries.iterator();
|
||||
AclEntry curEntry = null;
|
||||
while (entryIter.hasNext()) {
|
||||
curEntry = entryIter.next();
|
||||
if (curEntry.getScope() == AclEntryScope.DEFAULT) {
|
||||
break;
|
||||
}
|
||||
hasAccessAcl = true;
|
||||
acl.add(curEntry);
|
||||
}
|
||||
|
||||
// Mask entry implied by group permission bits, or group entry if there is
|
||||
// no access ACL (only default ACL).
|
||||
acl.add(new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(hasAccessAcl ? AclEntryType.MASK : AclEntryType.GROUP)
|
||||
.setPermission(perm.getGroupAction())
|
||||
.build());
|
||||
|
||||
// Other entry implied by other bits.
|
||||
acl.add(new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.OTHER)
|
||||
.setPermission(perm.getOtherAction())
|
||||
.build());
|
||||
|
||||
// Default ACL entries.
|
||||
if (curEntry != null && curEntry.getScope() == AclEntryScope.DEFAULT) {
|
||||
acl.add(curEntry);
|
||||
while (entryIter.hasNext()) {
|
||||
acl.add(entryIter.next());
|
||||
}
|
||||
}
|
||||
|
||||
return acl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Translates the given permission bits to the equivalent minimal ACL.
|
||||
*
|
||||
* @param perm FsPermission to translate
|
||||
* @return List<AclEntry> containing exactly 3 entries representing the owner,
|
||||
* group and other permissions
|
||||
*/
|
||||
public static List<AclEntry> getMinimalAcl(FsPermission perm) {
|
||||
return Lists.newArrayList(
|
||||
new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.USER)
|
||||
.setPermission(perm.getUserAction())
|
||||
.build(),
|
||||
new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.GROUP)
|
||||
.setPermission(perm.getGroupAction())
|
||||
.build(),
|
||||
new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.OTHER)
|
||||
.setPermission(perm.getOtherAction())
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given entries represent a minimal ACL (contains exactly 3
|
||||
* entries).
|
||||
*
|
||||
* @param entries List<AclEntry> entries to check
|
||||
* @return boolean true if the entries represent a minimal ACL
|
||||
*/
|
||||
public static boolean isMinimalAcl(List<AclEntry> entries) {
|
||||
return entries.size() == 3;
|
||||
}
|
||||
|
||||
/**
|
||||
* There is no reason to instantiate this class.
|
||||
*/
|
||||
private AclUtil() {
|
||||
}
|
||||
}
|
@ -15,12 +15,13 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
package org.apache.hadoop.fs.permission;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclEntryScope;
|
||||
|
||||
@ -28,8 +29,9 @@
|
||||
* Groups a list of ACL entries into separate lists for access entries vs.
|
||||
* default entries.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
final class ScopedAclEntries {
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
||||
@InterfaceStability.Unstable
|
||||
public final class ScopedAclEntries {
|
||||
private static final int PIVOT_NOT_FOUND = -1;
|
||||
|
||||
private final List<AclEntry> accessEntries;
|
@ -18,7 +18,7 @@
|
||||
package org.apache.hadoop.fs.shell;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
@ -31,8 +31,10 @@
|
||||
import org.apache.hadoop.fs.permission.AclEntryScope;
|
||||
import org.apache.hadoop.fs.permission.AclEntryType;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.AclUtil;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.ScopedAclEntries;
|
||||
|
||||
/**
|
||||
* Acl related operations
|
||||
@ -84,67 +86,34 @@ protected void processPath(PathData item) throws IOException {
|
||||
(perm.getOtherAction().implies(FsAction.EXECUTE) ? "t" : "T"));
|
||||
}
|
||||
|
||||
if (perm.getAclBit()) {
|
||||
AclStatus aclStatus = item.fs.getAclStatus(item.path);
|
||||
List<AclEntry> entries = aclStatus.getEntries();
|
||||
printExtendedAcl(perm, entries);
|
||||
} else {
|
||||
printMinimalAcl(perm);
|
||||
}
|
||||
|
||||
List<AclEntry> entries = perm.getAclBit() ?
|
||||
item.fs.getAclStatus(item.path).getEntries() :
|
||||
Collections.<AclEntry>emptyList();
|
||||
ScopedAclEntries scopedEntries = new ScopedAclEntries(
|
||||
AclUtil.getAclFromPermAndEntries(perm, entries));
|
||||
printAclEntriesForSingleScope(scopedEntries.getAccessEntries());
|
||||
printAclEntriesForSingleScope(scopedEntries.getDefaultEntries());
|
||||
out.println();
|
||||
}
|
||||
|
||||
/**
|
||||
* Prints an extended ACL, including all extended ACL entries and also the
|
||||
* base entries implied by the permission bits.
|
||||
* Prints all the ACL entries in a single scope.
|
||||
*
|
||||
* @param perm FsPermission of file
|
||||
* @param entries List<AclEntry> containing ACL entries of file
|
||||
*/
|
||||
private void printExtendedAcl(FsPermission perm, List<AclEntry> entries) {
|
||||
// Print owner entry implied by owner permission bits.
|
||||
out.println(new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.USER)
|
||||
.setPermission(perm.getUserAction())
|
||||
.build());
|
||||
|
||||
// Print all extended access ACL entries.
|
||||
boolean hasAccessAcl = false;
|
||||
Iterator<AclEntry> entryIter = entries.iterator();
|
||||
AclEntry curEntry = null;
|
||||
while (entryIter.hasNext()) {
|
||||
curEntry = entryIter.next();
|
||||
if (curEntry.getScope() == AclEntryScope.DEFAULT) {
|
||||
break;
|
||||
}
|
||||
hasAccessAcl = true;
|
||||
printExtendedAclEntry(curEntry, perm.getGroupAction());
|
||||
private void printAclEntriesForSingleScope(List<AclEntry> entries) {
|
||||
if (entries.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Print mask entry implied by group permission bits, or print group entry
|
||||
// if there is no access ACL (only default ACL).
|
||||
out.println(new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(hasAccessAcl ? AclEntryType.MASK : AclEntryType.GROUP)
|
||||
.setPermission(perm.getGroupAction())
|
||||
.build());
|
||||
|
||||
// Print other entry implied by other bits.
|
||||
out.println(new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.OTHER)
|
||||
.setPermission(perm.getOtherAction())
|
||||
.build());
|
||||
|
||||
// Print default ACL entries.
|
||||
if (curEntry != null && curEntry.getScope() == AclEntryScope.DEFAULT) {
|
||||
out.println(curEntry);
|
||||
// ACL sort order guarantees default mask is the second-to-last entry.
|
||||
if (AclUtil.isMinimalAcl(entries)) {
|
||||
for (AclEntry entry: entries) {
|
||||
out.println(entry);
|
||||
}
|
||||
} else {
|
||||
// ACL sort order guarantees mask is the second-to-last entry.
|
||||
FsAction maskPerm = entries.get(entries.size() - 2).getPermission();
|
||||
while (entryIter.hasNext()) {
|
||||
printExtendedAclEntry(entryIter.next(), maskPerm);
|
||||
for (AclEntry entry: entries) {
|
||||
printExtendedAclEntry(entry, maskPerm);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -172,30 +141,6 @@ private void printExtendedAclEntry(AclEntry entry, FsAction maskPerm) {
|
||||
out.println(entry);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Prints a minimal ACL, consisting of exactly 3 ACL entries implied by the
|
||||
* permission bits.
|
||||
*
|
||||
* @param perm FsPermission of file
|
||||
*/
|
||||
private void printMinimalAcl(FsPermission perm) {
|
||||
out.println(new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.USER)
|
||||
.setPermission(perm.getUserAction())
|
||||
.build());
|
||||
out.println(new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.GROUP)
|
||||
.setPermission(perm.getGroupAction())
|
||||
.build());
|
||||
out.println(new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.OTHER)
|
||||
.setPermission(perm.getOtherAction())
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -27,8 +27,10 @@
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclEntryScope;
|
||||
import org.apache.hadoop.fs.permission.AclEntryType;
|
||||
import org.apache.hadoop.fs.permission.AclUtil;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.ScopedAclEntries;
|
||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
|
||||
@ -90,7 +92,7 @@ public static void copyINodeDefaultAcl(INode child) {
|
||||
FsPermission childPerm = child.getFsPermission();
|
||||
|
||||
// Copy each default ACL entry from parent to new child's access ACL.
|
||||
boolean parentDefaultIsMinimal = isMinimalAcl(parentDefaultEntries);
|
||||
boolean parentDefaultIsMinimal = AclUtil.isMinimalAcl(parentDefaultEntries);
|
||||
for (AclEntry entry: parentDefaultEntries) {
|
||||
AclEntryType type = entry.getType();
|
||||
String name = entry.getName();
|
||||
@ -127,7 +129,7 @@ public static void copyINodeDefaultAcl(INode child) {
|
||||
Collections.<AclEntry>emptyList();
|
||||
|
||||
final FsPermission newPerm;
|
||||
if (!isMinimalAcl(accessEntries) || !defaultEntries.isEmpty()) {
|
||||
if (!AclUtil.isMinimalAcl(accessEntries) || !defaultEntries.isEmpty()) {
|
||||
// Save the new ACL to the child.
|
||||
child.addAclFeature(createAclFeature(accessEntries, defaultEntries));
|
||||
newPerm = createFsPermissionForExtendedAcl(accessEntries, childPerm);
|
||||
@ -172,7 +174,7 @@ public static List<AclEntry> readINodeLogicalAcl(INode inode) {
|
||||
FsPermission perm = inode.getFsPermission();
|
||||
AclFeature f = inode.getAclFeature();
|
||||
if (f == null) {
|
||||
return getMinimalAcl(perm);
|
||||
return AclUtil.getMinimalAcl(perm);
|
||||
}
|
||||
|
||||
final List<AclEntry> existingAcl;
|
||||
@ -208,7 +210,7 @@ public static List<AclEntry> readINodeLogicalAcl(INode inode) {
|
||||
} else {
|
||||
// It's possible that there is a default ACL but no access ACL. In this
|
||||
// case, add the minimal access ACL implied by the permission bits.
|
||||
existingAcl.addAll(getMinimalAcl(perm));
|
||||
existingAcl.addAll(AclUtil.getMinimalAcl(perm));
|
||||
}
|
||||
|
||||
// Add all default entries after the access entries.
|
||||
@ -267,7 +269,7 @@ public static void updateINodeAcl(INode inode, List<AclEntry> newAcl,
|
||||
assert newAcl.size() >= 3;
|
||||
FsPermission perm = inode.getFsPermission();
|
||||
final FsPermission newPerm;
|
||||
if (!isMinimalAcl(newAcl)) {
|
||||
if (!AclUtil.isMinimalAcl(newAcl)) {
|
||||
// This is an extended ACL. Split entries into access vs. default.
|
||||
ScopedAclEntries scoped = new ScopedAclEntries(newAcl);
|
||||
List<AclEntry> accessEntries = scoped.getAccessEntries();
|
||||
@ -321,7 +323,7 @@ private static AclFeature createAclFeature(List<AclEntry> accessEntries,
|
||||
// For the access ACL, the feature only needs to hold the named user and
|
||||
// group entries. For a correctly sorted ACL, these will be in a
|
||||
// predictable range.
|
||||
if (!isMinimalAcl(accessEntries)) {
|
||||
if (!AclUtil.isMinimalAcl(accessEntries)) {
|
||||
featureEntries.addAll(
|
||||
accessEntries.subList(1, accessEntries.size() - 2));
|
||||
}
|
||||
@ -366,41 +368,4 @@ private static FsPermission createFsPermissionForMinimalAcl(
|
||||
accessEntries.get(2).getPermission(),
|
||||
existingPerm.getStickyBit());
|
||||
}
|
||||
|
||||
/**
|
||||
* Translates the given permission bits to the equivalent minimal ACL.
|
||||
*
|
||||
* @param perm FsPermission to translate
|
||||
* @return List<AclEntry> containing exactly 3 entries representing the owner,
|
||||
* group and other permissions
|
||||
*/
|
||||
private static List<AclEntry> getMinimalAcl(FsPermission perm) {
|
||||
return Lists.newArrayList(
|
||||
new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.USER)
|
||||
.setPermission(perm.getUserAction())
|
||||
.build(),
|
||||
new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.GROUP)
|
||||
.setPermission(perm.getGroupAction())
|
||||
.build(),
|
||||
new AclEntry.Builder()
|
||||
.setScope(AclEntryScope.ACCESS)
|
||||
.setType(AclEntryType.OTHER)
|
||||
.setPermission(perm.getOtherAction())
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given entries represent a minimal ACL (contains exactly 3
|
||||
* entries).
|
||||
*
|
||||
* @param entries List<AclEntry> entries to check
|
||||
* @return boolean true if the entries represent a minimal ACL
|
||||
*/
|
||||
private static boolean isMinimalAcl(List<AclEntry> entries) {
|
||||
return entries.size() == 3;
|
||||
}
|
||||
}
|
||||
|
@ -40,6 +40,7 @@
|
||||
import org.apache.hadoop.fs.permission.AclEntryType;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.ScopedAclEntries;
|
||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||
|
||||
/**
|
||||
|
@ -197,6 +197,8 @@ Release 2.5.0 - UNRELEASED
|
||||
MAPREDUCE-5861. finishedSubMaps field in LocalContainerLauncher does not
|
||||
need to be volatile. (Tsuyoshi OZAWA via junping_du)
|
||||
|
||||
MAPREDUCE-5809. Enhance distcp to support preserving HDFS ACLs. (cnauroth)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -22,7 +22,6 @@
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
@ -31,11 +30,15 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.net.URI;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
/**
|
||||
* The CopyListing abstraction is responsible for how the list of
|
||||
* sources and targets is constructed, for DistCp's copy function.
|
||||
* The copy-listing should be a SequenceFile<Text, FileStatus>,
|
||||
* The copy-listing should be a SequenceFile<Text, CopyListingFileStatus>,
|
||||
* located at the path specified to buildListing(),
|
||||
* each entry being a pair of (Source relative path, source file status),
|
||||
* all the paths being fully qualified.
|
||||
@ -85,7 +88,7 @@ public final void buildListing(Path pathToListFile,
|
||||
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, getBytesToCopy());
|
||||
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
|
||||
|
||||
checkForDuplicates(pathToListFile);
|
||||
validateFinalListing(pathToListFile, options);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -124,13 +127,15 @@ protected abstract void doBuildListing(Path pathToListFile,
|
||||
protected abstract long getNumberOfPaths();
|
||||
|
||||
/**
|
||||
* Validate the final resulting path listing to see if there are any duplicate entries
|
||||
* Validate the final resulting path listing. Checks if there are duplicate
|
||||
* entries. If preserving ACLs, checks that file system can support ACLs.
|
||||
*
|
||||
* @param pathToListFile - path listing build by doBuildListing
|
||||
* @param options - Input options to distcp
|
||||
* @throws IOException - Any issues while checking for duplicates and throws
|
||||
* @throws DuplicateFileException - if there are duplicates
|
||||
*/
|
||||
private void checkForDuplicates(Path pathToListFile)
|
||||
private void validateFinalListing(Path pathToListFile, DistCpOptions options)
|
||||
throws DuplicateFileException, IOException {
|
||||
|
||||
Configuration config = getConf();
|
||||
@ -142,17 +147,26 @@ private void checkForDuplicates(Path pathToListFile)
|
||||
config, SequenceFile.Reader.file(sortedList));
|
||||
try {
|
||||
Text lastKey = new Text("*"); //source relative path can never hold *
|
||||
FileStatus lastFileStatus = new FileStatus();
|
||||
CopyListingFileStatus lastFileStatus = new CopyListingFileStatus();
|
||||
|
||||
Text currentKey = new Text();
|
||||
Set<URI> aclSupportCheckFsSet = Sets.newHashSet();
|
||||
while (reader.next(currentKey)) {
|
||||
if (currentKey.equals(lastKey)) {
|
||||
FileStatus currentFileStatus = new FileStatus();
|
||||
CopyListingFileStatus currentFileStatus = new CopyListingFileStatus();
|
||||
reader.getCurrentValue(currentFileStatus);
|
||||
throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " +
|
||||
currentFileStatus.getPath() + " would cause duplicates. Aborting");
|
||||
}
|
||||
reader.getCurrentValue(lastFileStatus);
|
||||
if (options.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
|
||||
FileSystem lastFs = lastFileStatus.getPath().getFileSystem(config);
|
||||
URI lastFsUri = lastFs.getUri();
|
||||
if (!aclSupportCheckFsSet.contains(lastFsUri)) {
|
||||
DistCpUtils.checkFileSystemAclSupport(lastFs);
|
||||
aclSupportCheckFsSet.add(lastFsUri);
|
||||
}
|
||||
}
|
||||
lastKey.set(currentKey);
|
||||
}
|
||||
} finally {
|
||||
@ -236,4 +250,10 @@ public InvalidInputException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
||||
public static class AclsNotSupportedException extends RuntimeException {
|
||||
public AclsNotSupportedException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,153 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclEntryType;
|
||||
import org.apache.hadoop.fs.permission.AclEntryScope;
|
||||
import org.apache.hadoop.fs.permission.AclUtil;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* CopyListingFileStatus is a specialized subclass of {@link FileStatus} for
|
||||
* attaching additional data members useful to distcp. This class does not
|
||||
* override {@link FileStatus#compareTo}, because the additional data members
|
||||
* are not relevant to sort order.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class CopyListingFileStatus extends FileStatus {
|
||||
|
||||
private static final byte NO_ACL_ENTRIES = -1;
|
||||
|
||||
// Retain static arrays of enum values to prevent repeated allocation of new
|
||||
// arrays during deserialization.
|
||||
private static final AclEntryType[] ACL_ENTRY_TYPES = AclEntryType.values();
|
||||
private static final AclEntryScope[] ACL_ENTRY_SCOPES = AclEntryScope.values();
|
||||
private static final FsAction[] FS_ACTIONS = FsAction.values();
|
||||
|
||||
private List<AclEntry> aclEntries;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public CopyListingFileStatus() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new CopyListingFileStatus by copying the members of the given
|
||||
* FileStatus.
|
||||
*
|
||||
* @param fileStatus FileStatus to copy
|
||||
*/
|
||||
public CopyListingFileStatus(FileStatus fileStatus) throws IOException {
|
||||
super(fileStatus);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the full logical ACL.
|
||||
*
|
||||
* @return List<AclEntry> containing full logical ACL
|
||||
*/
|
||||
public List<AclEntry> getAclEntries() {
|
||||
return AclUtil.getAclFromPermAndEntries(getPermission(),
|
||||
aclEntries != null ? aclEntries : Collections.<AclEntry>emptyList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets optional ACL entries.
|
||||
*
|
||||
* @param aclEntries List<AclEntry> containing all ACL entries
|
||||
*/
|
||||
public void setAclEntries(List<AclEntry> aclEntries) {
|
||||
this.aclEntries = aclEntries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
super.write(out);
|
||||
if (aclEntries != null) {
|
||||
// byte is sufficient, because 32 ACL entries is the max enforced by HDFS.
|
||||
out.writeByte(aclEntries.size());
|
||||
for (AclEntry entry: aclEntries) {
|
||||
out.writeByte(entry.getScope().ordinal());
|
||||
out.writeByte(entry.getType().ordinal());
|
||||
WritableUtils.writeString(out, entry.getName());
|
||||
out.writeByte(entry.getPermission().ordinal());
|
||||
}
|
||||
} else {
|
||||
out.writeByte(NO_ACL_ENTRIES);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
super.readFields(in);
|
||||
byte aclEntriesSize = in.readByte();
|
||||
if (aclEntriesSize != NO_ACL_ENTRIES) {
|
||||
aclEntries = Lists.newArrayListWithCapacity(aclEntriesSize);
|
||||
for (int i = 0; i < aclEntriesSize; ++i) {
|
||||
aclEntries.add(new AclEntry.Builder()
|
||||
.setScope(ACL_ENTRY_SCOPES[in.readByte()])
|
||||
.setType(ACL_ENTRY_TYPES[in.readByte()])
|
||||
.setName(WritableUtils.readString(in))
|
||||
.setPermission(FS_ACTIONS[in.readByte()])
|
||||
.build());
|
||||
}
|
||||
} else {
|
||||
aclEntries = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (!super.equals(o)) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
CopyListingFileStatus other = (CopyListingFileStatus)o;
|
||||
return Objects.equal(aclEntries, other.aclEntries);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(super.hashCode(), aclEntries);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder(super.toString());
|
||||
sb.append('{');
|
||||
sb.append("aclEntries = " + aclEntries);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
@ -125,6 +125,9 @@ public int run(String[] argv) {
|
||||
} catch (DuplicateFileException e) {
|
||||
LOG.error("Duplicate files in input path: ", e);
|
||||
return DistCpConstants.DUPLICATE_INPUT;
|
||||
} catch (AclsNotSupportedException e) {
|
||||
LOG.error("ACLs not supported on at least one file system: ", e);
|
||||
return DistCpConstants.ACLS_NOT_SUPPORTED;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception encountered ", e);
|
||||
return DistCpConstants.UNKNOWN_ERROR;
|
||||
@ -298,7 +301,9 @@ private void configureOutputFormat(Job job) throws IOException {
|
||||
FileSystem targetFS = targetPath.getFileSystem(configuration);
|
||||
targetPath = targetPath.makeQualified(targetFS.getUri(),
|
||||
targetFS.getWorkingDirectory());
|
||||
|
||||
if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
|
||||
DistCpUtils.checkFileSystemAclSupport(targetFS);
|
||||
}
|
||||
if (inputOptions.shouldAtomicCommit()) {
|
||||
Path workDir = inputOptions.getAtomicWorkPath();
|
||||
if (workDir == null) {
|
||||
|
@ -115,6 +115,7 @@ public class DistCpConstants {
|
||||
public static final int SUCCESS = 0;
|
||||
public static final int INVALID_ARGUMENT = -1;
|
||||
public static final int DUPLICATE_INPUT = -2;
|
||||
public static final int ACLS_NOT_SUPPORTED = -3;
|
||||
public static final int UNKNOWN_ERROR = -999;
|
||||
|
||||
/**
|
||||
|
@ -45,8 +45,10 @@ public enum DistCpOptionSwitch {
|
||||
*
|
||||
*/
|
||||
PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
|
||||
new Option("p", true, "preserve status (rbugpc)" +
|
||||
"(replication, block-size, user, group, permission, checksum-type)")),
|
||||
new Option("p", true, "preserve status (rbugpca)(replication, " +
|
||||
"block-size, user, group, permission, checksum-type, ACL). If " +
|
||||
"-p is specified with no <arg>, then preserves replication, block " +
|
||||
"size, user, group, permission and checksum type.")),
|
||||
|
||||
/**
|
||||
* Update target location by copying only files that are missing
|
||||
|
@ -65,7 +65,7 @@ public class DistCpOptions {
|
||||
private boolean targetPathExists = true;
|
||||
|
||||
public static enum FileAttribute{
|
||||
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE;
|
||||
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL;
|
||||
|
||||
public static FileAttribute getAttribute(char symbol) {
|
||||
for (FileAttribute attribute : values()) {
|
||||
|
@ -23,11 +23,12 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
||||
import org.apache.hadoop.tools.util.DistCpUtils;
|
||||
import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
@ -35,6 +36,7 @@
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.List;
|
||||
import java.util.Stack;
|
||||
|
||||
/**
|
||||
@ -139,28 +141,34 @@ public void doBuildListing(SequenceFile.Writer fileListWriter,
|
||||
|
||||
FileStatus rootStatus = sourceFS.getFileStatus(path);
|
||||
Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
|
||||
boolean localFile = (rootStatus.getClass() != FileStatus.class);
|
||||
|
||||
FileStatus[] sourceFiles = sourceFS.listStatus(path);
|
||||
boolean explore = (sourceFiles != null && sourceFiles.length > 0);
|
||||
if (!explore || rootStatus.isDirectory()) {
|
||||
writeToFileListingRoot(fileListWriter, rootStatus, sourcePathRoot,
|
||||
localFile, options);
|
||||
CopyListingFileStatus rootCopyListingStatus =
|
||||
DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
|
||||
options.shouldPreserve(FileAttribute.ACL));
|
||||
writeToFileListingRoot(fileListWriter, rootCopyListingStatus,
|
||||
sourcePathRoot, options);
|
||||
}
|
||||
if (explore) {
|
||||
for (FileStatus sourceStatus: sourceFiles) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
|
||||
}
|
||||
writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot,
|
||||
localFile, options);
|
||||
CopyListingFileStatus sourceCopyListingStatus =
|
||||
DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
|
||||
options.shouldPreserve(FileAttribute.ACL) &&
|
||||
sourceStatus.isDirectory());
|
||||
writeToFileListing(fileListWriter, sourceCopyListingStatus,
|
||||
sourcePathRoot, options);
|
||||
|
||||
if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Traversing non-empty source dir: " + sourceStatus.getPath());
|
||||
}
|
||||
traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot,
|
||||
localFile, options);
|
||||
options);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -233,7 +241,7 @@ private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
|
||||
return SequenceFile.createWriter(getConf(),
|
||||
SequenceFile.Writer.file(pathToListFile),
|
||||
SequenceFile.Writer.keyClass(Text.class),
|
||||
SequenceFile.Writer.valueClass(FileStatus.class),
|
||||
SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
|
||||
SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
|
||||
}
|
||||
|
||||
@ -250,7 +258,6 @@ private static FileStatus[] getChildren(FileSystem fileSystem,
|
||||
private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
|
||||
FileStatus sourceStatus,
|
||||
Path sourcePathRoot,
|
||||
boolean localFile,
|
||||
DistCpOptions options)
|
||||
throws IOException {
|
||||
FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
|
||||
@ -262,8 +269,11 @@ private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Recording source-path: "
|
||||
+ sourceStatus.getPath() + " for copy.");
|
||||
writeToFileListing(fileListWriter, child, sourcePathRoot,
|
||||
localFile, options);
|
||||
CopyListingFileStatus childCopyListingStatus =
|
||||
DistCpUtils.toCopyListingFileStatus(sourceFS, child,
|
||||
options.shouldPreserve(FileAttribute.ACL) && child.isDirectory());
|
||||
writeToFileListing(fileListWriter, childCopyListingStatus,
|
||||
sourcePathRoot, options);
|
||||
if (isDirectoryAndNotEmpty(sourceFS, child)) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Traversing non-empty source dir: "
|
||||
@ -275,8 +285,7 @@ private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
|
||||
}
|
||||
|
||||
private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
|
||||
FileStatus fileStatus, Path sourcePathRoot,
|
||||
boolean localFile,
|
||||
CopyListingFileStatus fileStatus, Path sourcePathRoot,
|
||||
DistCpOptions options) throws IOException {
|
||||
boolean syncOrOverwrite = options.shouldSyncFolder() ||
|
||||
options.shouldOverwrite();
|
||||
@ -288,14 +297,12 @@ private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
|
||||
}
|
||||
return;
|
||||
}
|
||||
writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, localFile,
|
||||
options);
|
||||
writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, options);
|
||||
}
|
||||
|
||||
private void writeToFileListing(SequenceFile.Writer fileListWriter,
|
||||
FileStatus fileStatus,
|
||||
CopyListingFileStatus fileStatus,
|
||||
Path sourcePathRoot,
|
||||
boolean localFile,
|
||||
DistCpOptions options) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
|
||||
@ -303,9 +310,6 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
|
||||
}
|
||||
|
||||
FileStatus status = fileStatus;
|
||||
if (localFile) {
|
||||
status = getFileStatus(fileStatus);
|
||||
}
|
||||
|
||||
if (!shouldCopy(fileStatus.getPath(), options)) {
|
||||
return;
|
||||
@ -320,19 +324,4 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
|
||||
}
|
||||
totalPaths++;
|
||||
}
|
||||
|
||||
private static final ByteArrayOutputStream buffer = new ByteArrayOutputStream(64);
|
||||
private DataInputBuffer in = new DataInputBuffer();
|
||||
|
||||
private FileStatus getFileStatus(FileStatus fileStatus) throws IOException {
|
||||
FileStatus status = new FileStatus();
|
||||
|
||||
buffer.reset();
|
||||
DataOutputStream out = new DataOutputStream(buffer);
|
||||
fileStatus.write(out);
|
||||
|
||||
in.reset(buffer.toByteArray(), 0, buffer.size());
|
||||
status.readFields(in);
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
@ -178,7 +178,7 @@ private void preserveFileAttributesForDirectories(Configuration conf) throws IOE
|
||||
|
||||
long preservedEntries = 0;
|
||||
try {
|
||||
FileStatus srcFileStatus = new FileStatus();
|
||||
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
|
||||
Text srcRelPath = new Text();
|
||||
|
||||
// Iterate over every source path that was copied.
|
||||
@ -246,9 +246,9 @@ private void deleteMissing(Configuration conf) throws IOException {
|
||||
// Delete all from target that doesn't also exist on source.
|
||||
long deletedEntries = 0;
|
||||
try {
|
||||
FileStatus srcFileStatus = new FileStatus();
|
||||
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
|
||||
Text srcRelPath = new Text();
|
||||
FileStatus trgtFileStatus = new FileStatus();
|
||||
CopyListingFileStatus trgtFileStatus = new CopyListingFileStatus();
|
||||
Text trgtRelPath = new Text();
|
||||
|
||||
FileSystem targetFS = targetFinalPath.getFileSystem(conf);
|
||||
|
@ -24,9 +24,11 @@
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||
import org.apache.hadoop.tools.DistCpConstants;
|
||||
import org.apache.hadoop.tools.DistCpOptionSwitch;
|
||||
import org.apache.hadoop.tools.DistCpOptions;
|
||||
@ -37,12 +39,13 @@
|
||||
import java.io.*;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Mapper class that executes the DistCp copy operation.
|
||||
* Implements the o.a.h.mapreduce.Mapper<> interface.
|
||||
*/
|
||||
public class CopyMapper extends Mapper<Text, FileStatus, Text, Text> {
|
||||
public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text> {
|
||||
|
||||
/**
|
||||
* Hadoop counters for the DistCp CopyMapper.
|
||||
@ -172,8 +175,8 @@ private Path findCacheFile(Path[] cacheFiles, String fileName) {
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void map(Text relPath, FileStatus sourceFileStatus, Context context)
|
||||
throws IOException, InterruptedException {
|
||||
public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
|
||||
Context context) throws IOException, InterruptedException {
|
||||
Path sourcePath = sourceFileStatus.getPath();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
@ -191,11 +194,13 @@ public void map(Text relPath, FileStatus sourceFileStatus, Context context)
|
||||
LOG.info(description);
|
||||
|
||||
try {
|
||||
FileStatus sourceCurrStatus;
|
||||
CopyListingFileStatus sourceCurrStatus;
|
||||
FileSystem sourceFS;
|
||||
try {
|
||||
sourceFS = sourcePath.getFileSystem(conf);
|
||||
sourceCurrStatus = sourceFS.getFileStatus(sourcePath);
|
||||
sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
|
||||
sourceFS.getFileStatus(sourcePath),
|
||||
fileAttributes.contains(FileAttribute.ACL));
|
||||
} catch (FileNotFoundException e) {
|
||||
throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
|
||||
}
|
||||
|
@ -23,11 +23,11 @@
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||
import org.apache.hadoop.tools.DistCpConstants;
|
||||
import org.apache.hadoop.tools.util.DistCpUtils;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.mapreduce.*;
|
||||
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
|
||||
@ -44,7 +44,8 @@
|
||||
* that the total-number of bytes to be copied for each input split is
|
||||
* uniform.
|
||||
*/
|
||||
public class UniformSizeInputFormat extends InputFormat<Text, FileStatus> {
|
||||
public class UniformSizeInputFormat
|
||||
extends InputFormat<Text, CopyListingFileStatus> {
|
||||
private static final Log LOG
|
||||
= LogFactory.getLog(UniformSizeInputFormat.class);
|
||||
|
||||
@ -76,7 +77,7 @@ private List<InputSplit> getSplits(Configuration configuration, int numSplits,
|
||||
List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
|
||||
long nBytesPerSplit = (long) Math.ceil(totalSizeBytes * 1.0 / numSplits);
|
||||
|
||||
FileStatus srcFileStatus = new FileStatus();
|
||||
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
|
||||
Text srcRelPath = new Text();
|
||||
long currentSplitSize = 0;
|
||||
long lastSplitStart = 0;
|
||||
@ -161,9 +162,9 @@ private SequenceFile.Reader getListingFileReader(Configuration configuration) {
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Override
|
||||
public RecordReader<Text, FileStatus> createRecordReader(InputSplit split,
|
||||
TaskAttemptContext context)
|
||||
throws IOException, InterruptedException {
|
||||
return new SequenceFileRecordReader<Text, FileStatus>();
|
||||
public RecordReader<Text, CopyListingFileStatus> createRecordReader(
|
||||
InputSplit split, TaskAttemptContext context)
|
||||
throws IOException, InterruptedException {
|
||||
return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||
import org.apache.hadoop.tools.DistCpConstants;
|
||||
import org.apache.hadoop.tools.util.DistCpUtils;
|
||||
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
|
||||
@ -90,7 +91,7 @@ private DynamicInputChunk(String chunkId, Configuration configuration)
|
||||
private void openForWrite() throws IOException {
|
||||
writer = SequenceFile.createWriter(
|
||||
chunkFilePath.getFileSystem(configuration), configuration,
|
||||
chunkFilePath, Text.class, FileStatus.class,
|
||||
chunkFilePath, Text.class, CopyListingFileStatus.class,
|
||||
SequenceFile.CompressionType.NONE);
|
||||
|
||||
}
|
||||
@ -117,7 +118,7 @@ public static DynamicInputChunk createChunkForWrite(String chunkId,
|
||||
* @param value Corresponding value from the listing file.
|
||||
* @throws IOException Exception onf failure to write to the file.
|
||||
*/
|
||||
public void write(Text key, FileStatus value) throws IOException {
|
||||
public void write(Text key, CopyListingFileStatus value) throws IOException {
|
||||
writer.append(key, value);
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,7 @@
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
@ -133,7 +133,7 @@ private List<InputSplit> createSplits(JobContext jobContext,
|
||||
|
||||
List<DynamicInputChunk> chunksFinal = new ArrayList<DynamicInputChunk>();
|
||||
|
||||
FileStatus fileStatus = new FileStatus();
|
||||
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
|
||||
Text relPath = new Text();
|
||||
int recordCounter = 0;
|
||||
int chunkCount = 0;
|
||||
|
@ -25,15 +25,21 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclUtil;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
||||
import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
|
||||
import org.apache.hadoop.tools.CopyListing.AclsNotSupportedException;
|
||||
import org.apache.hadoop.tools.DistCpOptions;
|
||||
import org.apache.hadoop.mapreduce.InputFormat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.text.DecimalFormat;
|
||||
import java.net.URI;
|
||||
@ -181,7 +187,7 @@ public static EnumSet<FileAttribute> unpackAttributes(String attributes) {
|
||||
* change or any transient error)
|
||||
*/
|
||||
public static void preserve(FileSystem targetFS, Path path,
|
||||
FileStatus srcFileStatus,
|
||||
CopyListingFileStatus srcFileStatus,
|
||||
EnumSet<FileAttribute> attributes) throws IOException {
|
||||
|
||||
FileStatus targetFileStatus = targetFS.getFileStatus(path);
|
||||
@ -189,7 +195,18 @@ public static void preserve(FileSystem targetFS, Path path,
|
||||
String user = targetFileStatus.getOwner();
|
||||
boolean chown = false;
|
||||
|
||||
if (attributes.contains(FileAttribute.PERMISSION) &&
|
||||
if (attributes.contains(FileAttribute.ACL)) {
|
||||
List<AclEntry> srcAcl = srcFileStatus.getAclEntries();
|
||||
List<AclEntry> targetAcl = getAcl(targetFS, targetFileStatus);
|
||||
if (!srcAcl.equals(targetAcl)) {
|
||||
targetFS.setAcl(path, srcAcl);
|
||||
}
|
||||
// setAcl can't preserve sticky bit, so also call setPermission if needed.
|
||||
if (srcFileStatus.getPermission().getStickyBit() !=
|
||||
targetFileStatus.getPermission().getStickyBit()) {
|
||||
targetFS.setPermission(path, srcFileStatus.getPermission());
|
||||
}
|
||||
} else if (attributes.contains(FileAttribute.PERMISSION) &&
|
||||
!srcFileStatus.getPermission().equals(targetFileStatus.getPermission())) {
|
||||
targetFS.setPermission(path, srcFileStatus.getPermission());
|
||||
}
|
||||
@ -216,6 +233,46 @@ public static void preserve(FileSystem targetFS, Path path,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a file's full logical ACL.
|
||||
*
|
||||
* @param fileSystem FileSystem containing the file
|
||||
* @param fileStatus FileStatus of file
|
||||
* @return List<AclEntry> containing full logical ACL
|
||||
* @throws IOException if there is an I/O error
|
||||
*/
|
||||
public static List<AclEntry> getAcl(FileSystem fileSystem,
|
||||
FileStatus fileStatus) throws IOException {
|
||||
List<AclEntry> entries = fileSystem.getAclStatus(fileStatus.getPath())
|
||||
.getEntries();
|
||||
return AclUtil.getAclFromPermAndEntries(fileStatus.getPermission(), entries);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a FileStatus to a CopyListingFileStatus. If preserving ACLs,
|
||||
* populates the CopyListingFileStatus with the ACLs.
|
||||
*
|
||||
* @param fileSystem FileSystem containing the file
|
||||
* @param fileStatus FileStatus of file
|
||||
* @param preserveAcls boolean true if preserving ACLs
|
||||
* @throws IOException if there is an I/O error
|
||||
*/
|
||||
public static CopyListingFileStatus toCopyListingFileStatus(
|
||||
FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls)
|
||||
throws IOException {
|
||||
CopyListingFileStatus copyListingFileStatus =
|
||||
new CopyListingFileStatus(fileStatus);
|
||||
if (preserveAcls) {
|
||||
FsPermission perm = fileStatus.getPermission();
|
||||
if (perm.getAclBit()) {
|
||||
List<AclEntry> aclEntries = fileSystem.getAclStatus(
|
||||
fileStatus.getPath()).getEntries();
|
||||
copyListingFileStatus.setAclEntries(aclEntries);
|
||||
}
|
||||
}
|
||||
return copyListingFileStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort sequence file containing FileStatus and Text as key and value respecitvely
|
||||
*
|
||||
@ -227,7 +284,8 @@ public static void preserve(FileSystem targetFS, Path path,
|
||||
*/
|
||||
public static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing)
|
||||
throws IOException {
|
||||
SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, FileStatus.class, conf);
|
||||
SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class,
|
||||
CopyListingFileStatus.class, conf);
|
||||
Path output = new Path(sourceListing.toString() + "_sorted");
|
||||
|
||||
if (fs.exists(output)) {
|
||||
@ -238,6 +296,25 @@ public static Path sortListing(FileSystem fs, Configuration conf, Path sourceLis
|
||||
return output;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if a file system supports ACLs by running a canary getAclStatus
|
||||
* request on the file system root. This method is used before distcp job
|
||||
* submission to fail fast if the user requested preserving ACLs, but the file
|
||||
* system cannot support ACLs.
|
||||
*
|
||||
* @param fs FileSystem to check
|
||||
* @throws AclsNotSupportedException if fs does not support ACLs
|
||||
*/
|
||||
public static void checkFileSystemAclSupport(FileSystem fs)
|
||||
throws AclsNotSupportedException {
|
||||
try {
|
||||
fs.getAclStatus(new Path(Path.SEPARATOR));
|
||||
} catch (Exception e) {
|
||||
throw new AclsNotSupportedException("ACLs not supported for file system: "
|
||||
+ fs.getUri());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* String utility to convert a number-of-bytes to human readable format.
|
||||
*/
|
||||
|
@ -23,7 +23,6 @@
|
||||
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import java.util.List;
|
||||
@ -33,18 +32,19 @@
|
||||
public class StubContext {
|
||||
|
||||
private StubStatusReporter reporter = new StubStatusReporter();
|
||||
private RecordReader<Text, FileStatus> reader;
|
||||
private RecordReader<Text, CopyListingFileStatus> reader;
|
||||
private StubInMemoryWriter writer = new StubInMemoryWriter();
|
||||
private Mapper<Text, FileStatus, Text, Text>.Context mapperContext;
|
||||
private Mapper<Text, CopyListingFileStatus, Text, Text>.Context mapperContext;
|
||||
|
||||
public StubContext(Configuration conf, RecordReader<Text, FileStatus> reader,
|
||||
int taskId) throws IOException, InterruptedException {
|
||||
public StubContext(Configuration conf,
|
||||
RecordReader<Text, CopyListingFileStatus> reader, int taskId)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
WrappedMapper<Text, FileStatus, Text, Text> wrappedMapper
|
||||
= new WrappedMapper<Text, FileStatus, Text, Text>();
|
||||
WrappedMapper<Text, CopyListingFileStatus, Text, Text> wrappedMapper
|
||||
= new WrappedMapper<Text, CopyListingFileStatus, Text, Text>();
|
||||
|
||||
MapContextImpl<Text, FileStatus, Text, Text> contextImpl
|
||||
= new MapContextImpl<Text, FileStatus, Text, Text>(conf,
|
||||
MapContextImpl<Text, CopyListingFileStatus, Text, Text> contextImpl
|
||||
= new MapContextImpl<Text, CopyListingFileStatus, Text, Text>(conf,
|
||||
getTaskAttemptID(taskId), reader, writer,
|
||||
null, reporter, null);
|
||||
|
||||
@ -52,7 +52,7 @@ public StubContext(Configuration conf, RecordReader<Text, FileStatus> reader,
|
||||
this.mapperContext = wrappedMapper.getMapContext(contextImpl);
|
||||
}
|
||||
|
||||
public Mapper<Text, FileStatus, Text, Text>.Context getContext() {
|
||||
public Mapper<Text, CopyListingFileStatus, Text, Text>.Context getContext() {
|
||||
return mapperContext;
|
||||
}
|
||||
|
||||
@ -60,7 +60,7 @@ public StatusReporter getReporter() {
|
||||
return reporter;
|
||||
}
|
||||
|
||||
public RecordReader<Text, FileStatus> getReader() {
|
||||
public RecordReader<Text, CopyListingFileStatus> getReader() {
|
||||
return reader;
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,6 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
|
||||
import org.apache.hadoop.tools.util.TestDistCpUtils;
|
||||
@ -106,7 +105,7 @@ protected boolean shouldCopy(Path path, DistCpOptions options) {
|
||||
Assert.assertEquals(listing.getNumberOfPaths(), 3);
|
||||
SequenceFile.Reader reader = new SequenceFile.Reader(getConf(),
|
||||
SequenceFile.Reader.file(listingFile));
|
||||
FileStatus fileStatus = new FileStatus();
|
||||
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
|
||||
Text relativePath = new Text();
|
||||
Assert.assertTrue(reader.next(relativePath, fileStatus));
|
||||
Assert.assertEquals(relativePath.toString(), "/1");
|
||||
@ -274,7 +273,7 @@ public void testBuildListingForSingleFile() {
|
||||
|
||||
reader = new SequenceFile.Reader(getConf(), SequenceFile.Reader.file(listFile));
|
||||
|
||||
FileStatus fileStatus = new FileStatus();
|
||||
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
|
||||
Text relativePath = new Text();
|
||||
Assert.assertTrue(reader.next(relativePath, fileStatus));
|
||||
Assert.assertTrue(relativePath.toString().equals(""));
|
||||
|
@ -0,0 +1,329 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import static org.apache.hadoop.fs.permission.AclEntryScope.*;
|
||||
import static org.apache.hadoop.fs.permission.AclEntryType.*;
|
||||
import static org.apache.hadoop.fs.permission.FsAction.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclEntryScope;
|
||||
import org.apache.hadoop.fs.permission.AclEntryType;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests distcp in combination with HDFS ACLs.
|
||||
*/
|
||||
public class TestDistCpWithAcls {
|
||||
|
||||
private static MiniDFSCluster cluster;
|
||||
private static Configuration conf;
|
||||
private static FileSystem fs;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
initCluster(true, true);
|
||||
// Create this directory structure:
|
||||
// /src
|
||||
// /dir1
|
||||
// /subdir1
|
||||
// /dir2
|
||||
// /dir2/file2
|
||||
// /dir2/file3
|
||||
// /dir3sticky
|
||||
// /file1
|
||||
fs.mkdirs(new Path("/src/dir1/subdir1"));
|
||||
fs.mkdirs(new Path("/src/dir2"));
|
||||
fs.create(new Path("/src/dir2/file2")).close();
|
||||
fs.create(new Path("/src/dir2/file3")).close();
|
||||
fs.mkdirs(new Path("/src/dir3sticky"));
|
||||
fs.create(new Path("/src/file1")).close();
|
||||
|
||||
// Set a mix of ACLs and plain permissions throughout the tree.
|
||||
fs.modifyAclEntries(new Path("/src/dir1"), Arrays.asList(
|
||||
aclEntry(DEFAULT, USER, "bruce", ALL)));
|
||||
|
||||
fs.modifyAclEntries(new Path("/src/dir2/file2"), Arrays.asList(
|
||||
aclEntry(ACCESS, GROUP, "sales", NONE)));
|
||||
|
||||
fs.setPermission(new Path("/src/dir2/file3"),
|
||||
new FsPermission((short)0660));
|
||||
|
||||
fs.modifyAclEntries(new Path("/src/file1"), Arrays.asList(
|
||||
aclEntry(ACCESS, USER, "diana", READ)));
|
||||
|
||||
fs.setPermission(new Path("/src/dir3sticky"),
|
||||
new FsPermission((short)01777));
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdown() {
|
||||
IOUtils.cleanup(null, fs);
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreserveAcls() throws Exception {
|
||||
assertRunDistCp(DistCpConstants.SUCCESS, "/dstPreserveAcls");
|
||||
|
||||
assertAclEntries("/dstPreserveAcls/dir1", new AclEntry[] {
|
||||
aclEntry(DEFAULT, USER, ALL),
|
||||
aclEntry(DEFAULT, USER, "bruce", ALL),
|
||||
aclEntry(DEFAULT, GROUP, READ_EXECUTE),
|
||||
aclEntry(DEFAULT, MASK, ALL),
|
||||
aclEntry(DEFAULT, OTHER, READ_EXECUTE) } );
|
||||
assertPermission("/dstPreserveAcls/dir1", (short)0755);
|
||||
|
||||
assertAclEntries("/dstPreserveAcls/dir1/subdir1", new AclEntry[] { });
|
||||
assertPermission("/dstPreserveAcls/dir1/subdir1", (short)0755);
|
||||
|
||||
assertAclEntries("/dstPreserveAcls/dir2", new AclEntry[] { });
|
||||
assertPermission("/dstPreserveAcls/dir2", (short)0755);
|
||||
|
||||
assertAclEntries("/dstPreserveAcls/dir2/file2", new AclEntry[] {
|
||||
aclEntry(ACCESS, GROUP, READ),
|
||||
aclEntry(ACCESS, GROUP, "sales", NONE) } );
|
||||
assertPermission("/dstPreserveAcls/dir2/file2", (short)0644);
|
||||
|
||||
assertAclEntries("/dstPreserveAcls/dir2/file3", new AclEntry[] { });
|
||||
assertPermission("/dstPreserveAcls/dir2/file3", (short)0660);
|
||||
|
||||
assertAclEntries("/dstPreserveAcls/dir3sticky", new AclEntry[] { });
|
||||
assertPermission("/dstPreserveAcls/dir3sticky", (short)01777);
|
||||
|
||||
assertAclEntries("/dstPreserveAcls/file1", new AclEntry[] {
|
||||
aclEntry(ACCESS, USER, "diana", READ),
|
||||
aclEntry(ACCESS, GROUP, READ) } );
|
||||
assertPermission("/dstPreserveAcls/file1", (short)0644);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAclsNotEnabled() throws Exception {
|
||||
try {
|
||||
restart(false);
|
||||
assertRunDistCp(DistCpConstants.ACLS_NOT_SUPPORTED, "/dstAclsNotEnabled");
|
||||
} finally {
|
||||
restart(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAclsNotImplemented() throws Exception {
|
||||
assertRunDistCp(DistCpConstants.ACLS_NOT_SUPPORTED,
|
||||
"stubfs://dstAclsNotImplemented");
|
||||
}
|
||||
|
||||
/**
|
||||
* Stub FileSystem implementation used for testing the case of attempting
|
||||
* distcp with ACLs preserved on a file system that does not support ACLs.
|
||||
* The base class implementation throws UnsupportedOperationException for the
|
||||
* ACL methods, so we don't need to override them.
|
||||
*/
|
||||
public static class StubFileSystem extends FileSystem {
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream append(Path f, int bufferSize,
|
||||
Progressable progress) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream create(Path f, FsPermission permission,
|
||||
boolean overwrite, int bufferSize, short replication, long blockSize,
|
||||
Progressable progress) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus getFileStatus(Path f) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUri() {
|
||||
return URI.create("stubfs:///");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getWorkingDirectory() {
|
||||
return new Path(Path.SEPARATOR);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus[] listStatus(Path f) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mkdirs(Path f, FsPermission permission)
|
||||
throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rename(Path src, Path dst) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWorkingDirectory(Path dir) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new AclEntry with scope, type and permission (no name).
|
||||
*
|
||||
* @param scope AclEntryScope scope of the ACL entry
|
||||
* @param type AclEntryType ACL entry type
|
||||
* @param permission FsAction set of permissions in the ACL entry
|
||||
* @return AclEntry new AclEntry
|
||||
*/
|
||||
private static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
|
||||
FsAction permission) {
|
||||
return new AclEntry.Builder()
|
||||
.setScope(scope)
|
||||
.setType(type)
|
||||
.setPermission(permission)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new AclEntry with scope, type, name and permission.
|
||||
*
|
||||
* @param scope AclEntryScope scope of the ACL entry
|
||||
* @param type AclEntryType ACL entry type
|
||||
* @param name String optional ACL entry name
|
||||
* @param permission FsAction set of permissions in the ACL entry
|
||||
* @return AclEntry new AclEntry
|
||||
*/
|
||||
private static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
|
||||
String name, FsAction permission) {
|
||||
return new AclEntry.Builder()
|
||||
.setScope(scope)
|
||||
.setType(type)
|
||||
.setName(name)
|
||||
.setPermission(permission)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts the ACL entries returned by getAclStatus for a specific path.
|
||||
*
|
||||
* @param path String path to check
|
||||
* @param entries AclEntry[] expected ACL entries
|
||||
* @throws Exception if there is any error
|
||||
*/
|
||||
private static void assertAclEntries(String path, AclEntry[] entries)
|
||||
throws Exception {
|
||||
assertArrayEquals(entries, fs.getAclStatus(new Path(path)).getEntries()
|
||||
.toArray(new AclEntry[0]));
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts the value of the FsPermission bits on the inode of a specific path.
|
||||
*
|
||||
* @param path String path to check
|
||||
* @param perm short expected permission bits
|
||||
* @throws Exception if there is any error
|
||||
*/
|
||||
private static void assertPermission(String path, short perm)
|
||||
throws Exception {
|
||||
assertEquals(perm,
|
||||
fs.getFileStatus(new Path(path)).getPermission().toShort());
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs distcp from /src to specified destination, preserving ACLs. Asserts
|
||||
* expected exit code.
|
||||
*
|
||||
* @param int exitCode expected exit code
|
||||
* @param dst String distcp destination
|
||||
* @throws Exception if there is any error
|
||||
*/
|
||||
private static void assertRunDistCp(int exitCode, String dst)
|
||||
throws Exception {
|
||||
DistCp distCp = new DistCp(conf, null);
|
||||
assertEquals(exitCode, ToolRunner.run(
|
||||
conf, distCp, new String[] { "-pa", "/src", dst }));
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the cluster, wait for it to become active, and get FileSystem.
|
||||
*
|
||||
* @param format if true, format the NameNode and DataNodes before starting up
|
||||
* @param aclsEnabled if true, ACL support is enabled
|
||||
* @throws Exception if any step fails
|
||||
*/
|
||||
private static void initCluster(boolean format, boolean aclsEnabled)
|
||||
throws Exception {
|
||||
conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, aclsEnabled);
|
||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "stubfs:///");
|
||||
conf.setClass("fs.stubfs.impl", StubFileSystem.class, FileSystem.class);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(format)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
/**
|
||||
* Restarts the cluster with ACLs enabled or disabled.
|
||||
*
|
||||
* @param aclsEnabled if true, ACL support is enabled
|
||||
* @throws Exception if any step fails
|
||||
*/
|
||||
private static void restart(boolean aclsEnabled) throws Exception {
|
||||
shutdown();
|
||||
initCluster(false, aclsEnabled);
|
||||
}
|
||||
}
|
@ -23,7 +23,6 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
@ -531,7 +530,7 @@ private void checkResult(Path listFile, int count) throws IOException {
|
||||
SequenceFile.Reader.file(listFile));
|
||||
try {
|
||||
Text relPath = new Text();
|
||||
FileStatus fileStatus = new FileStatus();
|
||||
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
|
||||
while (reader.next(relPath, fileStatus)) {
|
||||
if (fileStatus.isDirectory() && relPath.toString().equals("")) {
|
||||
// ignore root with empty relPath, which is an entry to be
|
||||
|
@ -19,7 +19,6 @@
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
@ -121,7 +120,7 @@ private void verifyContents(Path listingPath) throws Exception {
|
||||
SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(),
|
||||
listingPath, new Configuration());
|
||||
Text key = new Text();
|
||||
FileStatus value = new FileStatus();
|
||||
CopyListingFileStatus value = new CopyListingFileStatus();
|
||||
Map<String, String> actualValues = new HashMap<String, String>();
|
||||
while (reader.next(key, value)) {
|
||||
if (value.isDirectory() && key.toString().equals("")) {
|
||||
|
@ -410,6 +410,7 @@ public void testPreserve() {
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-p",
|
||||
@ -421,6 +422,7 @@ public void testPreserve() {
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-pbr",
|
||||
@ -433,6 +435,7 @@ public void testPreserve() {
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-pbrgup",
|
||||
@ -445,9 +448,10 @@ public void testPreserve() {
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-pbrgupc",
|
||||
"-pbrgupca",
|
||||
"-f",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/"});
|
||||
@ -457,6 +461,7 @@ public void testPreserve() {
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.ACL));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-pc",
|
||||
@ -469,6 +474,7 @@ public void testPreserve() {
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
|
||||
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
|
||||
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL));
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-p",
|
||||
@ -485,7 +491,7 @@ public void testPreserve() {
|
||||
|
||||
try {
|
||||
OptionsParser.parse(new String[] {
|
||||
"-pabc",
|
||||
"-pabcd",
|
||||
"-f",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target"});
|
||||
|
@ -42,6 +42,7 @@
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||
import org.apache.hadoop.tools.DistCpConstants;
|
||||
import org.apache.hadoop.tools.DistCpOptionSwitch;
|
||||
import org.apache.hadoop.tools.DistCpOptions;
|
||||
@ -222,7 +223,7 @@ private void testCopy(boolean preserveChecksum) {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, FileStatus, Text, Text>.Context context
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
Configuration configuration = context.getConfiguration();
|
||||
@ -238,7 +239,7 @@ private void testCopy(boolean preserveChecksum) {
|
||||
|
||||
for (Path path: pathList) {
|
||||
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
|
||||
fs.getFileStatus(path), context);
|
||||
new CopyListingFileStatus(fs.getFileStatus(path)), context);
|
||||
}
|
||||
|
||||
// Check that the maps worked.
|
||||
@ -283,12 +284,11 @@ private void testCopy(boolean preserveChecksum) {
|
||||
}
|
||||
|
||||
private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
|
||||
Mapper<Text, FileStatus, Text, Text>.Context context) {
|
||||
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context) {
|
||||
try {
|
||||
for (Path path : pathList) {
|
||||
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
|
||||
fs.getFileStatus(path), context);
|
||||
new CopyListingFileStatus(fs.getFileStatus(path)), context);
|
||||
}
|
||||
|
||||
Assert.assertEquals(nFiles,
|
||||
@ -309,7 +309,7 @@ public void testMakeDirFailure() {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, FileStatus, Text, Text>.Context context
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
Configuration configuration = context.getConfiguration();
|
||||
@ -320,7 +320,7 @@ public void testMakeDirFailure() {
|
||||
copyMapper.setup(context);
|
||||
|
||||
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))),
|
||||
fs.getFileStatus(pathList.get(0)), context);
|
||||
new CopyListingFileStatus(fs.getFileStatus(pathList.get(0))), context);
|
||||
|
||||
Assert.assertTrue("There should have been an exception.", false);
|
||||
}
|
||||
@ -343,7 +343,7 @@ public void testDirToFile() {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, FileStatus, Text, Text>.Context context
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
mkdirs(SOURCE_PATH + "/src/file");
|
||||
@ -351,7 +351,8 @@ public void testDirToFile() {
|
||||
try {
|
||||
copyMapper.setup(context);
|
||||
copyMapper.map(new Text("/src/file"),
|
||||
fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
|
||||
new CopyListingFileStatus(fs.getFileStatus(
|
||||
new Path(SOURCE_PATH + "/src/file"))),
|
||||
context);
|
||||
} catch (IOException e) {
|
||||
Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
|
||||
@ -372,22 +373,24 @@ public void testPreserve() {
|
||||
|
||||
final CopyMapper copyMapper = new CopyMapper();
|
||||
|
||||
final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
|
||||
doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
|
||||
@Override
|
||||
public Mapper<Text, FileStatus, Text, Text>.Context run() {
|
||||
try {
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
return stubContext.getContext();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception encountered ", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
|
||||
tmpUser.doAs(
|
||||
new PrivilegedAction<Mapper<Text, CopyListingFileStatus, Text, Text>.Context>() {
|
||||
@Override
|
||||
public Mapper<Text, CopyListingFileStatus, Text, Text>.Context run() {
|
||||
try {
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
return stubContext.getContext();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception encountered ", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
|
||||
EnumSet.allOf(DistCpOptions.FileAttribute.class);
|
||||
preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
|
||||
|
||||
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
|
||||
DistCpUtils.packAttributes(preserveStatus));
|
||||
@ -415,7 +418,8 @@ public Integer run() {
|
||||
try {
|
||||
copyMapper.setup(context);
|
||||
copyMapper.map(new Text("/src/file"),
|
||||
tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
|
||||
new CopyListingFileStatus(tmpFS.getFileStatus(
|
||||
new Path(SOURCE_PATH + "/src/file"))),
|
||||
context);
|
||||
Assert.fail("Expected copy to fail");
|
||||
} catch (AccessControlException e) {
|
||||
@ -442,19 +446,20 @@ public void testCopyReadableFiles() {
|
||||
|
||||
final CopyMapper copyMapper = new CopyMapper();
|
||||
|
||||
final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
|
||||
doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
|
||||
@Override
|
||||
public Mapper<Text, FileStatus, Text, Text>.Context run() {
|
||||
try {
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
return stubContext.getContext();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception encountered ", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
|
||||
tmpUser.doAs(
|
||||
new PrivilegedAction<Mapper<Text, CopyListingFileStatus, Text, Text>.Context>() {
|
||||
@Override
|
||||
public Mapper<Text, CopyListingFileStatus, Text, Text>.Context run() {
|
||||
try {
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
return stubContext.getContext();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception encountered ", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
touchFile(SOURCE_PATH + "/src/file");
|
||||
mkdirs(TARGET_PATH);
|
||||
@ -481,7 +486,8 @@ public Integer run() {
|
||||
try {
|
||||
copyMapper.setup(context);
|
||||
copyMapper.map(new Text("/src/file"),
|
||||
tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
|
||||
new CopyListingFileStatus(tmpFS.getFileStatus(
|
||||
new Path(SOURCE_PATH + "/src/file"))),
|
||||
context);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
@ -518,9 +524,11 @@ public StubContext run() {
|
||||
}
|
||||
});
|
||||
|
||||
final Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
|
||||
final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
|
||||
stubContext.getContext();
|
||||
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
|
||||
EnumSet.allOf(DistCpOptions.FileAttribute.class);
|
||||
preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
|
||||
|
||||
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
|
||||
DistCpUtils.packAttributes(preserveStatus));
|
||||
@ -551,7 +559,8 @@ public Integer run() {
|
||||
try {
|
||||
copyMapper.setup(context);
|
||||
copyMapper.map(new Text("/src/file"),
|
||||
tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
|
||||
new CopyListingFileStatus(tmpFS.getFileStatus(
|
||||
new Path(SOURCE_PATH + "/src/file"))),
|
||||
context);
|
||||
Assert.assertEquals(stubContext.getWriter().values().size(), 1);
|
||||
Assert.assertTrue(stubContext.getWriter().values().get(0).toString().startsWith("SKIP"));
|
||||
@ -594,8 +603,9 @@ public StubContext run() {
|
||||
|
||||
EnumSet<DistCpOptions.FileAttribute> preserveStatus =
|
||||
EnumSet.allOf(DistCpOptions.FileAttribute.class);
|
||||
preserveStatus.remove(DistCpOptions.FileAttribute.ACL);
|
||||
|
||||
final Mapper<Text, FileStatus, Text, Text>.Context context
|
||||
final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
|
||||
@ -629,7 +639,8 @@ public Integer run() {
|
||||
try {
|
||||
copyMapper.setup(context);
|
||||
copyMapper.map(new Text("/src/file"),
|
||||
tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
|
||||
new CopyListingFileStatus(tmpFS.getFileStatus(
|
||||
new Path(SOURCE_PATH + "/src/file"))),
|
||||
context);
|
||||
Assert.fail("Didn't expect the file to be copied");
|
||||
} catch (AccessControlException ignore) {
|
||||
@ -661,7 +672,7 @@ public void testFileToDir() {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, FileStatus, Text, Text>.Context context
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
touchFile(SOURCE_PATH + "/src/file");
|
||||
@ -669,7 +680,8 @@ public void testFileToDir() {
|
||||
try {
|
||||
copyMapper.setup(context);
|
||||
copyMapper.map(new Text("/src/file"),
|
||||
fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
|
||||
new CopyListingFileStatus(fs.getFileStatus(
|
||||
new Path(SOURCE_PATH + "/src/file"))),
|
||||
context);
|
||||
} catch (IOException e) {
|
||||
Assert.assertTrue(e.getMessage().startsWith("Can't replace"));
|
||||
@ -688,7 +700,7 @@ private void doTestIgnoreFailures(boolean ignoreFailures) {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, FileStatus, Text, Text>.Context context
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
Configuration configuration = context.getConfiguration();
|
||||
@ -705,7 +717,7 @@ private void doTestIgnoreFailures(boolean ignoreFailures) {
|
||||
if (!fileStatus.isDirectory()) {
|
||||
fs.delete(path, true);
|
||||
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
|
||||
fileStatus, context);
|
||||
new CopyListingFileStatus(fileStatus), context);
|
||||
}
|
||||
}
|
||||
if (ignoreFailures) {
|
||||
@ -745,7 +757,7 @@ public void testCopyFailOnBlockSizeDifference() {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, FileStatus, Text, Text>.Context context
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
Configuration configuration = context.getConfiguration();
|
||||
@ -759,7 +771,7 @@ public void testCopyFailOnBlockSizeDifference() {
|
||||
for (Path path : pathList) {
|
||||
final FileStatus fileStatus = fs.getFileStatus(path);
|
||||
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
|
||||
fileStatus, context);
|
||||
new CopyListingFileStatus(fileStatus), context);
|
||||
}
|
||||
|
||||
Assert.fail("Copy should have failed because of block-size difference.");
|
||||
@ -780,7 +792,7 @@ private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, FileStatus, Text, Text>.Context context
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
Configuration configuration = context.getConfiguration();
|
||||
@ -798,7 +810,7 @@ private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
|
||||
for (Path path : pathList) {
|
||||
final FileStatus fileStatus = fs.getFileStatus(path);
|
||||
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
|
||||
fileStatus, context);
|
||||
new CopyListingFileStatus(fileStatus), context);
|
||||
}
|
||||
|
||||
// Check that the block-size/replication aren't preserved.
|
||||
@ -855,7 +867,7 @@ public void testSingleFileCopy() {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, FileStatus, Text, Text>.Context context
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
context.getConfiguration().set(
|
||||
@ -863,7 +875,8 @@ public void testSingleFileCopy() {
|
||||
targetFilePath.getParent().toString()); // Parent directory.
|
||||
copyMapper.setup(context);
|
||||
|
||||
final FileStatus sourceFileStatus = fs.getFileStatus(sourceFilePath);
|
||||
final CopyListingFileStatus sourceFileStatus = new CopyListingFileStatus(
|
||||
fs.getFileStatus(sourceFilePath));
|
||||
|
||||
long before = fs.getFileStatus(targetFilePath).getModificationTime();
|
||||
copyMapper.map(new Text(DistCpUtils.getRelativePath(
|
||||
@ -907,7 +920,7 @@ private void testPreserveUserGroupImpl(boolean preserve){
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, FileStatus, Text, Text>.Context context
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
Configuration configuration = context.getConfiguration();
|
||||
@ -926,7 +939,7 @@ private void testPreserveUserGroupImpl(boolean preserve){
|
||||
for (Path path : pathList) {
|
||||
final FileStatus fileStatus = fs.getFileStatus(path);
|
||||
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
|
||||
fileStatus, context);
|
||||
new CopyListingFileStatus(fileStatus), context);
|
||||
}
|
||||
|
||||
// Check that the user/group attributes are preserved
|
||||
|
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
|
||||
import org.apache.hadoop.tools.CopyListing;
|
||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||
import org.apache.hadoop.tools.DistCpOptions;
|
||||
import org.apache.hadoop.tools.StubContext;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
@ -122,8 +123,8 @@ public void testGetSplits(int nMaps) throws Exception {
|
||||
for (int i=0; i<splits.size(); ++i) {
|
||||
InputSplit split = splits.get(i);
|
||||
int currentSplitSize = 0;
|
||||
RecordReader<Text, FileStatus> recordReader = uniformSizeInputFormat.createRecordReader(
|
||||
split, null);
|
||||
RecordReader<Text, CopyListingFileStatus> recordReader =
|
||||
uniformSizeInputFormat.createRecordReader(split, null);
|
||||
StubContext stubContext = new StubContext(jobContext.getConfiguration(),
|
||||
recordReader, 0);
|
||||
final TaskAttemptContext taskAttemptContext
|
||||
@ -168,7 +169,7 @@ private void checkSplits(Path listFile, List<InputSplit> splits) throws IOExcept
|
||||
|
||||
try {
|
||||
reader.seek(lastEnd);
|
||||
FileStatus srcFileStatus = new FileStatus();
|
||||
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
|
||||
Text srcRelPath = new Text();
|
||||
Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
|
||||
} finally {
|
||||
|
@ -25,13 +25,13 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.*;
|
||||
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
||||
import org.apache.hadoop.tools.CopyListing;
|
||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||
import org.apache.hadoop.tools.DistCpOptions;
|
||||
import org.apache.hadoop.tools.StubContext;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
@ -118,15 +118,15 @@ public void testGetSplits() throws Exception {
|
||||
+"/tmp/testDynInputFormat/fileList.seq"), options);
|
||||
|
||||
JobContext jobContext = new JobContextImpl(configuration, new JobID());
|
||||
DynamicInputFormat<Text, FileStatus> inputFormat =
|
||||
new DynamicInputFormat<Text, FileStatus>();
|
||||
DynamicInputFormat<Text, CopyListingFileStatus> inputFormat =
|
||||
new DynamicInputFormat<Text, CopyListingFileStatus>();
|
||||
List<InputSplit> splits = inputFormat.getSplits(jobContext);
|
||||
|
||||
int nFiles = 0;
|
||||
int taskId = 0;
|
||||
|
||||
for (InputSplit split : splits) {
|
||||
RecordReader<Text, FileStatus> recordReader =
|
||||
RecordReader<Text, CopyListingFileStatus> recordReader =
|
||||
inputFormat.createRecordReader(split, null);
|
||||
StubContext stubContext = new StubContext(jobContext.getConfiguration(),
|
||||
recordReader, taskId);
|
||||
@ -136,7 +136,7 @@ public void testGetSplits() throws Exception {
|
||||
recordReader.initialize(splits.get(0), taskAttemptContext);
|
||||
float previousProgressValue = 0f;
|
||||
while (recordReader.nextKeyValue()) {
|
||||
FileStatus fileStatus = recordReader.getCurrentValue();
|
||||
CopyListingFileStatus fileStatus = recordReader.getCurrentValue();
|
||||
String source = fileStatus.getPath().toString();
|
||||
System.out.println(source);
|
||||
Assert.assertTrue(expectedFilePaths.contains(source));
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.junit.Assert;
|
||||
@ -106,7 +107,8 @@ public void testPreserve() {
|
||||
Path src = new Path("/tmp/src");
|
||||
fs.mkdirs(path);
|
||||
fs.mkdirs(src);
|
||||
FileStatus srcStatus = fs.getFileStatus(src);
|
||||
CopyListingFileStatus srcStatus = new CopyListingFileStatus(
|
||||
fs.getFileStatus(src));
|
||||
|
||||
FsPermission noPerm = new FsPermission((short) 0);
|
||||
fs.setPermission(path, noPerm);
|
||||
|
Loading…
Reference in New Issue
Block a user