From f81c7b0252839ae0dcd92fe2dc626ff9f87cd2c9 Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Fri, 6 Jun 2014 14:45:39 +0000 Subject: [PATCH] MAPREDUCE-5898. distcp to support preserving HDFS extended attributes(XAttrs). Contributed by Yi Liu. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1600900 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../org/apache/hadoop/tools/CopyListing.java | 16 + .../hadoop/tools/CopyListingFileStatus.java | 69 +++- .../java/org/apache/hadoop/tools/DistCp.java | 6 + .../apache/hadoop/tools/DistCpConstants.java | 1 + .../hadoop/tools/DistCpOptionSwitch.java | 8 +- .../apache/hadoop/tools/DistCpOptions.java | 2 +- .../hadoop/tools/SimpleCopyListing.java | 13 +- .../hadoop/tools/mapred/CopyMapper.java | 3 +- .../apache/hadoop/tools/util/DistCpUtils.java | 60 +++- .../hadoop/tools/TestDistCpWithXAttrs.java | 322 ++++++++++++++++++ .../hadoop/tools/TestOptionsParser.java | 8 +- .../hadoop/tools/mapred/TestCopyMapper.java | 3 + 13 files changed, 496 insertions(+), 18 deletions(-) create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithXAttrs.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 835ce61924..9d7f3ad737 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -142,6 +142,9 @@ Trunk (Unreleased) MAPREDUCE-5867. Fix NPE in KillAMPreemptionPolicy related to ProportionalCapacityPreemptionPolicy (Sunil G via devaraj) + MAPREDUCE-5898. 
distcp to support preserving HDFS extended attributes(XAttrs) + (Yi Liu via umamahesh) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java index a0d85c556a..ab5b8024d7 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java @@ -129,6 +129,7 @@ protected abstract void doBuildListing(Path pathToListFile, /** * Validate the final resulting path listing. Checks if there are duplicate * entries. If preserving ACLs, checks that file system can support ACLs. + * If preserving XAttrs, checks that file system can support XAttrs. * * @param pathToListFile - path listing build by doBuildListing * @param options - Input options to distcp @@ -151,6 +152,7 @@ private void validateFinalListing(Path pathToListFile, DistCpOptions options) Text currentKey = new Text(); Set aclSupportCheckFsSet = Sets.newHashSet(); + Set xAttrSupportCheckFsSet = Sets.newHashSet(); while (reader.next(currentKey)) { if (currentKey.equals(lastKey)) { CopyListingFileStatus currentFileStatus = new CopyListingFileStatus(); @@ -167,6 +169,14 @@ private void validateFinalListing(Path pathToListFile, DistCpOptions options) aclSupportCheckFsSet.add(lastFsUri); } } + if (options.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) { + FileSystem lastFs = lastFileStatus.getPath().getFileSystem(config); + URI lastFsUri = lastFs.getUri(); + if (!xAttrSupportCheckFsSet.contains(lastFsUri)) { + DistCpUtils.checkFileSystemXAttrSupport(lastFs); + xAttrSupportCheckFsSet.add(lastFsUri); + } + } lastKey.set(currentKey); } } finally { @@ -256,4 +266,10 @@ public AclsNotSupportedException(String message) { super(message); } } + + public static class XAttrsNotSupportedException extends RuntimeException { + public 
XAttrsNotSupportedException(String message) { + super(message); + } + } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java index 3a0c37fd3f..04437d78a2 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java @@ -21,7 +21,10 @@ import java.io.DataOutput; import java.io.IOException; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileStatus; @@ -34,6 +37,7 @@ import com.google.common.base.Objects; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * CopyListingFileStatus is a specialized subclass of {@link FileStatus} for @@ -45,6 +49,7 @@ public final class CopyListingFileStatus extends FileStatus { private static final byte NO_ACL_ENTRIES = -1; + private static final int NO_XATTRS = -1; // Retain static arrays of enum values to prevent repeated allocation of new // arrays during deserialization. @@ -53,6 +58,7 @@ public final class CopyListingFileStatus extends FileStatus { private static final FsAction[] FS_ACTIONS = FsAction.values(); private List aclEntries; + private Map xAttrs; /** * Default constructor. @@ -88,6 +94,24 @@ public List getAclEntries() { public void setAclEntries(List aclEntries) { this.aclEntries = aclEntries; } + + /** + * Returns all xAttrs. + * + * @return Map containing all xAttrs + */ + public Map getXAttrs() { + return xAttrs; + } + + /** + * Sets optional xAttrs. 
+ * + * @param xAttrs Map containing all xAttrs + */ + public void setXAttrs(Map xAttrs) { + this.xAttrs = xAttrs; + } @Override public void write(DataOutput out) throws IOException { @@ -104,6 +128,26 @@ public void write(DataOutput out) throws IOException { } else { out.writeByte(NO_ACL_ENTRIES); } + + if (xAttrs != null) { + out.writeInt(xAttrs.size()); + Iterator> iter = xAttrs.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = iter.next(); + WritableUtils.writeString(out, entry.getKey()); + final byte[] value = entry.getValue(); + if (value != null) { + out.writeInt(value.length); + if (value.length > 0) { + out.write(value); + } + } else { + out.writeInt(-1); + } + } + } else { + out.writeInt(NO_XATTRS); + } } @Override @@ -123,6 +167,25 @@ public void readFields(DataInput in) throws IOException { } else { aclEntries = null; } + + int xAttrsSize = in.readInt(); + if (xAttrsSize != NO_XATTRS) { + xAttrs = Maps.newHashMap(); + for (int i = 0; i < xAttrsSize; ++i) { + final String name = WritableUtils.readString(in); + final int valueLen = in.readInt(); + byte[] value = null; + if (valueLen > -1) { + value = new byte[valueLen]; + if (valueLen > 0) { + in.readFully(value); + } + } + xAttrs.put(name, value); + } + } else { + xAttrs = null; + } } @Override @@ -134,12 +197,13 @@ public boolean equals(Object o) { return false; } CopyListingFileStatus other = (CopyListingFileStatus)o; - return Objects.equal(aclEntries, other.aclEntries); + return Objects.equal(aclEntries, other.aclEntries) && + Objects.equal(xAttrs, other.xAttrs); } @Override public int hashCode() { - return Objects.hashCode(super.hashCode(), aclEntries); + return Objects.hashCode(super.hashCode(), aclEntries, xAttrs); } @Override @@ -147,6 +211,7 @@ public String toString() { StringBuilder sb = new StringBuilder(super.toString()); sb.append('{'); sb.append("aclEntries = " + aclEntries); + sb.append(", xAttrs = " + xAttrs); sb.append('}'); return sb.toString(); } diff --git 
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index b3c506ea09..d202f0a9bd 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -128,6 +128,9 @@ public int run(String[] argv) { } catch (AclsNotSupportedException e) { LOG.error("ACLs not supported on at least one file system: ", e); return DistCpConstants.ACLS_NOT_SUPPORTED; + } catch (XAttrsNotSupportedException e) { + LOG.error("XAttrs not supported on at least one file system: ", e); + return DistCpConstants.XATTRS_NOT_SUPPORTED; } catch (Exception e) { LOG.error("Exception encountered ", e); return DistCpConstants.UNKNOWN_ERROR; @@ -304,6 +307,9 @@ private void configureOutputFormat(Job job) throws IOException { if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) { DistCpUtils.checkFileSystemAclSupport(targetFS); } + if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) { + DistCpUtils.checkFileSystemXAttrSupport(targetFS); + } if (inputOptions.shouldAtomicCommit()) { Path workDir = inputOptions.getAtomicWorkPath(); if (workDir == null) { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 5fa26da613..d1dba19f2f 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -117,6 +117,7 @@ public class DistCpConstants { public static final int INVALID_ARGUMENT = -1; public static final int DUPLICATE_INPUT = -2; public static final int ACLS_NOT_SUPPORTED = -3; + public static final int XATTRS_NOT_SUPPORTED = -4; public static final int UNKNOWN_ERROR = -999; /** diff --git 
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index bfaba966be..2f2eb7c838 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -45,10 +45,10 @@ public enum DistCpOptionSwitch { * */ PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS, - new Option("p", true, "preserve status (rbugpca)(replication, " + - "block-size, user, group, permission, checksum-type, ACL). If " + - "-p is specified with no , then preserves replication, block " + - "size, user, group, permission and checksum type.")), + new Option("p", true, "preserve status (rbugpcax)(replication, " + + "block-size, user, group, permission, checksum-type, ACL, XATTR). " + + "If -p is specified with no , then preserves replication, " + + "block size, user, group, permission and checksum type.")), /** * Update target location by copying only files that are missing diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 2d19c6afc1..1ed9ccdb3c 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -66,7 +66,7 @@ public class DistCpOptions { private boolean targetPathExists = true; public static enum FileAttribute{ - REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL; + REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR; public static FileAttribute getAttribute(char symbol) { for (FileAttribute attribute : values()) { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java 
b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java index 3bce893c14..ad29942206 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.IOUtils; @@ -36,7 +35,6 @@ import com.google.common.annotations.VisibleForTesting; import java.io.*; -import java.util.List; import java.util.Stack; /** @@ -123,7 +121,7 @@ public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws * the the source root is a directory, then the source root entry is not * written to the sequence file, because only the contents of the source * directory need to be copied in this case. - * See {@link org.apache.hadoop.tools.util.DistCpUtils.getRelativePath} for + * See {@link org.apache.hadoop.tools.util.DistCpUtils#getRelativePath} for * how relative path is computed. * See computeSourceRootPath method for how the root path of the source is * computed. 
@@ -147,7 +145,8 @@ public void doBuildListing(SequenceFile.Writer fileListWriter, if (!explore || rootStatus.isDirectory()) { CopyListingFileStatus rootCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus, - options.shouldPreserve(FileAttribute.ACL)); + options.shouldPreserve(FileAttribute.ACL), + options.shouldPreserve(FileAttribute.XATTR)); writeToFileListingRoot(fileListWriter, rootCopyListingStatus, sourcePathRoot, options); } @@ -159,7 +158,8 @@ public void doBuildListing(SequenceFile.Writer fileListWriter, CopyListingFileStatus sourceCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus, options.shouldPreserve(FileAttribute.ACL) && - sourceStatus.isDirectory()); + sourceStatus.isDirectory(), options.shouldPreserve( + FileAttribute.XATTR) && sourceStatus.isDirectory()); writeToFileListing(fileListWriter, sourceCopyListingStatus, sourcePathRoot, options); @@ -271,7 +271,8 @@ private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter, + sourceStatus.getPath() + " for copy."); CopyListingFileStatus childCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, child, - options.shouldPreserve(FileAttribute.ACL) && child.isDirectory()); + options.shouldPreserve(FileAttribute.ACL) && child.isDirectory(), + options.shouldPreserve(FileAttribute.XATTR) && child.isDirectory()); writeToFileListing(fileListWriter, childCopyListingStatus, sourcePathRoot, options); if (isDirectoryAndNotEmpty(sourceFS, child)) { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index 02337f78bb..4ee003d916 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -213,7 +213,8 @@ public void map(Text relPath, CopyListingFileStatus 
sourceFileStatus, sourceFS = sourcePath.getFileSystem(conf); sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, sourceFS.getFileStatus(sourcePath), - fileAttributes.contains(FileAttribute.ACL)); + fileAttributes.contains(FileAttribute.ACL), + fileAttributes.contains(FileAttribute.XATTR)); } catch (FileNotFoundException e) { throw new IOException(new RetriableFileCopyCommand.CopyReadException(e)); } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java index 653634d8d9..c7b29a1088 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.tools.CopyListing.XAttrsNotSupportedException; import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.mapred.UniformSizeInputFormat; @@ -39,8 +40,11 @@ import java.io.IOException; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; import java.text.DecimalFormat; import java.net.URI; import java.net.InetAddress; @@ -210,6 +214,18 @@ public static void preserve(FileSystem targetFS, Path path, !srcFileStatus.getPermission().equals(targetFileStatus.getPermission())) { targetFS.setPermission(path, srcFileStatus.getPermission()); } + + if (attributes.contains(FileAttribute.XATTR)) { + Map srcXAttrs = srcFileStatus.getXAttrs(); + Map targetXAttrs = getXAttrs(targetFS, path); + if (!srcXAttrs.equals(targetXAttrs)) { + Iterator> iter = srcXAttrs.entrySet().iterator(); + while (iter.hasNext()) { + Entry 
entry = iter.next(); + targetFS.setXAttr(path, entry.getKey(), entry.getValue()); + } + } + } if (attributes.contains(FileAttribute.REPLICATION) && ! targetFileStatus.isDirectory() && srcFileStatus.getReplication() != targetFileStatus.getReplication()) { @@ -247,19 +263,34 @@ public static List getAcl(FileSystem fileSystem, .getEntries(); return AclUtil.getAclFromPermAndEntries(fileStatus.getPermission(), entries); } + + /** + * Returns all xAttrs of a file. + * + * @param fileSystem FileSystem containing the file + * @param path file path + * @return Map containing all xAttrs + * @throws IOException if there is an I/O error + */ + public static Map getXAttrs(FileSystem fileSystem, + Path path) throws IOException { + return fileSystem.getXAttrs(path); + } /** * Converts a FileStatus to a CopyListingFileStatus. If preserving ACLs, - * populates the CopyListingFileStatus with the ACLs. + * populates the CopyListingFileStatus with the ACLs. If preserving XAttrs, + * populates the CopyListingFileStatus with the XAttrs.
* * @param fileSystem FileSystem containing the file * @param fileStatus FileStatus of file * @param preserveAcls boolean true if preserving ACLs + * @param preserveXAttrs boolean true if preserving XAttrs * @throws IOException if there is an I/O error */ public static CopyListingFileStatus toCopyListingFileStatus( - FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls) - throws IOException { + FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls, + boolean preserveXAttrs) throws IOException { CopyListingFileStatus copyListingFileStatus = new CopyListingFileStatus(fileStatus); if (preserveAcls) { @@ -270,6 +301,10 @@ public static CopyListingFileStatus toCopyListingFileStatus( copyListingFileStatus.setAclEntries(aclEntries); } } + if (preserveXAttrs) { + Map xAttrs = fileSystem.getXAttrs(fileStatus.getPath()); + copyListingFileStatus.setXAttrs(xAttrs); + } return copyListingFileStatus; } @@ -314,6 +349,25 @@ public static void checkFileSystemAclSupport(FileSystem fs) + fs.getUri()); } } + + /** + * Determines if a file system supports XAttrs by running a getXAttrs request + * on the file system root. This method is used before distcp job submission + * to fail fast if the user requested preserving XAttrs, but the file system + * cannot support XAttrs. + * + * @param fs FileSystem to check + * @throws XAttrsNotSupportedException if fs does not support XAttrs + */ + public static void checkFileSystemXAttrSupport(FileSystem fs) + throws XAttrsNotSupportedException { + try { + fs.getXAttrs(new Path(Path.SEPARATOR)); + } catch (Exception e) { + throw new XAttrsNotSupportedException("XAttrs not supported for file system: " + + fs.getUri()); + } + } /** * String utility to convert a number-of-bytes to human readable format. 
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithXAttrs.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithXAttrs.java new file mode 100644 index 0000000000..cc13b8fc22 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithXAttrs.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.tools; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.URI; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.ToolRunner; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +/** + * Tests distcp in combination with HDFS XAttrs. + */ +public class TestDistCpWithXAttrs { + + private static MiniDFSCluster cluster; + private static Configuration conf; + private static FileSystem fs; + + //XAttrs + private static final String name1 = "user.a1"; + private static final byte[] value1 = {0x31, 0x32, 0x33}; + private static final String name2 = "trusted.a2"; + private static final byte[] value2 = {0x37, 0x38, 0x39}; + private static final String name3 = "user.a3"; + private static final byte[] value3 = null; + private static final String name4 = "user.a4"; + private static final byte[] value4 = null; + + private static final Path dir1 = new Path("/src/dir1"); + private static final Path subDir1 = new Path(dir1, "subdir1"); + private static final Path file1 = new Path("/src/file1"); + private static final Path dir2 = new Path("/src/dir2"); + private static final Path file2 = new Path(dir2, "file2"); + private static final Path file3 = new Path(dir2, "file3"); + private static final Path file4 = new Path(dir2, "file4"); + private 
static final Path dstDir1 = new Path("/dstPreserveXAttrs/dir1"); + private static final Path dstSubDir1 = new Path(dstDir1, "subdir1"); + private static final Path dstFile1 = new Path("/dstPreserveXAttrs/file1"); + private static final Path dstDir2 = new Path("/dstPreserveXAttrs/dir2"); + private static final Path dstFile2 = new Path(dstDir2, "file2"); + private static final Path dstFile3 = new Path(dstDir2, "file3"); + private static final Path dstFile4 = new Path(dstDir2, "file4"); + + @BeforeClass + public static void init() throws Exception { + initCluster(true, true); + fs.mkdirs(subDir1); + fs.create(file1).close(); + fs.mkdirs(dir2); + fs.create(file2).close(); + fs.create(file3).close(); + fs.create(file4).close(); + + // dir1 + fs.setXAttr(dir1, name1, value1); + fs.setXAttr(dir1, name2, value2); + + // subDir1 + fs.setXAttr(subDir1, name1, value1); + fs.setXAttr(subDir1, name3, value3); + + // file1 + fs.setXAttr(file1, name1, value1); + fs.setXAttr(file1, name2, value2); + fs.setXAttr(file1, name3, value3); + + // dir2 + fs.setXAttr(dir2, name2, value2); + + // file2 + fs.setXAttr(file2, name1, value1); + fs.setXAttr(file2, name4, value4); + + // file3 + fs.setXAttr(file3, name3, value3); + fs.setXAttr(file3, name4, value4); + } + + @AfterClass + public static void shutdown() { + IOUtils.cleanup(null, fs); + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testPreserveXAttrs() throws Exception { + assertRunDistCp(DistCpConstants.SUCCESS, "/dstPreserveXAttrs"); + + // dstDir1 + Map xAttrs = Maps.newHashMap(); + xAttrs.put(name1, value1); + xAttrs.put(name2, value2); + assertXAttrs(dstDir1, xAttrs); + + // dstSubDir1 + xAttrs.clear(); + xAttrs.put(name1, value1); + xAttrs.put(name3, new byte[0]); + assertXAttrs(dstSubDir1, xAttrs); + + // dstFile1 + xAttrs.clear(); + xAttrs.put(name1, value1); + xAttrs.put(name2, value2); + xAttrs.put(name3, new byte[0]); + assertXAttrs(dstFile1, xAttrs); + + // dstDir2 + xAttrs.clear(); + 
xAttrs.put(name2, value2); + assertXAttrs(dstDir2, xAttrs); + + // dstFile2 + xAttrs.clear(); + xAttrs.put(name1, value1); + xAttrs.put(name4, new byte[0]); + assertXAttrs(dstFile2, xAttrs); + + // dstFile3 + xAttrs.clear(); + xAttrs.put(name3, new byte[0]); + xAttrs.put(name4, new byte[0]); + assertXAttrs(dstFile3, xAttrs); + + // dstFile4 + xAttrs.clear(); + assertXAttrs(dstFile4, xAttrs); + } + + @Test + public void testXAttrsNotEnabled() throws Exception { + try { + restart(false); + assertRunDistCp(DistCpConstants.XATTRS_NOT_SUPPORTED, + "/dstXAttrsNotEnabled"); + } finally { + restart(true); + } + } + + @Test + public void testXAttrsNotImplemented() throws Exception { + assertRunDistCp(DistCpConstants.XATTRS_NOT_SUPPORTED, + "stubfs://dstXAttrsNotImplemented"); + } + + /** + * Stub FileSystem implementation used for testing the case of attempting + * distcp with XAttrs preserved on a file system that does not support XAttrs. + * The base class implementation throws UnsupportedOperationException for + * the XAttr methods, so we don't need to override them. 
+ */ + public static class StubFileSystem extends FileSystem { + + @Override + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress) throws IOException { + return null; + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + return null; + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + return false; + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return null; + } + + @Override + public URI getUri() { + return URI.create("stubfs:///"); + } + + @Override + public Path getWorkingDirectory() { + return new Path(Path.SEPARATOR); + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + return null; + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + return false; + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return null; + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + return false; + } + + @Override + public void setWorkingDirectory(Path dir) { + } + } + + /** + * Asserts the XAttrs returned by getXAttrs for a specific path. 
+ * + * @param path Path to check + * @param expectedXAttrs Map of expected xAttrs + * @throws Exception if there is any error + */ + private static void assertXAttrs(Path path, Map expectedXAttrs) + throws Exception { + Map xAttrs = fs.getXAttrs(path); + assertEquals(expectedXAttrs.size(), xAttrs.size()); + Iterator> i = expectedXAttrs.entrySet().iterator(); + while (i.hasNext()) { + Entry e = i.next(); + String name = e.getKey(); + byte[] value = e.getValue(); + if (value == null) { + assertTrue(xAttrs.containsKey(name) && xAttrs.get(name) == null); + } else { + assertArrayEquals(value, xAttrs.get(name)); + } + } + } + + /** + * Runs distcp from /src to specified destination, preserving XAttrs. Asserts + * expected exit code. + * + * @param exitCode int expected exit code + * @param dst String distcp destination + * @throws Exception if there is any error + */ + private static void assertRunDistCp(int exitCode, String dst) + throws Exception { + DistCp distCp = new DistCp(conf, null); + assertEquals(exitCode, + ToolRunner.run(conf, distCp, new String[] { "-px", "/src", dst })); + } + + /** + * Initialize the cluster, wait for it to become active, and get FileSystem. + * + * @param format if true, format the NameNode and DataNodes before starting up + * @param xAttrsEnabled if true, XAttr support is enabled + * @throws Exception if any step fails + */ + private static void initCluster(boolean format, boolean xAttrsEnabled) + throws Exception { + conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, xAttrsEnabled); + conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "stubfs:///"); + conf.setClass("fs.stubfs.impl", StubFileSystem.class, FileSystem.class); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(format) + .build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + } + + /** + * Restarts the cluster with XAttrs enabled or disabled.
+ * + * @param xAttrsEnabled if true, XAttr support is enabled + * @throws Exception if any step fails + */ + private static void restart(boolean xAttrsEnabled) throws Exception { + shutdown(); + initCluster(false, xAttrsEnabled); + } +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index 8486aa1fce..d3da68ed38 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -414,6 +414,7 @@ public void testPreserve() { Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE)); Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR)); options = OptionsParser.parse(new String[] { "-p", @@ -426,6 +427,7 @@ public void testPreserve() { Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE)); Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR)); options = OptionsParser.parse(new String[] { "-pbr", @@ -439,6 +441,7 @@ public void testPreserve() { Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE)); Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR)); options = OptionsParser.parse(new String[] { "-pbrgup", @@ -452,9 +455,10 @@ public void testPreserve() { Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE)); 
Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR)); options = OptionsParser.parse(new String[] { - "-pbrgupca", + "-pbrgupcax", "-f", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/"}); @@ -465,6 +469,7 @@ public void testPreserve() { Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE)); Assert.assertTrue(options.shouldPreserve(FileAttribute.ACL)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.XATTR)); options = OptionsParser.parse(new String[] { "-pc", @@ -478,6 +483,7 @@ public void testPreserve() { Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE)); Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR)); options = OptionsParser.parse(new String[] { "-p", diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java index 2f16682fcd..3f84772dd2 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java @@ -450,6 +450,7 @@ public Mapper.Context run() { EnumSet preserveStatus = EnumSet.allOf(DistCpOptions.FileAttribute.class); preserveStatus.remove(DistCpOptions.FileAttribute.ACL); + preserveStatus.remove(DistCpOptions.FileAttribute.XATTR); context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS, DistCpUtils.packAttributes(preserveStatus)); @@ -588,6 +589,7 @@ public StubContext run() { EnumSet preserveStatus = EnumSet.allOf(DistCpOptions.FileAttribute.class); preserveStatus.remove(DistCpOptions.FileAttribute.ACL); + 
preserveStatus.remove(DistCpOptions.FileAttribute.XATTR); context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS, DistCpUtils.packAttributes(preserveStatus)); @@ -663,6 +665,7 @@ public StubContext run() { EnumSet preserveStatus = EnumSet.allOf(DistCpOptions.FileAttribute.class); preserveStatus.remove(DistCpOptions.FileAttribute.ACL); + preserveStatus.remove(DistCpOptions.FileAttribute.XATTR); final Mapper.Context context = stubContext.getContext();