diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index b0b8fb7c33..1624ce2bc3 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -26,6 +26,9 @@ Trunk (Unreleased) HADOOP-11627. Remove io.native.lib.available. (Brahma Reddy Battula via aajisaka) + HADOOP-11698. Remove DistCpV1 and Logalyzer. + (Brahma Reddy Battula via aajisaka) + NEW FEATURES HADOOP-6590. Add a username check for hadoop sub-commands (John Smith via diff --git a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java b/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java deleted file mode 100644 index 39ac5c37cc..0000000000 --- a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java +++ /dev/null @@ -1,1674 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.tools; - -import java.io.BufferedReader; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; -import java.util.Stack; -import java.util.StringTokenizer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileChecksum; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.Trash; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.protocol.QuotaExceededException; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.InvalidInputException; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; 
-import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.SequenceFileRecordReader; -import org.apache.hadoop.mapreduce.JobSubmissionFiles; -import org.apache.hadoop.mapreduce.security.TokenCache; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; - -/** - * A Map-reduce program to recursively copy directories between - * different file-systems. - */ -@Deprecated -public class DistCpV1 implements Tool { - public static final Log LOG = LogFactory.getLog(DistCpV1.class); - - private static final String NAME = "distcp"; - - private static final String usage = NAME - + " [OPTIONS] <srcurl>* <desturl>" + "\n\nOPTIONS:" + "\n-p[rbugpt] Preserve status" + "\n r: replication number" + "\n b: block size" + "\n u: user" + "\n g: group" + "\n p: permission" + "\n t: modification and access times" + "\n -p alone is equivalent to -prbugpt" + "\n-i Ignore failures" + "\n-basedir <basedir> Use <basedir> as the base directory when copying files from <basedir>" + "\n-log <logdir> Write logs to <logdir>" + "\n-m <num_maps> Maximum number of simultaneous copies" + "\n-overwrite Overwrite destination" + "\n-update Overwrite if src size different from dst size" + "\n-skipcrccheck Do not use CRC check to determine if src is " + "\n different from dest. Relevant only if -update" + "\n is specified" + "\n-f <urilist_uri> Use list at <urilist_uri> as src list" + "\n-filelimit <n> Limit the total number of files to be <= n" + "\n-sizelimit <n> Limit the total size to be <= n bytes" + "\n-delete Delete the files existing in the dst but not in src" + "\n-dryrun Display count of files and total size of files" + "\n in src and then exit. Copy is not done at all." + "\n desturl should not be specified without -update." + "\n-mapredSslConf <f> Filename of SSL configuration for mapper task" + - - "\n\nNOTE 1: if -overwrite or -update are set, each source URI is " + "\n interpreted as an isomorphic update to an existing directory." + "\nFor example:" + "\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " + "\"hdfs://B:8020/user/foo/baz\"\n" + "\n would update all descendants of 'baz' also in 'bar'; it would " + "\n *not* update /user/foo/baz/bar" + - - "\n\nNOTE 2: The parameter <n> in -filelimit and -sizelimit can be " + "\n specified with symbolic representation. 
For examples," + - "\n 1230k = 1230 * 1024 = 1259520" + - "\n 891g = 891 * 1024^3 = 956703965184" + - - "\n"; - - private static final long BYTES_PER_MAP = 256 * 1024 * 1024; - private static final int MAX_MAPS_PER_NODE = 20; - private static final int SYNC_FILE_MAX = 10; - private static final int DEFAULT_FILE_RETRIES = 3; - - static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED } - static enum Options { - DELETE("-delete", NAME + ".delete"), - FILE_LIMIT("-filelimit", NAME + ".limit.file"), - SIZE_LIMIT("-sizelimit", NAME + ".limit.size"), - IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"), - PRESERVE_STATUS("-p", NAME + ".preserve.status"), - OVERWRITE("-overwrite", NAME + ".overwrite.always"), - UPDATE("-update", NAME + ".overwrite.ifnewer"), - SKIPCRC("-skipcrccheck", NAME + ".skip.crc.check"); - - final String cmd, propertyname; - - private Options(String cmd, String propertyname) { - this.cmd = cmd; - this.propertyname = propertyname; - } - - private long parseLong(String[] args, int offset) { - if (offset == args.length) { - throw new IllegalArgumentException(" not specified in " + cmd); - } - long n = StringUtils.TraditionalBinaryPrefix.string2long(args[offset]); - if (n <= 0) { - throw new IllegalArgumentException("n = " + n + " <= 0 in " + cmd); - } - return n; - } - } - static enum FileAttribute { - BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION, TIMES; - - final char symbol; - - private FileAttribute() { - symbol = StringUtils.toLowerCase(toString()).charAt(0); - } - - static EnumSet parse(String s) { - if (s == null || s.length() == 0) { - return EnumSet.allOf(FileAttribute.class); - } - - EnumSet set = EnumSet.noneOf(FileAttribute.class); - FileAttribute[] attributes = values(); - for(char c : s.toCharArray()) { - int i = 0; - for(; i < attributes.length && c != attributes[i].symbol; i++); - if (i < attributes.length) { - if (!set.contains(attributes[i])) { - set.add(attributes[i]); - } else { - throw new IllegalArgumentException("There are more than one '" - + attributes[i].symbol + "' in " + s); - } - } else { - throw new IllegalArgumentException("'" + c + "' in " + s - + " is undefined."); - } - } - return set; - } - } - - static final String TMP_DIR_LABEL = NAME + ".tmp.dir"; - static final String DST_DIR_LABEL = NAME + ".dest.path"; - static final String JOB_DIR_LABEL = NAME + ".job.dir"; - static final String MAX_MAPS_LABEL = NAME + ".max.map.tasks"; - static final String SRC_LIST_LABEL = NAME + ".src.list"; - static final String SRC_COUNT_LABEL = NAME + ".src.count"; - static final String TOTAL_SIZE_LABEL = NAME + ".total.size"; - static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list"; - static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map"; - static final String PRESERVE_STATUS_LABEL - = Options.PRESERVE_STATUS.propertyname + ".value"; - static final String FILE_RETRIES_LABEL = NAME + ".file.retries"; - - private JobConf conf; - - public void setConf(Configuration conf) { - if (conf instanceof JobConf) { - this.conf = (JobConf) conf; - } else { - this.conf = new JobConf(conf); - } - } - - public Configuration getConf() { - return conf; - } - - public DistCpV1(Configuration conf) { - setConf(conf); - } - - /** - * An input/output pair of filenames. 
- */ - static class FilePair implements Writable { - FileStatus input = new FileStatus(); - String output; - FilePair() { } - FilePair(FileStatus input, String output) { - this.input = input; - this.output = output; - } - public void readFields(DataInput in) throws IOException { - input.readFields(in); - output = Text.readString(in); - } - public void write(DataOutput out) throws IOException { - input.write(out); - Text.writeString(out, output); - } - public String toString() { - return input + " : " + output; - } - } - - /** - * InputFormat of a distcp job responsible for generating splits of the src - * file list. - */ - static class CopyInputFormat implements InputFormat { - - /** - * Produce splits such that each is no greater than the quotient of the - * total size and the number of splits requested. - * @param job The handle to the JobConf object - * @param numSplits Number of splits requested - */ - public InputSplit[] getSplits(JobConf job, int numSplits) - throws IOException { - int cnfiles = job.getInt(SRC_COUNT_LABEL, -1); - long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1); - String srcfilelist = job.get(SRC_LIST_LABEL, ""); - if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) { - throw new RuntimeException("Invalid metadata: #files(" + cnfiles + - ") total_size(" + cbsize + ") listuri(" + - srcfilelist + ")"); - } - Path src = new Path(srcfilelist); - FileSystem fs = src.getFileSystem(job); - FileStatus srcst = fs.getFileStatus(src); - - ArrayList splits = new ArrayList(numSplits); - LongWritable key = new LongWritable(); - FilePair value = new FilePair(); - final long targetsize = cbsize / numSplits; - long pos = 0L; - long last = 0L; - long acc = 0L; - long cbrem = srcst.getLen(); - try (SequenceFile.Reader sl = - new SequenceFile.Reader(job, Reader.file(src))) { - for (; sl.next(key, value); last = sl.getPosition()) { - // if adding this split would put this split past the target size, - // cut the last split and put this next file in the next split. - if (acc + key.get() > targetsize && acc != 0) { - long splitsize = last - pos; - splits.add(new FileSplit(src, pos, splitsize, (String[])null)); - cbrem -= splitsize; - pos = last; - acc = 0L; - } - acc += key.get(); - } - } - if (cbrem != 0) { - splits.add(new FileSplit(src, pos, cbrem, (String[])null)); - } - - return splits.toArray(new FileSplit[splits.size()]); - } - - /** - * Returns a reader for this split of the src file list. - */ - public RecordReader getRecordReader(InputSplit split, - JobConf job, Reporter reporter) throws IOException { - return new SequenceFileRecordReader(job, (FileSplit)split); - } - } - - /** - * FSCopyFilesMapper: The mapper for copying files between FileSystems. 
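[Reader's sketch, not part of the patch: the getSplits() method removed above cuts the sorted (size, file) list greedily — a new split starts whenever adding the next file would push the accumulated bytes past totalSize / numSplits. The standalone restatement below uses illustrative names only and is not Hadoop API.]

import java.util.ArrayList;
import java.util.List;

class SplitSizingSketch {
  /** Returns how many entries land in each split under the greedy rule. */
  static List<Integer> splitCounts(long[] fileSizes, int numSplits) {
    long total = 0;
    for (long size : fileSizes) {
      total += size;
    }
    final long target = total / numSplits;    // quotient, as in getSplits()
    List<Integer> counts = new ArrayList<>();
    long acc = 0;                             // bytes accumulated in current split
    int entries = 0;                          // entries accumulated in current split
    for (long size : fileSizes) {
      if (acc + size > target && acc != 0) {  // would overshoot: close this split,
        counts.add(entries);                  // current file starts the next one
        acc = 0;
        entries = 0;
      }
      acc += size;
      entries++;
    }
    if (entries > 0) {                        // remainder becomes the last split
      counts.add(entries);
    }
    return counts;
  }

  public static void main(String[] args) {
    // Four 10-byte files into 2 splits: target is 20, so two splits of two files.
    System.out.println(splitCounts(new long[] {10, 10, 10, 10}, 2)); // [2, 2]
  }
}
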
- */ - static class CopyFilesMapper - implements Mapper, Text> { - // config - private int sizeBuf = 128 * 1024; - private FileSystem destFileSys = null; - private boolean ignoreReadFailures; - private boolean preserve_status; - private EnumSet preseved; - private boolean overwrite; - private boolean update; - private Path destPath = null; - private byte[] buffer = null; - private JobConf job; - private boolean skipCRCCheck = false; - - // stats - private int failcount = 0; - private int skipcount = 0; - private int copycount = 0; - - private String getCountString() { - return "Copied: " + copycount + " Skipped: " + skipcount - + " Failed: " + failcount; - } - private void updateStatus(Reporter reporter) { - reporter.setStatus(getCountString()); - } - - /** - * Return true if dst should be replaced by src and the update flag is set. - * Right now, this merely checks that the src and dst len are not equal. - * This should be improved on once modification times, CRCs, etc. can - * be meaningful in this context. - * @throws IOException - */ - private boolean needsUpdate(FileStatus srcstatus, - FileSystem dstfs, Path dstpath) throws IOException { - return update && !sameFile(srcstatus.getPath().getFileSystem(job), - srcstatus, dstfs, dstpath, skipCRCCheck); - } - - private FSDataOutputStream create(Path f, Reporter reporter, - FileStatus srcstat) throws IOException { - if (destFileSys.exists(f)) { - destFileSys.delete(f, false); - } - if (!preserve_status) { - return destFileSys.create(f, true, sizeBuf, reporter); - } - - FsPermission permission = preseved.contains(FileAttribute.PERMISSION)? - srcstat.getPermission(): null; - short replication = preseved.contains(FileAttribute.REPLICATION)? - srcstat.getReplication(): destFileSys.getDefaultReplication(f); - long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)? - srcstat.getBlockSize(): destFileSys.getDefaultBlockSize(f); - return destFileSys.create(f, permission, true, sizeBuf, replication, - blockSize, reporter); - } - - /** - * Validates copy by checking the sizes of files first and then - * checksums, if the filesystems support checksums. - * @param srcstat src path and metadata - * @param absdst dst path - * @return true if src & destination files are same - */ - private boolean validateCopy(FileStatus srcstat, Path absdst) - throws IOException { - if (destFileSys.exists(absdst)) { - if (sameFile(srcstat.getPath().getFileSystem(job), srcstat, - destFileSys, absdst, skipCRCCheck)) { - return true; - } - } - return false; - } - - /** - * Increment number of files copied and bytes copied and then report status - */ - void updateCopyStatus(FileStatus srcstat, Reporter reporter) { - copycount++; - reporter.incrCounter(Counter.BYTESCOPIED, srcstat.getLen()); - reporter.incrCounter(Counter.COPY, 1); - updateStatus(reporter); - } - - /** - * Skip copying this file if already exists at the destination. - * Updates counters and copy status if skipping this file. - * @return true if copy of this file can be skipped - */ - private boolean skipCopyFile(FileStatus srcstat, Path absdst, - OutputCollector, Text> outc, - Reporter reporter) throws IOException { - if (destFileSys.exists(absdst) && !overwrite - && !needsUpdate(srcstat, destFileSys, absdst)) { - outc.collect(null, new Text("SKIP: " + srcstat.getPath())); - ++skipcount; - reporter.incrCounter(Counter.SKIP, 1); - updateStatus(reporter); - return true; - } - return false; - } - - /** - * Copies single file to the path specified by tmpfile. 
- * @param srcstat src path and metadata - * @param tmpfile temporary file to which copy is to be done - * @param absdst actual destination path to which copy is to be done - * @param reporter - * @return Number of bytes copied - */ - private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst, - Reporter reporter) throws IOException { - long bytesCopied = 0L; - Path srcPath = srcstat.getPath(); - // open src file - try (FSDataInputStream in = srcPath.getFileSystem(job).open(srcPath)) { - reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen()); - // open tmp file - try (FSDataOutputStream out = create(tmpfile, reporter, srcstat)) { - LOG.info("Copying file " + srcPath + " of size " + - srcstat.getLen() + " bytes..."); - - // copy file - for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) { - out.write(buffer, 0, bytesRead); - bytesCopied += bytesRead; - reporter.setStatus( - String.format("%.2f ", bytesCopied*100.0/srcstat.getLen()) - + absdst + " [ " + - TraditionalBinaryPrefix.long2String(bytesCopied, "", 1) + " / " - + TraditionalBinaryPrefix.long2String(srcstat.getLen(), "", 1) - + " ]"); - } - } - } - return bytesCopied; - } - - /** - * Copy a file to a destination. - * @param srcstat src path and metadata - * @param relativedst relative dst path - * @param outc Log of skipped files - * @param reporter - * @throws IOException if copy fails(even if the validation of copy fails) - */ - private void copy(FileStatus srcstat, Path relativedst, - OutputCollector, Text> outc, Reporter reporter) - throws IOException { - Path absdst = new Path(destPath, relativedst); - int totfiles = job.getInt(SRC_COUNT_LABEL, -1); - assert totfiles >= 0 : "Invalid file count " + totfiles; - - if (totfiles == 1) { - // Copying a single file; use dst path provided by user as - // destination file rather than destination directory - Path dstparent = absdst.getParent(); - if (!(destFileSys.exists(dstparent) && - destFileSys.getFileStatus(dstparent).isDirectory())) { - absdst = dstparent; - } - } - - // if a directory, ensure created even if empty - if (srcstat.isDirectory()) { - if (destFileSys.exists(absdst)) { - if (destFileSys.getFileStatus(absdst).isFile()) { - throw new IOException("Failed to mkdirs: " + absdst+" is a file."); - } - } - else if (!destFileSys.mkdirs(absdst)) { - throw new IOException("Failed to mkdirs " + absdst); - } - // TODO: when modification times can be set, directories should be - // emitted to reducers so they might be preserved. Also, mkdirs does - // not currently return an error when the directory already exists; - // if this changes, all directory work might as well be done in reduce - return; - } - - // Can we skip copying this file ? 
- if (skipCopyFile(srcstat, absdst, outc, reporter)) { - return; - } - - Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst); - // do the actual copy to tmpfile - long bytesCopied = doCopyFile(srcstat, tmpfile, absdst, reporter); - - if (bytesCopied != srcstat.getLen()) { - throw new IOException("File size not matched: copied " - + bytesString(bytesCopied) + " to tmpfile (=" + tmpfile - + ") but expected " + bytesString(srcstat.getLen()) - + " from " + srcstat.getPath()); - } - else { - if (destFileSys.exists(absdst) && - destFileSys.getFileStatus(absdst).isDirectory()) { - throw new IOException(absdst + " is a directory"); - } - if (!destFileSys.mkdirs(absdst.getParent())) { - throw new IOException("Failed to create parent dir: " + absdst.getParent()); - } - rename(tmpfile, absdst); - - if (!validateCopy(srcstat, absdst)) { - destFileSys.delete(absdst, false); - throw new IOException("Validation of copy of file " - + srcstat.getPath() + " failed."); - } - updateDestStatus(srcstat, destFileSys.getFileStatus(absdst)); - } - - // report at least once for each file - updateCopyStatus(srcstat, reporter); - } - - /** rename tmp to dst, delete dst if already exists */ - private void rename(Path tmp, Path dst) throws IOException { - try { - if (destFileSys.exists(dst)) { - destFileSys.delete(dst, true); - } - if (!destFileSys.rename(tmp, dst)) { - throw new IOException(); - } - } - catch(IOException cause) { - throw (IOException)new IOException("Fail to rename tmp file (=" + tmp - + ") to destination file (=" + dst + ")").initCause(cause); - } - } - - private void updateDestStatus(FileStatus src, FileStatus dst - ) throws IOException { - if (preserve_status) { - DistCpV1.updateDestStatus(src, dst, preseved, destFileSys); - } - } - - static String bytesString(long b) { - return b + " bytes (" + - TraditionalBinaryPrefix.long2String(b, "", 1) + ")"; - } - - /** - * Copies a file and validates the copy by checking the checksums. - * If validation fails, retries (max number of tries is distcp.file.retries) - * to copy the file. - */ - void copyWithRetries(FileStatus srcstat, Path relativedst, - OutputCollector, Text> out, - Reporter reporter) throws IOException { - - // max tries to copy when validation of copy fails - final int maxRetries = job.getInt(FILE_RETRIES_LABEL, DEFAULT_FILE_RETRIES); - // save update flag for later copies within the same map task - final boolean saveUpdate = update; - - int retryCnt = 1; - for (; retryCnt <= maxRetries; retryCnt++) { - try { - //copy the file and validate copy - copy(srcstat, relativedst, out, reporter); - break;// copy successful - } catch (IOException e) { - LOG.warn("Copy of " + srcstat.getPath() + " failed.", e); - if (retryCnt < maxRetries) {// copy failed and need to retry - LOG.info("Retrying copy of file " + srcstat.getPath()); - update = true; // set update flag for retries - } - else {// no more retries... Give up - update = saveUpdate; - throw new IOException("Copy of file failed even with " + retryCnt - + " tries.", e); - } - } - } - } - - /** Mapper configuration. - * Extracts source and destination file system, as well as - * top-level paths on source and destination directories. - * Gets the named file systems, to be used later in map. 
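[Reader's sketch, not part of the patch: copyWithRetries() above retries a failed copy up to maxRetries times, forcing "update" semantics on retries so a partial earlier attempt is overwritten; the original restores a mapper-wide update field before rethrowing, while the local flag below makes that restore unnecessary. The Copy interface and all names are illustrative, not Hadoop API.]

import java.io.IOException;

class RetrySketch {
  interface Copy {
    void run(boolean update) throws IOException;
  }

  static void copyWithRetries(Copy copy, boolean update, int maxRetries)
      throws IOException {
    boolean updateFlag = update;
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        copy.run(updateFlag);
        return;                               // copy (and validation) succeeded
      } catch (IOException e) {
        if (attempt < maxRetries) {
          updateFlag = true;                  // retry in update mode
        } else {
          // no more retries: give up with the last failure as the cause
          throw new IOException(
              "Copy of file failed even with " + attempt + " tries.", e);
        }
      }
    }
  }
}
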
- */ - public void configure(JobConf job) - { - destPath = new Path(job.get(DST_DIR_LABEL, "/")); - try { - destFileSys = destPath.getFileSystem(job); - } catch (IOException ex) { - throw new RuntimeException("Unable to get the named file system.", ex); - } - sizeBuf = job.getInt("copy.buf.size", 128 * 1024); - buffer = new byte[sizeBuf]; - ignoreReadFailures = job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false); - preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname, false); - if (preserve_status) { - preseved = FileAttribute.parse(job.get(PRESERVE_STATUS_LABEL)); - } - update = job.getBoolean(Options.UPDATE.propertyname, false); - overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false); - skipCRCCheck = job.getBoolean(Options.SKIPCRC.propertyname, false); - this.job = job; - } - - /** Map method. Copies one file from source file system to destination. - * @param key src len - * @param value FilePair (FileStatus src, Path dst) - * @param out Log of failed copies - * @param reporter - */ - public void map(LongWritable key, - FilePair value, - OutputCollector, Text> out, - Reporter reporter) throws IOException { - final FileStatus srcstat = value.input; - final Path relativedst = new Path(value.output); - try { - copyWithRetries(srcstat, relativedst, out, reporter); - } catch (IOException e) { - ++failcount; - reporter.incrCounter(Counter.FAIL, 1); - updateStatus(reporter); - final String sfailure = "FAIL " + relativedst + " : " + - StringUtils.stringifyException(e); - out.collect(null, new Text(sfailure)); - LOG.info(sfailure); - if (e instanceof FileNotFoundException) { - final String s = "Possible Cause for failure: Either the filesystem " - + srcstat.getPath().getFileSystem(job) - + " is not accessible or the file is deleted"; - LOG.error(s); - out.collect(null, new Text(s)); - } - - try { - for (int i = 0; i < 3; ++i) { - try { - final Path tmp = new Path(job.get(TMP_DIR_LABEL), relativedst); - if (destFileSys.delete(tmp, true)) - break; - } catch (Throwable ex) { - // ignore, we are just cleaning up - LOG.debug("Ignoring cleanup exception", ex); - } - // update status, so we don't get timed out - updateStatus(reporter); - Thread.sleep(3 * 1000); - } - } catch (InterruptedException inte) { - throw (IOException)new IOException().initCause(inte); - } - } finally { - updateStatus(reporter); - } - } - - public void close() throws IOException { - if (0 == failcount || ignoreReadFailures) { - return; - } - throw new IOException(getCountString()); - } - } - - private static List fetchFileList(Configuration conf, Path srcList) - throws IOException { - List result = new ArrayList(); - FileSystem fs = srcList.getFileSystem(conf); - try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.open(srcList), - Charset.forName("UTF-8")))) { - String line = input.readLine(); - while (line != null) { - result.add(new Path(line)); - line = input.readLine(); - } - } - return result; - } - - @Deprecated - public static void copy(Configuration conf, String srcPath, - String destPath, Path logPath, - boolean srcAsList, boolean ignoreReadFailures) - throws IOException { - final Path src = new Path(srcPath); - List tmp = new ArrayList(); - if (srcAsList) { - tmp.addAll(fetchFileList(conf, src)); - } else { - tmp.add(src); - } - EnumSet flags = ignoreReadFailures - ? 
EnumSet.of(Options.IGNORE_READ_FAILURES) - : EnumSet.noneOf(Options.class); - - final Path dst = new Path(destPath); - copy(conf, new Arguments(tmp, null, dst, logPath, flags, null, - Long.MAX_VALUE, Long.MAX_VALUE, null, false)); - } - - /** Sanity check for srcPath */ - private static void checkSrcPath(JobConf jobConf, List srcPaths) - throws IOException { - List rslt = new ArrayList(); - List unglobbed = new LinkedList(); - - Path[] ps = new Path[srcPaths.size()]; - ps = srcPaths.toArray(ps); - TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf); - - - for (Path p : srcPaths) { - FileSystem fs = p.getFileSystem(jobConf); - FileStatus[] inputs = fs.globStatus(p); - - if(inputs != null && inputs.length > 0) { - for (FileStatus onePath: inputs) { - unglobbed.add(onePath.getPath()); - } - } else { - rslt.add(new IOException("Input source " + p + " does not exist.")); - } - } - if (!rslt.isEmpty()) { - throw new InvalidInputException(rslt); - } - srcPaths.clear(); - srcPaths.addAll(unglobbed); - } - - /** - * Driver to copy srcPath to destPath depending on required protocol. - * @param conf configuration - * @param args arguments - */ - static void copy(final Configuration conf, final Arguments args - ) throws IOException { - LOG.info("srcPaths=" + args.srcs); - if (!args.dryrun || args.flags.contains(Options.UPDATE)) { - LOG.info("destPath=" + args.dst); - } - - JobConf job = createJobConf(conf); - - checkSrcPath(job, args.srcs); - if (args.preservedAttributes != null) { - job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes); - } - if (args.mapredSslConf != null) { - job.set("dfs.https.client.keystore.resource", args.mapredSslConf); - } - - //Initialize the mapper - try { - if (setup(conf, job, args)) { - JobClient.runJob(job); - } - if(!args.dryrun) { - finalize(conf, job, args.dst, args.preservedAttributes); - } - } finally { - if (!args.dryrun) { - //delete tmp - fullyDelete(job.get(TMP_DIR_LABEL), job); - } - //delete jobDirectory - fullyDelete(job.get(JOB_DIR_LABEL), job); - } - } - - private static void updateDestStatus(FileStatus src, FileStatus dst, - EnumSet preseved, FileSystem destFileSys - ) throws IOException { - String owner = null; - String group = null; - if (preseved.contains(FileAttribute.USER) - && !src.getOwner().equals(dst.getOwner())) { - owner = src.getOwner(); - } - if (preseved.contains(FileAttribute.GROUP) - && !src.getGroup().equals(dst.getGroup())) { - group = src.getGroup(); - } - if (owner != null || group != null) { - destFileSys.setOwner(dst.getPath(), owner, group); - } - if (preseved.contains(FileAttribute.PERMISSION) - && !src.getPermission().equals(dst.getPermission())) { - destFileSys.setPermission(dst.getPath(), src.getPermission()); - } - if (preseved.contains(FileAttribute.TIMES)) { - destFileSys.setTimes(dst.getPath(), src.getModificationTime(), src.getAccessTime()); - } - } - - static private void finalize(Configuration conf, JobConf jobconf, - final Path destPath, String presevedAttributes) throws IOException { - if (presevedAttributes == null) { - return; - } - EnumSet preseved = FileAttribute.parse(presevedAttributes); - if (!preseved.contains(FileAttribute.USER) - && !preseved.contains(FileAttribute.GROUP) - && !preseved.contains(FileAttribute.PERMISSION)) { - return; - } - - FileSystem dstfs = destPath.getFileSystem(conf); - Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL)); - try (SequenceFile.Reader in = - new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) { - Text dsttext = new Text(); - FilePair 
pair = new FilePair(); - for(; in.next(dsttext, pair); ) { - Path absdst = new Path(destPath, pair.output); - updateDestStatus(pair.input, dstfs.getFileStatus(absdst), - preseved, dstfs); - } - } - } - - static class Arguments { - final List srcs; - final Path basedir; - final Path dst; - final Path log; - final EnumSet flags; - final String preservedAttributes; - final long filelimit; - final long sizelimit; - final String mapredSslConf; - final boolean dryrun; - - /** - * Arguments for distcp - * @param srcs List of source paths - * @param basedir Base directory for copy - * @param dst Destination path - * @param log Log output directory - * @param flags Command-line flags - * @param preservedAttributes Preserved attributes - * @param filelimit File limit - * @param sizelimit Size limit - * @param mapredSslConf ssl configuration - * @param dryrun - */ - Arguments(List srcs, Path basedir, Path dst, Path log, - EnumSet flags, String preservedAttributes, - long filelimit, long sizelimit, String mapredSslConf, - boolean dryrun) { - this.srcs = srcs; - this.basedir = basedir; - this.dst = dst; - this.log = log; - this.flags = flags; - this.preservedAttributes = preservedAttributes; - this.filelimit = filelimit; - this.sizelimit = sizelimit; - this.mapredSslConf = mapredSslConf; - this.dryrun = dryrun; - - if (LOG.isTraceEnabled()) { - LOG.trace("this = " + this); - } - } - - static Arguments valueOf(String[] args, Configuration conf - ) throws IOException { - List srcs = new ArrayList(); - Path dst = null; - Path log = null; - Path basedir = null; - EnumSet flags = EnumSet.noneOf(Options.class); - String presevedAttributes = null; - String mapredSslConf = null; - long filelimit = Long.MAX_VALUE; - long sizelimit = Long.MAX_VALUE; - boolean dryrun = false; - - for (int idx = 0; idx < args.length; idx++) { - Options[] opt = Options.values(); - int i = 0; - for(; i < opt.length && !args[idx].startsWith(opt[i].cmd); i++); - - if (i < opt.length) { - flags.add(opt[i]); - if (opt[i] == Options.PRESERVE_STATUS) { - presevedAttributes = args[idx].substring(2); - FileAttribute.parse(presevedAttributes); //validation - } - else if (opt[i] == Options.FILE_LIMIT) { - filelimit = Options.FILE_LIMIT.parseLong(args, ++idx); - } - else if (opt[i] == Options.SIZE_LIMIT) { - sizelimit = Options.SIZE_LIMIT.parseLong(args, ++idx); - } - } else if ("-f".equals(args[idx])) { - if (++idx == args.length) { - throw new IllegalArgumentException("urilist_uri not specified in -f"); - } - srcs.addAll(fetchFileList(conf, new Path(args[idx]))); - } else if ("-log".equals(args[idx])) { - if (++idx == args.length) { - throw new IllegalArgumentException("logdir not specified in -log"); - } - log = new Path(args[idx]); - } else if ("-basedir".equals(args[idx])) { - if (++idx == args.length) { - throw new IllegalArgumentException("basedir not specified in -basedir"); - } - basedir = new Path(args[idx]); - } else if ("-mapredSslConf".equals(args[idx])) { - if (++idx == args.length) { - throw new IllegalArgumentException("ssl conf file not specified in -mapredSslConf"); - } - mapredSslConf = args[idx]; - } else if ("-dryrun".equals(args[idx])) { - dryrun = true; - dst = new Path("/tmp/distcp_dummy_dest");//dummy destination - } else if ("-m".equals(args[idx])) { - if (++idx == args.length) { - throw new IllegalArgumentException("num_maps not specified in -m"); - } - try { - conf.setInt(MAX_MAPS_LABEL, Integer.parseInt(args[idx])); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid argument to -m: 
" + - args[idx]); - } - } else if ('-' == args[idx].codePointAt(0)) { - throw new IllegalArgumentException("Invalid switch " + args[idx]); - } else if (idx == args.length -1 && - (!dryrun || flags.contains(Options.UPDATE))) { - dst = new Path(args[idx]); - } else { - srcs.add(new Path(args[idx])); - } - } - // mandatory command-line parameters - if (srcs.isEmpty() || dst == null) { - throw new IllegalArgumentException("Missing " - + (dst == null ? "dst path" : "src")); - } - // incompatible command-line flags - final boolean isOverwrite = flags.contains(Options.OVERWRITE); - final boolean isUpdate = flags.contains(Options.UPDATE); - final boolean isDelete = flags.contains(Options.DELETE); - final boolean skipCRC = flags.contains(Options.SKIPCRC); - if (isOverwrite && isUpdate) { - throw new IllegalArgumentException("Conflicting overwrite policies"); - } - if (!isUpdate && skipCRC) { - throw new IllegalArgumentException( - Options.SKIPCRC.cmd + " is relevant only with the " + - Options.UPDATE.cmd + " option"); - } - if (isDelete && !isOverwrite && !isUpdate) { - throw new IllegalArgumentException(Options.DELETE.cmd - + " must be specified with " + Options.OVERWRITE + " or " - + Options.UPDATE + "."); - } - return new Arguments(srcs, basedir, dst, log, flags, presevedAttributes, - filelimit, sizelimit, mapredSslConf, dryrun); - } - - /** {@inheritDoc} */ - public String toString() { - return getClass().getName() + "{" - + "\n srcs = " + srcs - + "\n dst = " + dst - + "\n log = " + log - + "\n flags = " + flags - + "\n preservedAttributes = " + preservedAttributes - + "\n filelimit = " + filelimit - + "\n sizelimit = " + sizelimit - + "\n mapredSslConf = " + mapredSslConf - + "\n}"; - } - } - - /** - * This is the main driver for recursively copying directories - * across file systems. It takes at least two cmdline parameters. A source - * URL and a destination URL. It then essentially does an "ls -lR" on the - * source URL, and writes the output in a round-robin manner to all the map - * input files. The mapper actually copies the files allotted to it. The - * reduce is empty. - */ - public int run(String[] args) { - try { - copy(conf, Arguments.valueOf(args, conf)); - return 0; - } catch (IllegalArgumentException e) { - System.err.println(StringUtils.stringifyException(e) + "\n" + usage); - ToolRunner.printGenericCommandUsage(System.err); - return -1; - } catch (DuplicationException e) { - System.err.println(StringUtils.stringifyException(e)); - return DuplicationException.ERROR_CODE; - } catch (RemoteException e) { - final IOException unwrapped = e.unwrapRemoteException( - FileNotFoundException.class, - AccessControlException.class, - QuotaExceededException.class); - System.err.println(StringUtils.stringifyException(unwrapped)); - return -3; - } catch (Exception e) { - System.err.println("With failures, global counters are inaccurate; " + - "consider running with -i"); - System.err.println("Copy failed: " + StringUtils.stringifyException(e)); - return -999; - } - } - - public static void main(String[] args) throws Exception { - JobConf job = new JobConf(DistCpV1.class); - DistCpV1 distcp = new DistCpV1(job); - int res = ToolRunner.run(distcp, args); - System.exit(res); - } - - /** - * Make a path relative with respect to a root path. - * absPath is always assumed to descend from root. - * Otherwise returned path is null. 
- */ - static String makeRelative(Path root, Path absPath) { - if (!absPath.isAbsolute()) { - throw new IllegalArgumentException("!absPath.isAbsolute(), absPath=" - + absPath); - } - String p = absPath.toUri().getPath(); - - StringTokenizer pathTokens = new StringTokenizer(p, "/"); - for(StringTokenizer rootTokens = new StringTokenizer( - root.toUri().getPath(), "/"); rootTokens.hasMoreTokens(); ) { - if (!rootTokens.nextToken().equals(pathTokens.nextToken())) { - return null; - } - } - StringBuilder sb = new StringBuilder(); - for(; pathTokens.hasMoreTokens(); ) { - sb.append(pathTokens.nextToken()); - if (pathTokens.hasMoreTokens()) { sb.append(Path.SEPARATOR); } - } - return sb.length() == 0? ".": sb.toString(); - } - - /** - * Calculate how many maps to run. - * Number of maps is bounded by a minimum of the cumulative size of the - * copy / (distcp.bytes.per.map, default BYTES_PER_MAP or -m on the - * command line) and at most (distcp.max.map.tasks, default - * MAX_MAPS_PER_NODE * nodes in the cluster). - * @param totalBytes Count of total bytes for job - * @param job The job to configure - * @return Count of maps to run. - */ - private static int setMapCount(long totalBytes, JobConf job) - throws IOException { - int numMaps = - (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP)); - numMaps = Math.min(numMaps, - job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE * - new JobClient(job).getClusterStatus().getTaskTrackers())); - numMaps = Math.max(numMaps, 1); - job.setNumMapTasks(numMaps); - return numMaps; - } - - /** Fully delete dir */ - static void fullyDelete(String dir, Configuration conf) throws IOException { - if (dir != null) { - Path tmp = new Path(dir); - boolean success = tmp.getFileSystem(conf).delete(tmp, true); - if (!success) { - LOG.warn("Could not fully delete " + tmp); - } - } - } - - //Job configuration - private static JobConf createJobConf(Configuration conf) { - JobConf jobconf = new JobConf(conf, DistCpV1.class); - jobconf.setJobName(conf.get("mapred.job.name", NAME)); - - // turn off speculative execution, because DFS doesn't handle - // multiple writers to the same file. - jobconf.setMapSpeculativeExecution(false); - - jobconf.setInputFormat(CopyInputFormat.class); - jobconf.setOutputKeyClass(Text.class); - jobconf.setOutputValueClass(Text.class); - - jobconf.setMapperClass(CopyFilesMapper.class); - jobconf.setNumReduceTasks(0); - return jobconf; - } - - private static final Random RANDOM = new Random(); - public static String getRandomId() { - return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36); - } - - /** - * Increase the replication factor of _distcp_src_files to - * sqrt(min(maxMapsOnCluster, numMaps)). This is to reduce the chance of - * failing of distcp because of "not having a replication of _distcp_src_files - * available for reading for some maps". - */ - private static void setReplication(Configuration conf, JobConf jobConf, - Path srcfilelist, int numMaps) throws IOException { - int numMaxMaps = new JobClient(jobConf).getClusterStatus().getMaxMapTasks(); - short replication = (short) Math.ceil( - Math.sqrt(Math.min(numMaxMaps, numMaps))); - FileSystem fs = srcfilelist.getFileSystem(conf); - FileStatus srcStatus = fs.getFileStatus(srcfilelist); - - if (srcStatus.getReplication() < replication) { - if (!fs.setReplication(srcfilelist, replication)) { - throw new IOException("Unable to increase the replication of file " + - srcfilelist); - } - } - } - - /** - * Does the dir already exist at destination ? 
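[Reader's sketch, not part of the patch: setMapCount() above computes max(1, min(totalBytes / bytesPerMap, maxMaps)), where bytesPerMap defaults to 256 MB (BYTES_PER_MAP) and the ceiling defaults to 20 maps per TaskTracker node. A standalone restatement with a worked example; names are illustrative.]

class MapCountSketch {
  static final long BYTES_PER_MAP = 256L * 1024 * 1024;  // default from DistCpV1

  static int mapCount(long totalBytes, long bytesPerMap, int maxMaps) {
    int numMaps = (int) (totalBytes / bytesPerMap);
    numMaps = Math.min(numMaps, maxMaps);     // cluster-wide ceiling
    return Math.max(numMaps, 1);              // never fewer than one map
  }

  public static void main(String[] args) {
    // 10 GB at 256 MB per map is 40 maps, capped here at a 20-map ceiling.
    System.out.println(mapCount(10L * 1024 * 1024 * 1024, BYTES_PER_MAP, 20)); // 20
  }
}
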
- * @return true if the dir already exists at destination - */ - private static boolean dirExists(Configuration conf, Path dst) - throws IOException { - FileSystem destFileSys = dst.getFileSystem(conf); - FileStatus status = null; - try { - status = destFileSys.getFileStatus(dst); - }catch (FileNotFoundException e) { - return false; - } - if (status.isFile()) { - throw new FileAlreadyExistsException("Not a dir: " + dst+" is a file."); - } - return true; - } - - /** - * Initialize DFSCopyFileMapper specific job-configuration. - * @param conf : The dfs/mapred configuration. - * @param jobConf : The handle to the jobConf object to be initialized. - * @param args Arguments - * @return true if it is necessary to launch a job. - */ - static boolean setup(Configuration conf, JobConf jobConf, - final Arguments args) - throws IOException { - jobConf.set(DST_DIR_LABEL, args.dst.toUri().toString()); - - //set boolean values - final boolean update = args.flags.contains(Options.UPDATE); - final boolean skipCRCCheck = args.flags.contains(Options.SKIPCRC); - final boolean overwrite = !update && args.flags.contains(Options.OVERWRITE) - && !args.dryrun; - jobConf.setBoolean(Options.UPDATE.propertyname, update); - jobConf.setBoolean(Options.SKIPCRC.propertyname, skipCRCCheck); - jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite); - jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname, - args.flags.contains(Options.IGNORE_READ_FAILURES)); - jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname, - args.flags.contains(Options.PRESERVE_STATUS)); - - final String randomId = getRandomId(); - JobClient jClient = new JobClient(jobConf); - Path stagingArea; - try { - stagingArea = - JobSubmissionFiles.getStagingDir(jClient.getClusterHandle(), conf); - } catch (InterruptedException ie) { - throw new IOException(ie); - } - - Path jobDirectory = new Path(stagingArea + NAME + "_" + randomId); - FsPermission mapredSysPerms = - new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); - FileSystem.mkdirs(jClient.getFs(), jobDirectory, mapredSysPerms); - jobConf.set(JOB_DIR_LABEL, jobDirectory.toString()); - - long maxBytesPerMap = conf.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP); - - FileSystem dstfs = args.dst.getFileSystem(conf); - - // get tokens for all the required FileSystems.. - TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), - new Path[] {args.dst}, conf); - - - boolean dstExists = dstfs.exists(args.dst); - boolean dstIsDir = false; - if (dstExists) { - dstIsDir = dstfs.getFileStatus(args.dst).isDirectory(); - } - - // default logPath - Path logPath = args.log; - if (logPath == null) { - String filename = "_distcp_logs_" + randomId; - if (!dstExists || !dstIsDir) { - Path parent = args.dst.getParent(); - if (null == parent) { - // If dst is '/' on S3, it might not exist yet, but dst.getParent() - // will return null. In this case, use '/' as its own parent to prevent - // NPE errors below. 
- parent = args.dst; - } - if (!dstfs.exists(parent)) { - dstfs.mkdirs(parent); - } - logPath = new Path(parent, filename); - } else { - logPath = new Path(args.dst, filename); - } - } - FileOutputFormat.setOutputPath(jobConf, logPath); - - // create src list, dst list - FileSystem jobfs = jobDirectory.getFileSystem(jobConf); - - Path srcfilelist = new Path(jobDirectory, "_distcp_src_files"); - Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files"); - Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs"); - jobConf.set(SRC_LIST_LABEL, srcfilelist.toString()); - jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString()); - int srcCount = 0, cnsyncf = 0, dirsyn = 0; - long fileCount = 0L, dirCount = 0L, byteCount = 0L, cbsyncs = 0L, - skipFileCount = 0L, skipByteCount = 0L; - try ( - SequenceFile.Writer src_writer = SequenceFile.createWriter(jobConf, - Writer.file(srcfilelist), Writer.keyClass(LongWritable.class), - Writer.valueClass(FilePair.class), Writer.compression( - SequenceFile.CompressionType.NONE)); - SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobConf, - Writer.file(dstfilelist), Writer.keyClass(Text.class), - Writer.valueClass(Text.class), Writer.compression( - SequenceFile.CompressionType.NONE)); - SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobConf, - Writer.file(dstdirlist), Writer.keyClass(Text.class), - Writer.valueClass(FilePair.class), Writer.compression( - SequenceFile.CompressionType.NONE)); - ) { - // handle the case where the destination directory doesn't exist - // and we've only a single src directory OR we're updating/overwriting - // the contents of the destination directory. - final boolean special = - (args.srcs.size() == 1 && !dstExists) || update || overwrite; - - Path basedir = null; - HashSet parentDirsToCopy = new HashSet(); - if (args.basedir != null) { - FileSystem basefs = args.basedir.getFileSystem(conf); - basedir = args.basedir.makeQualified( - basefs.getUri(), basefs.getWorkingDirectory()); - if (!basefs.isDirectory(basedir)) { - throw new IOException("Basedir " + basedir + " is not a directory."); - } - } - - for(Iterator srcItr = args.srcs.iterator(); srcItr.hasNext(); ) { - final Path src = srcItr.next(); - FileSystem srcfs = src.getFileSystem(conf); - FileStatus srcfilestat = srcfs.getFileStatus(src); - Path root = special && srcfilestat.isDirectory()? 
src: src.getParent(); - if (dstExists && !dstIsDir && - (args.srcs.size() > 1 || srcfilestat.isDirectory())) { - // destination should not be a file - throw new IOException("Destination " + args.dst + " should be a dir" + - " if multiple source paths are there OR if" + - " the source path is a dir"); - } - - if (basedir != null) { - root = basedir; - Path parent = src.getParent().makeQualified( - srcfs.getUri(), srcfs.getWorkingDirectory()); - while (parent != null && !parent.equals(basedir)) { - if (!parentDirsToCopy.contains(parent)){ - parentDirsToCopy.add(parent); - String dst = makeRelative(root, parent); - FileStatus pst = srcfs.getFileStatus(parent); - src_writer.append(new LongWritable(0), new FilePair(pst, dst)); - dst_writer.append(new Text(dst), new Text(parent.toString())); - dir_writer.append(new Text(dst), new FilePair(pst, dst)); - if (++dirsyn > SYNC_FILE_MAX) { - dirsyn = 0; - dir_writer.sync(); - } - } - parent = parent.getParent(); - } - - if (parent == null) { - throw new IOException("Basedir " + basedir + - " is not a prefix of source path " + src); - } - } - - if (srcfilestat.isDirectory()) { - ++srcCount; - final String dst = makeRelative(root,src); - if (!update || !dirExists(conf, new Path(args.dst, dst))) { - ++dirCount; - src_writer.append(new LongWritable(0), - new FilePair(srcfilestat, dst)); - } - dst_writer.append(new Text(dst), new Text(src.toString())); - } - - Stack pathstack = new Stack(); - for(pathstack.push(srcfilestat); !pathstack.empty(); ) { - FileStatus cur = pathstack.pop(); - FileStatus[] children = srcfs.listStatus(cur.getPath()); - for(int i = 0; i < children.length; i++) { - boolean skipPath = false; - final FileStatus child = children[i]; - final String dst = makeRelative(root, child.getPath()); - ++srcCount; - - if (child.isDirectory()) { - pathstack.push(child); - if (!update || !dirExists(conf, new Path(args.dst, dst))) { - ++dirCount; - } - else { - skipPath = true; // skip creating dir at destination - } - } - else { - Path destPath = new Path(args.dst, dst); - if (cur.isFile() && (args.srcs.size() == 1)) { - // Copying a single file; use dst path provided by user as - // destination file rather than destination directory - Path dstparent = destPath.getParent(); - FileSystem destFileSys = destPath.getFileSystem(jobConf); - if (!(destFileSys.exists(dstparent) && - destFileSys.getFileStatus(dstparent).isDirectory())) { - destPath = dstparent; - } - } - //skip path if the src and the dst files are the same. - skipPath = update && - sameFile(srcfs, child, dstfs, destPath, skipCRCCheck); - //skip path if it exceed file limit or size limit - skipPath |= fileCount == args.filelimit - || byteCount + child.getLen() > args.sizelimit; - - if (!skipPath) { - ++fileCount; - byteCount += child.getLen(); - - if (LOG.isTraceEnabled()) { - LOG.trace("adding file " + child.getPath()); - } - - ++cnsyncf; - cbsyncs += child.getLen(); - if (cnsyncf > SYNC_FILE_MAX || cbsyncs > maxBytesPerMap) { - src_writer.sync(); - dst_writer.sync(); - cnsyncf = 0; - cbsyncs = 0L; - } - } - else { - ++skipFileCount; - skipByteCount += child.getLen(); - if (LOG.isTraceEnabled()) { - LOG.trace("skipping file " + child.getPath()); - } - } - } - - if (!skipPath) { - src_writer.append(new LongWritable(child.isDirectory()? 
0: child.getLen()), - new FilePair(child, dst)); - } - - dst_writer.append(new Text(dst), - new Text(child.getPath().toString())); - } - - if (cur.isDirectory()) { - String dst = makeRelative(root, cur.getPath()); - dir_writer.append(new Text(dst), new FilePair(cur, dst)); - if (++dirsyn > SYNC_FILE_MAX) { - dirsyn = 0; - dir_writer.sync(); - } - } - } - } - } - LOG.info("sourcePathsCount(files+directories)=" + srcCount); - LOG.info("filesToCopyCount=" + fileCount); - LOG.info("bytesToCopyCount=" + - TraditionalBinaryPrefix.long2String(byteCount, "", 1)); - if (update) { - LOG.info("filesToSkipCopyCount=" + skipFileCount); - LOG.info("bytesToSkipCopyCount=" + - TraditionalBinaryPrefix.long2String(skipByteCount, "", 1)); - } - if (args.dryrun) { - return false; - } - int mapCount = setMapCount(byteCount, jobConf); - // Increase the replication of _distcp_src_files, if needed - setReplication(conf, jobConf, srcfilelist, mapCount); - - FileStatus dststatus = null; - try { - dststatus = dstfs.getFileStatus(args.dst); - } catch(FileNotFoundException fnfe) { - LOG.info(args.dst + " does not exist."); - } - - // create dest path dir if copying > 1 file - if (dststatus == null) { - if (srcCount > 1 && !dstfs.mkdirs(args.dst)) { - throw new IOException("Failed to create" + args.dst); - } - } - - final Path sorted = new Path(jobDirectory, "_distcp_sorted"); - checkDuplication(jobfs, dstfilelist, sorted, conf); - - if (dststatus != null && args.flags.contains(Options.DELETE)) { - long deletedPathsCount = deleteNonexisting(dstfs, dststatus, sorted, - jobfs, jobDirectory, jobConf, conf); - LOG.info("deletedPathsFromDestCount(files+directories)=" + - deletedPathsCount); - } - - Path tmpDir = new Path( - (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)? - args.dst.getParent(): args.dst, "_distcp_tmp_" + randomId); - jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString()); - - // Explicitly create the tmpDir to ensure that it can be cleaned - // up by fullyDelete() later. - tmpDir.getFileSystem(conf).mkdirs(tmpDir); - - LOG.info("sourcePathsCount=" + srcCount); - LOG.info("filesToCopyCount=" + fileCount); - LOG.info("bytesToCopyCount=" + - TraditionalBinaryPrefix.long2String(byteCount, "", 1)); - jobConf.setInt(SRC_COUNT_LABEL, srcCount); - jobConf.setLong(TOTAL_SIZE_LABEL, byteCount); - - return (fileCount + dirCount) > 0; - } - - /** - * Check whether the contents of src and dst are the same. - * - * Return false if dstpath does not exist - * - * If the files have different sizes, return false. - * - * If the files have the same sizes, the file checksums will be compared. - * - * When file checksum is not supported in any of file systems, - * two files are considered as the same if they have the same size. - */ - static private boolean sameFile(FileSystem srcfs, FileStatus srcstatus, - FileSystem dstfs, Path dstpath, boolean skipCRCCheck) throws IOException { - FileStatus dststatus; - try { - dststatus = dstfs.getFileStatus(dstpath); - } catch(FileNotFoundException fnfe) { - return false; - } - - //same length? - if (srcstatus.getLen() != dststatus.getLen()) { - return false; - } - - if (skipCRCCheck) { - LOG.debug("Skipping the CRC check"); - return true; - } - - //get src checksum - final FileChecksum srccs; - try { - srccs = srcfs.getFileChecksum(srcstatus.getPath()); - } catch(FileNotFoundException fnfe) { - /* - * Two possible cases: - * (1) src existed once but was deleted between the time period that - * srcstatus was obtained and the try block above. 
- * (2) srcfs does not support file checksum and (incorrectly) throws - * FNFE, e.g. some previous versions of HftpFileSystem. - * For case (1), it is okay to return true since src was already deleted. - * For case (2), true should be returned. - */ - return true; - } - - //compare checksums - try { - final FileChecksum dstcs = dstfs.getFileChecksum(dststatus.getPath()); - //return true if checksum is not supported - //(i.e. some of the checksums is null) - return srccs == null || dstcs == null || srccs.equals(dstcs); - } catch(FileNotFoundException fnfe) { - return false; - } - } - - /** - * Delete the dst files/dirs which do not exist in src - * - * @return total count of files and directories deleted from destination - * @throws IOException - */ - static private long deleteNonexisting( - FileSystem dstfs, FileStatus dstroot, Path dstsorted, - FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf - ) throws IOException { - if (dstroot.isFile()) { - throw new IOException("dst must be a directory when option " - + Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath() - + ") is not a directory."); - } - - //write dst lsr results - final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr"); - try (final SequenceFile.Writer writer = SequenceFile.createWriter(jobconf, - Writer.file(dstlsr), Writer.keyClass(Text.class), - Writer.valueClass(NullWritable.class), Writer.compression( - SequenceFile.CompressionType.NONE))) { - //do lsr to get all file statuses in dstroot - final Stack lsrstack = new Stack(); - for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) { - final FileStatus status = lsrstack.pop(); - if (status.isDirectory()) { - for(FileStatus child : dstfs.listStatus(status.getPath())) { - String relative = makeRelative(dstroot.getPath(), child.getPath()); - writer.append(new Text(relative), NullWritable.get()); - lsrstack.push(child); - } - } - } - } - - //sort lsr results - final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted"); - SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs, - new Text.Comparator(), Text.class, NullWritable.class, jobconf); - sorter.sort(dstlsr, sortedlsr); - - //compare lsr list and dst list - long deletedPathsCount = 0; - try (SequenceFile.Reader lsrin = - new SequenceFile.Reader(jobconf, Reader.file(sortedlsr)); - SequenceFile.Reader dstin = - new SequenceFile.Reader(jobconf, Reader.file(dstsorted))) { - //compare sorted lsr list and sorted dst list - final Text lsrpath = new Text(); - final Text dstpath = new Text(); - final Text dstfrom = new Text(); - final Trash trash = new Trash(dstfs, conf); - Path lastpath = null; - - boolean hasnext = dstin.next(dstpath, dstfrom); - while (lsrin.next(lsrpath, NullWritable.get())) { - int dst_cmp_lsr = dstpath.compareTo(lsrpath); - while (hasnext && dst_cmp_lsr < 0) { - hasnext = dstin.next(dstpath, dstfrom); - dst_cmp_lsr = dstpath.compareTo(lsrpath); - } - - if (dst_cmp_lsr == 0) { - //lsrpath exists in dst, skip it - hasnext = dstin.next(dstpath, dstfrom); - } else { - //lsrpath does not exist, delete it - final Path rmpath = new Path(dstroot.getPath(), lsrpath.toString()); - ++deletedPathsCount; - if ((lastpath == null || !isAncestorPath(lastpath, rmpath))) { - if (!(trash.moveToTrash(rmpath) || dstfs.delete(rmpath, true))) { - throw new IOException("Failed to delete " + rmpath); - } - lastpath = rmpath; - } - } - } - } - return deletedPathsCount; - } - - //is x an ancestor path of y? 
- static private boolean isAncestorPath(Path xp, Path yp) { - final String x = xp.toString(); - final String y = yp.toString(); - if (!y.startsWith(x)) { - return false; - } - final int len = x.length(); - return y.length() == len || y.charAt(len) == Path.SEPARATOR_CHAR; - } - - /** Check whether the file list have duplication. */ - static private void checkDuplication(FileSystem fs, Path file, Path sorted, - Configuration conf) throws IOException { - SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, - new Text.Comparator(), Text.class, Text.class, conf); - sorter.sort(file, sorted); - try (SequenceFile.Reader in = - new SequenceFile.Reader(conf, Reader.file(sorted))) { - Text prevdst = null, curdst = new Text(); - Text prevsrc = null, cursrc = new Text(); - for(; in.next(curdst, cursrc); ) { - if (prevdst != null && curdst.equals(prevdst)) { - throw new DuplicationException( - "Invalid input, there are duplicated files in the sources: " - + prevsrc + ", " + cursrc); - } - prevdst = curdst; - curdst = new Text(); - prevsrc = cursrc; - cursrc = new Text(); - } - } - } - - /** An exception class for duplicated source files. */ - public static class DuplicationException extends IOException { - private static final long serialVersionUID = 1L; - /** Error code for this exception */ - public static final int ERROR_CODE = -2; - DuplicationException(String message) {super(message);} - } -} diff --git a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/Logalyzer.java b/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/Logalyzer.java deleted file mode 100644 index 05e6e2419d..0000000000 --- a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/Logalyzer.java +++ /dev/null @@ -1,329 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.tools; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.Random; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configuration.DeprecationDelta; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.mapred.TextOutputFormat; -import org.apache.hadoop.mapred.lib.LongSumReducer; -import org.apache.hadoop.mapreduce.lib.map.RegexMapper; - -/** - * Logalyzer: A utility tool for archiving and analyzing hadoop logs. - *
<p>
- * This tool supports archiving and analyzing (sort/grep) of log-files. - * It takes as input - * a) Input uri which will serve uris of the logs to be archived. - * b) Output directory (not mandatory). - * c) Directory on dfs to archive the logs. - * d) The sort/grep patterns for analyzing the files and separator for boundaries. - * Usage: - * Logalyzer -archive -archiveDir <directory to archive logs> -analysis - * <directory> -logs <log-list uri> -grep <pattern> -sort - * <col1, col2> -separator <separator> - *
<p>
- */ -@Deprecated -public class Logalyzer { - // Constants - private static Configuration fsConfig = new Configuration(); - public static final String SORT_COLUMNS = - "logalizer.logcomparator.sort.columns"; - public static final String COLUMN_SEPARATOR = - "logalizer.logcomparator.column.separator"; - - static { - Configuration.addDeprecations(new DeprecationDelta[] { - new DeprecationDelta("mapred.reducer.sort", SORT_COLUMNS), - new DeprecationDelta("mapred.reducer.separator", COLUMN_SEPARATOR) - }); - } - - /** A {@link Mapper} that extracts text matching a regular expression. */ - public static class LogRegexMapper<K extends WritableComparable> - extends MapReduceBase - implements Mapper<K, Text, Text, LongWritable> { - - private Pattern pattern; - - public void configure(JobConf job) { - pattern = Pattern.compile(job.get(RegexMapper.PATTERN)); - } - - public void map(K key, Text value, - OutputCollector<Text, LongWritable> output, - Reporter reporter) - throws IOException { - String text = value.toString(); - Matcher matcher = pattern.matcher(text); - while (matcher.find()) { - output.collect(value, new LongWritable(1)); - } - } - - } - - /** A WritableComparator optimized for UTF8 keys of the logs. */ - public static class LogComparator extends Text.Comparator implements Configurable { - - private static Log LOG = LogFactory.getLog(Logalyzer.class); - private JobConf conf = null; - private String[] sortSpec = null; - private String columnSeparator = null; - - public void setConf(Configuration conf) { - if (conf instanceof JobConf) { - this.conf = (JobConf) conf; - } else { - this.conf = new JobConf(conf); - } - - //Initialize the specification for *comparison* - String sortColumns = this.conf.get(SORT_COLUMNS, null); - if (sortColumns != null) { - sortSpec = sortColumns.split(","); - } - - //Column-separator - columnSeparator = this.conf.get(COLUMN_SEPARATOR, ""); - } - - public Configuration getConf() { - return conf; - } - - public int compare(byte[] b1, int s1, int l1, - byte[] b2, int s2, int l2) { - - if (sortSpec == null) { - return super.compare(b1, s1, l1, b2, s2, l2); - } - - try { - Text logline1 = new Text(); - logline1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1))); - String line1 = logline1.toString(); - String[] logColumns1 = line1.split(columnSeparator); - - Text logline2 = new Text(); - logline2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2))); - String line2 = logline2.toString(); - String[] logColumns2 = line2.split(columnSeparator); - - if (logColumns1 == null || logColumns2 == null) { - return super.compare(b1, s1, l1, b2, s2, l2); - } - - //Compare column-wise according to *sortSpec* - for(int i=0; i < sortSpec.length; ++i) { - int column = Integer.parseInt(sortSpec[i]); - String c1 = logColumns1[column]; - String c2 = logColumns2[column]; - - //Compare columns - int comparison = super.compareBytes( - c1.getBytes(Charset.forName("UTF-8")), 0, c1.length(), - c2.getBytes(Charset.forName("UTF-8")), 0, c2.length() - ); - - //They differ! - if (comparison != 0) { - return comparison; - } - } - - } catch (IOException ioe) { - LOG.fatal("Caught " + ioe); - return 0; - } - - return 0; - } - - static { - // register this comparator - WritableComparator.define(Text.class, new LogComparator()); - } - }
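[Editor's aside: LogComparator above deserializes both Text keys, splits them on the configured separator, and orders them by the columns named in the sort spec, falling back to raw byte order when no spec is set. A simplified, Hadoop-free sketch of that column-wise ordering; the class and parameter names here are illustrative:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class ColumnComparator implements Comparator<String> {
  private final int[] sortSpec;     // column indices to compare, in order
  private final String separator;

  ColumnComparator(String separator, int... sortSpec) {
    this.separator = separator;
    this.sortSpec = sortSpec;
  }

  @Override
  public int compare(String line1, String line2) {
    String[] cols1 = line1.split(separator);
    String[] cols2 = line2.split(separator);
    for (int column : sortSpec) {
      int cmp = cols1[column].compareTo(cols2[column]);
      if (cmp != 0) {
        return cmp;                 // first differing column decides
      }
    }
    return 0;                       // all configured columns equal
  }

  public static void main(String[] args) {
    List<String> lines = new ArrayList<String>(
        Arrays.asList("3 44", "1 44", "4 44"));
    Collections.sort(lines, new ColumnComparator(" ", 0));
    System.out.println(lines);      // [1 44, 3 44, 4 44]
  }
}

Registering the real comparator via setOutputKeyComparatorClass below is what makes the single reducer emit its rows in this order.]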
- - /** - * doArchive: Workhorse function to archive log-files. - * @param logListURI : The uri which will serve list of log-files to archive. - * @param archiveDirectory : The directory to store archived logfiles. - * @throws IOException - */ - @SuppressWarnings("deprecation") - public void - doArchive(String logListURI, String archiveDirectory) - throws IOException - { - String destURL = FileSystem.getDefaultUri(fsConfig) + archiveDirectory; - DistCpV1.copy(new JobConf(fsConfig), logListURI, destURL, null, true, false); - } - - /** - * doAnalyze: - * @param inputFilesDirectory : Directory containing the files to be analyzed. - * @param outputDirectory : Directory to store analysis (output). - * @param grepPattern : Pattern to *grep* for. - * @param sortColumns : Sort specification for output. - * @param columnSeparator : Column separator. - * @throws IOException - */ - public void - doAnalyze(String inputFilesDirectory, String outputDirectory, - String grepPattern, String sortColumns, String columnSeparator) - throws IOException - { - Path grepInput = new Path(inputFilesDirectory); - - Path analysisOutput = null; - if (outputDirectory.equals("")) { - analysisOutput = new Path(inputFilesDirectory, "logalyzer_" + - Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - } else { - analysisOutput = new Path(outputDirectory); - } - - JobConf grepJob = new JobConf(fsConfig); - grepJob.setJobName("logalyzer-grep-sort"); - - FileInputFormat.setInputPaths(grepJob, grepInput); - grepJob.setInputFormat(TextInputFormat.class); - - grepJob.setMapperClass(LogRegexMapper.class); - grepJob.set(RegexMapper.PATTERN, grepPattern); - grepJob.set(SORT_COLUMNS, sortColumns); - grepJob.set(COLUMN_SEPARATOR, columnSeparator); - - grepJob.setCombinerClass(LongSumReducer.class); - grepJob.setReducerClass(LongSumReducer.class); - - FileOutputFormat.setOutputPath(grepJob, analysisOutput); - grepJob.setOutputFormat(TextOutputFormat.class); - grepJob.setOutputKeyClass(Text.class); - grepJob.setOutputValueClass(LongWritable.class); - grepJob.setOutputKeyComparatorClass(LogComparator.class); - - grepJob.setNumReduceTasks(1); // write a single file - - JobClient.runJob(grepJob); - } - - public static void main(String[] args) { - - Log LOG = LogFactory.getLog(Logalyzer.class); - - String version = "Logalyzer.0.0.1"; - String usage = "Usage: Logalyzer [-archive -logs <urlsToArchive>] " + - "-archiveDir <archiveDirectory> " + - "-grep <pattern> -sort <column1,column2> -separator <separator> " + - "-analysis <outputDirectory>"; - - System.out.println(version); - if (args.length == 0) { - System.err.println(usage); - System.exit(-1); - } - - //Command line arguments - boolean archive = false; - boolean grep = false; - boolean sort = false; - - String archiveDir = ""; - String logListURI = ""; - String grepPattern = ".*"; - String sortColumns = ""; - String columnSeparator = " "; - String outputDirectory = ""; - - for (int i = 0; i < args.length; i++) { // parse command line - if (args[i].equals("-archive")) { - archive = true; - } else if (args[i].equals("-archiveDir")) { - archiveDir = args[++i]; - } else if (args[i].equals("-grep")) { - grep = true; - grepPattern = args[++i]; - } else if (args[i].equals("-logs")) { - logListURI = args[++i]; - } else if (args[i].equals("-sort")) { - sort = true; - sortColumns = args[++i]; - } else if (args[i].equals("-separator")) { - columnSeparator = args[++i]; - } else if (args[i].equals("-analysis")) { - outputDirectory = args[++i]; - } - } - - LOG.info("analysisDir = " + outputDirectory); - LOG.info("archiveDir = " + archiveDir); - LOG.info("logListURI = " + logListURI); - LOG.info("grepPattern = " + grepPattern); - LOG.info("sortColumns = " + sortColumns); - LOG.info("separator = " + columnSeparator); - - try { - Logalyzer logalyzer = new Logalyzer(); - - // Archive?
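[Editor's aside: before the archive/analyze dispatch below, it may help to see what the grep-sort job above computes: LogRegexMapper emits the whole line once per regex match, and LongSumReducer (as combiner and reducer) sums those ones into a per-line match count. A rough single-process equivalent of that counting stage, in plain Java and illustrative only:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GrepCount {
  // Counts regex matches per distinct input line, like LogRegexMapper
  // feeding LongSumReducer.
  static Map<String, Long> grepCount(List<String> lines, String regex) {
    Pattern pattern = Pattern.compile(regex);
    Map<String, Long> counts = new TreeMap<String, Long>(); // sorted keys
    for (String line : lines) {
      Matcher m = pattern.matcher(line);
      while (m.find()) {
        Long old = counts.get(line);
        counts.put(line, old == null ? 1L : old + 1);
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("4 44", "0 45", "1 44", "1 44");
    System.out.println(grepCount(lines, "44")); // {1 44=2, 4 44=1}
  }
}]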
- if (archive) { - logalyzer.doArchive(logListURI, archiveDir); - } - - // Analyze? - if (grep || sort) { - logalyzer.doAnalyze(archiveDir, outputDirectory, grepPattern, sortColumns, columnSeparator); - } - } catch (IOException ioe) { - ioe.printStackTrace(); - System.exit(-1); - } - - } //main - -} //class Logalyzer diff --git a/hadoop-tools/hadoop-extras/src/test/java/org/apache/hadoop/tools/TestCopyFiles.java b/hadoop-tools/hadoop-extras/src/test/java/org/apache/hadoop/tools/TestCopyFiles.java deleted file mode 100644 index 20b8ee293e..0000000000 --- a/hadoop-tools/hadoop-extras/src/test/java/org/apache/hadoop/tools/TestCopyFiles.java +++ /dev/null @@ -1,1077 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.tools; - -import java.io.ByteArrayOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.PrintStream; -import java.net.URI; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.StringTokenizer; - -import junit.framework.TestCase; - -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FsShell; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSCluster.Builder; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.ToolRunner; -import org.apache.log4j.Level; - - -/** - * A JUnit test for copying files recursively. 
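[Editor's aside: the TestCopyFiles suite below never stores expected file contents. Each MyFile records only a size and a Random seed, and checkFiles regenerates the expected bytes from that seed on demand. The trick in isolation, as a minimal sketch with arbitrary seed and size:

import java.util.Arrays;
import java.util.Random;

public class SeededContent {
  // Regenerating from the same seed yields byte-identical content.
  public static void main(String[] args) {
    long seed = 12345L;        // the only state a MyFile has to remember
    int size = 8 * 1024;

    byte[] written = new byte[size];
    new Random(seed).nextBytes(written);   // what the test writes

    byte[] expected = new byte[size];
    new Random(seed).nextBytes(expected);  // what checkFiles regenerates

    System.out.println(Arrays.equals(written, expected)); // true
  }
}]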
- */ -@SuppressWarnings("deprecation") -public class TestCopyFiles extends TestCase { - { - ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange") - ).getLogger().setLevel(Level.ERROR); - ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ERROR); - ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ERROR); - ((Log4JLogger)DistCpV1.LOG).getLogger().setLevel(Level.ALL); - } - - static final URI LOCAL_FS = URI.create("file:///"); - - private static final Random RAN = new Random(); - private static final int NFILES = 20; - private static String TEST_ROOT_DIR = - new Path(System.getProperty("test.build.data","/tmp")) - .toString().replace(' ', '+'); - - /** class MyFile contains enough information to recreate the contents of - * a single file. - */ - private static class MyFile { - private static Random gen = new Random(); - private static final int MAX_LEVELS = 3; - private static final int MAX_SIZE = 8*1024; - private static String[] dirNames = { - "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" - }; - private final String name; - private int size = 0; - private long seed = 0L; - - MyFile() { - this(gen.nextInt(MAX_LEVELS)); - } - MyFile(int nLevels) { - String xname = ""; - if (nLevels != 0) { - int[] levels = new int[nLevels]; - for (int idx = 0; idx < nLevels; idx++) { - levels[idx] = gen.nextInt(10); - } - StringBuffer sb = new StringBuffer(); - for (int idx = 0; idx < nLevels; idx++) { - sb.append(dirNames[levels[idx]]); - sb.append("/"); - } - xname = sb.toString(); - } - long fidx = gen.nextLong() & Long.MAX_VALUE; - name = xname + Long.toString(fidx); - reset(); - } - void reset() { - final int oldsize = size; - do { size = gen.nextInt(MAX_SIZE); } while (oldsize == size); - final long oldseed = seed; - do { seed = gen.nextLong() & Long.MAX_VALUE; } while (oldseed == seed); - } - String getName() { return name; } - int getSize() { return size; } - long getSeed() { return seed; } - } - - private static MyFile[] createFiles(URI fsname, String topdir) - throws IOException { - return createFiles(FileSystem.get(fsname, new Configuration()), topdir); - } - - /** create NFILES with random names and directory hierarchies - * with random (but reproducible) data in them. - */ - private static MyFile[] createFiles(FileSystem fs, String topdir) - throws IOException { - Path root = new Path(topdir); - MyFile[] files = new MyFile[NFILES]; - for (int i = 0; i < NFILES; i++) { - files[i] = createFile(root, fs); - } - return files; - } - - static MyFile createFile(Path root, FileSystem fs, int levels) - throws IOException { - MyFile f = levels < 0 ? 
new MyFile() : new MyFile(levels); - Path p = new Path(root, f.getName()); - FSDataOutputStream out = fs.create(p); - byte[] toWrite = new byte[f.getSize()]; - new Random(f.getSeed()).nextBytes(toWrite); - out.write(toWrite); - out.close(); - FileSystem.LOG.info("created: " + p + ", size=" + f.getSize()); - return f; - } - - static MyFile createFile(Path root, FileSystem fs) throws IOException { - return createFile(root, fs, -1); - } - - private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files - ) throws IOException { - return checkFiles(fs, topdir, files, false); - } - - private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files, - boolean existingOnly) throws IOException { - Path root = new Path(topdir); - - for (int idx = 0; idx < files.length; idx++) { - Path fPath = new Path(root, files[idx].getName()); - try { - fs.getFileStatus(fPath); - FSDataInputStream in = fs.open(fPath); - byte[] toRead = new byte[files[idx].getSize()]; - byte[] toCompare = new byte[files[idx].getSize()]; - Random rb = new Random(files[idx].getSeed()); - rb.nextBytes(toCompare); - assertEquals("Cannot read file.", toRead.length, in.read(toRead)); - in.close(); - for (int i = 0; i < toRead.length; i++) { - if (toRead[i] != toCompare[i]) { - return false; - } - } - toRead = null; - toCompare = null; - } - catch(FileNotFoundException fnfe) { - if (!existingOnly) { - throw fnfe; - } - } - } - - return true; - } - - private static void updateFiles(FileSystem fs, String topdir, MyFile[] files, - int nupdate) throws IOException { - assert nupdate <= NFILES; - - Path root = new Path(topdir); - - for (int idx = 0; idx < nupdate; ++idx) { - Path fPath = new Path(root, files[idx].getName()); - // overwrite file - assertTrue(fPath.toString() + " does not exist", fs.exists(fPath)); - FSDataOutputStream out = fs.create(fPath); - files[idx].reset(); - byte[] toWrite = new byte[files[idx].getSize()]; - Random rb = new Random(files[idx].getSeed()); - rb.nextBytes(toWrite); - out.write(toWrite); - out.close(); - } - } - - private static FileStatus[] getFileStatus(FileSystem fs, - String topdir, MyFile[] files) throws IOException { - return getFileStatus(fs, topdir, files, false); - } - private static FileStatus[] getFileStatus(FileSystem fs, - String topdir, MyFile[] files, boolean existingOnly) throws IOException { - Path root = new Path(topdir); - List<FileStatus> statuses = new ArrayList<FileStatus>(); - for (int idx = 0; idx < NFILES; ++idx) { - try { - statuses.add(fs.getFileStatus(new Path(root, files[idx].getName()))); - } catch(FileNotFoundException fnfe) { - if (!existingOnly) { - throw fnfe; - } - } - } - return statuses.toArray(new FileStatus[statuses.size()]); - } - - private static boolean checkUpdate(FileSystem fs, FileStatus[] old, - String topdir, MyFile[] upd, final int nupdate) throws IOException { - Path root = new Path(topdir); - - // overwrote updated files - for (int idx = 0; idx < nupdate; ++idx) { - final FileStatus stat = - fs.getFileStatus(new Path(root, upd[idx].getName())); - if (stat.getModificationTime() <= old[idx].getModificationTime()) { - return false; - } - } - // did not overwrite files not updated - for (int idx = nupdate; idx < NFILES; ++idx) { - final FileStatus stat = - fs.getFileStatus(new Path(root, upd[idx].getName())); - if (stat.getModificationTime() != old[idx].getModificationTime()) { - return false; - } - } - return true; - } - - /** delete directory and everything underneath it.*/ - private static void deldir(FileSystem fs, String topdir) throws IOException { -
fs.delete(new Path(topdir), true); - } - - /** copy files from local file system to local file system */ - @SuppressWarnings("deprecation") - public void testCopyFromLocalToLocal() throws Exception { - Configuration conf = new Configuration(); - FileSystem localfs = FileSystem.get(LOCAL_FS, conf); - MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat"); - ToolRunner.run(new DistCpV1(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files)); - deldir(localfs, TEST_ROOT_DIR+"/destdat"); - deldir(localfs, TEST_ROOT_DIR+"/srcdat"); - } - - /** copy files from dfs file system to dfs file system */ - @SuppressWarnings("deprecation") - public void testCopyFromDfsToDfs() throws Exception { - String namenode = null; - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - final FileSystem hdfs = cluster.getFileSystem(); - namenode = FileSystem.getDefaultUri(conf).toString(); - if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); - ToolRunner.run(new DistCpV1(conf), new String[] { - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(hdfs, "/destdat", files)); - FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf); - assertTrue("Log directory does not exist.", - fs.exists(new Path(namenode+"/logs"))); - deldir(hdfs, "/destdat"); - deldir(hdfs, "/srcdat"); - deldir(hdfs, "/logs"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - /** copy empty directory on dfs file system */ - @SuppressWarnings("deprecation") - public void testEmptyDir() throws Exception { - String namenode = null; - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - final FileSystem hdfs = cluster.getFileSystem(); - namenode = FileSystem.getDefaultUri(conf).toString(); - if (namenode.startsWith("hdfs://")) { - - FileSystem fs = FileSystem.get(URI.create(namenode), new Configuration()); - fs.mkdirs(new Path("/empty")); - - ToolRunner.run(new DistCpV1(conf), new String[] { - "-log", - namenode+"/logs", - namenode+"/empty", - namenode+"/dest"}); - fs = FileSystem.get(URI.create(namenode+"/destdat"), conf); - assertTrue("Destination directory does not exist.", - fs.exists(new Path(namenode+"/dest"))); - deldir(hdfs, "/dest"); - deldir(hdfs, "/empty"); - deldir(hdfs, "/logs"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - /** copy files from local file system to dfs file system */ - @SuppressWarnings("deprecation") - public void testCopyFromLocalToDfs() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).build(); - final FileSystem hdfs = cluster.getFileSystem(); - final String namenode = hdfs.getUri().toString(); - if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat"); - ToolRunner.run(new DistCpV1(conf), new String[] { - "-log", - namenode+"/logs", - "file:///"+TEST_ROOT_DIR+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - 
checkFiles(cluster.getFileSystem(), "/destdat", files)); - assertTrue("Log directory does not exist.", - hdfs.exists(new Path(namenode+"/logs"))); - deldir(hdfs, "/destdat"); - deldir(hdfs, "/logs"); - deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - /** copy files from dfs file system to local file system */ - @SuppressWarnings("deprecation") - public void testCopyFromDfsToLocal() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - final FileSystem localfs = FileSystem.get(LOCAL_FS, conf); - cluster = new MiniDFSCluster.Builder(conf).build(); - final FileSystem hdfs = cluster.getFileSystem(); - final String namenode = FileSystem.getDefaultUri(conf).toString(); - if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); - ToolRunner.run(new DistCpV1(conf), new String[] { - "-log", - "/logs", - namenode+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files)); - assertTrue("Log directory does not exist.", - hdfs.exists(new Path("/logs"))); - deldir(localfs, TEST_ROOT_DIR+"/destdat"); - deldir(hdfs, "/logs"); - deldir(hdfs, "/srcdat"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - @SuppressWarnings("deprecation") - public void testCopyDfsToDfsUpdateOverwrite() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - final FileSystem hdfs = cluster.getFileSystem(); - final String namenode = hdfs.getUri().toString(); - if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); - ToolRunner.run(new DistCpV1(conf), new String[] { - "-p", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(hdfs, "/destdat", files)); - FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf); - assertTrue("Log directory does not exist.", - fs.exists(new Path(namenode+"/logs"))); - - FileStatus[] dchkpoint = getFileStatus(hdfs, "/destdat", files); - final int nupdate = NFILES>>2; - updateFiles(cluster.getFileSystem(), "/srcdat", files, nupdate); - deldir(hdfs, "/logs"); - - ToolRunner.run(new DistCpV1(conf), new String[] { - "-prbugp", // no t to avoid preserving mod. times - "-update", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(hdfs, "/destdat", files)); - assertTrue("Update failed to replicate all changes in src", - checkUpdate(hdfs, dchkpoint, "/destdat", files, nupdate)); - - deldir(hdfs, "/logs"); - ToolRunner.run(new DistCpV1(conf), new String[] { - "-prbugp", // no t to avoid preserving mod. 
times - "-overwrite", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(hdfs, "/destdat", files)); - assertTrue("-overwrite didn't.", - checkUpdate(hdfs, dchkpoint, "/destdat", files, NFILES)); - - deldir(hdfs, "/destdat"); - deldir(hdfs, "/srcdat"); - deldir(hdfs, "/logs"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - @SuppressWarnings("deprecation") - public void testCopyDfsToDfsUpdateWithSkipCRC() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - final FileSystem hdfs = cluster.getFileSystem(); - final String namenode = hdfs.getUri().toString(); - - FileSystem fs = FileSystem.get(URI.create(namenode), new Configuration()); - // Create two files of the same name, same length but different - // contents - final String testfilename = "test"; - final String srcData = "act act act"; - final String destData = "cat cat cat"; - - if (namenode.startsWith("hdfs://")) { - deldir(hdfs,"/logs"); - - Path srcPath = new Path("/srcdat", testfilename); - Path destPath = new Path("/destdat", testfilename); - FSDataOutputStream out = fs.create(srcPath, true); - out.writeUTF(srcData); - out.close(); - - out = fs.create(destPath, true); - out.writeUTF(destData); - out.close(); - - // Run with -skipcrccheck option - ToolRunner.run(new DistCpV1(conf), new String[] { - "-p", - "-update", - "-skipcrccheck", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - - // File should not be overwritten - FSDataInputStream in = hdfs.open(destPath); - String s = in.readUTF(); - System.out.println("Dest had: " + s); - assertTrue("Dest got over written even with skip crc", - s.equalsIgnoreCase(destData)); - in.close(); - - deldir(hdfs, "/logs"); - - // Run without the option - ToolRunner.run(new DistCpV1(conf), new String[] { - "-p", - "-update", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - - // File should be overwritten - in = hdfs.open(destPath); - s = in.readUTF(); - System.out.println("Dest had: " + s); - - assertTrue("Dest did not get overwritten without skip crc", - s.equalsIgnoreCase(srcData)); - in.close(); - - deldir(hdfs, "/destdat"); - deldir(hdfs, "/srcdat"); - deldir(hdfs, "/logs"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - @SuppressWarnings("deprecation") - public void testCopyDuplication() throws Exception { - final FileSystem localfs = FileSystem.get(LOCAL_FS, new Configuration()); - try { - MyFile[] files = createFiles(localfs, TEST_ROOT_DIR+"/srcdat"); - ToolRunner.run(new DistCpV1(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/src2/srcdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(localfs, TEST_ROOT_DIR+"/src2/srcdat", files)); - - assertEquals(DistCpV1.DuplicationException.ERROR_CODE, - ToolRunner.run(new DistCpV1(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/src2/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat",})); - } - finally { - deldir(localfs, TEST_ROOT_DIR+"/destdat"); - deldir(localfs, TEST_ROOT_DIR+"/srcdat"); - deldir(localfs, TEST_ROOT_DIR+"/src2"); - } - } - - @SuppressWarnings("deprecation") - public void testCopySingleFile() throws Exception { - FileSystem fs = FileSystem.get(LOCAL_FS, new 
Configuration()); - Path root = new Path(TEST_ROOT_DIR+"/srcdat"); - try { - MyFile[] files = {createFile(root, fs)}; - //copy a dir with a single file - ToolRunner.run(new DistCpV1(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, TEST_ROOT_DIR+"/destdat", files)); - - //copy a single file - String fname = files[0].getName(); - Path p = new Path(root, fname); - FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p)); - ToolRunner.run(new DistCpV1(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat/"+fname, - "file:///"+TEST_ROOT_DIR+"/dest2/"+fname}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, TEST_ROOT_DIR+"/dest2", files)); - - // single file update should skip copy if destination has the file already - String[] args = {"-update", "file:///"+TEST_ROOT_DIR+"/srcdat/"+fname, - "file:///"+TEST_ROOT_DIR+"/dest2/"+fname}; - Configuration conf = new Configuration(); - JobConf job = new JobConf(conf, DistCpV1.class); - DistCpV1.Arguments distcpArgs = DistCpV1.Arguments.valueOf(args, conf); - assertFalse("Single file update failed to skip copying even though the " - + "file exists at destination.", DistCpV1.setup(conf, job, distcpArgs)); - - //copy single file to existing dir - deldir(fs, TEST_ROOT_DIR+"/dest2"); - fs.mkdirs(new Path(TEST_ROOT_DIR+"/dest2")); - MyFile[] files2 = {createFile(root, fs, 0)}; - String sname = files2[0].getName(); - ToolRunner.run(new DistCpV1(new Configuration()), - new String[] {"-update", - "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname, - "file:///"+TEST_ROOT_DIR+"/dest2/"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2)); - updateFiles(fs, TEST_ROOT_DIR+"/srcdat", files2, 1); - //copy single file to existing dir w/ dst name conflict - ToolRunner.run(new DistCpV1(new Configuration()), - new String[] {"-update", - "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname, - "file:///"+TEST_ROOT_DIR+"/dest2/"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2)); - } - finally { - deldir(fs, TEST_ROOT_DIR+"/destdat"); - deldir(fs, TEST_ROOT_DIR+"/dest2"); - deldir(fs, TEST_ROOT_DIR+"/srcdat"); - } - } - - /** tests basedir option copying files from dfs file system to dfs file system */ - @SuppressWarnings("deprecation") - public void testBasedir() throws Exception { - String namenode = null; - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - final FileSystem hdfs = cluster.getFileSystem(); - namenode = FileSystem.getDefaultUri(conf).toString(); - if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(URI.create(namenode), "/basedir/middle/srcdat"); - ToolRunner.run(new DistCpV1(conf), new String[] { - "-basedir", - "/basedir", - namenode+"/basedir/middle/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(hdfs, "/destdat/middle/srcdat", files)); - deldir(hdfs, "/destdat"); - deldir(hdfs, "/basedir"); - deldir(hdfs, "/logs"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - @SuppressWarnings("deprecation") - public void testPreserveOption() throws Exception { - Configuration conf = new Configuration(); - MiniDFSCluster cluster = null; - 
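[Editor's aside: several tests here (testCopyDfsToDfsUpdateWithSkipCRC, the single-file -update case in testCopySingleFile) hinge on how DistCpV1 decides a destination file is already up to date: lengths must match, and checksums too unless -skipcrccheck is set or one side cannot produce a checksum. A condensed sketch of that decision, under the simplifying assumption that checksums are available as plain values; the method name mirrors but is not the DistCpV1 API:

public class UpdateCheck {
  // Mirrors the "same file" test: equal length, and equal checksums
  // when both sides can produce one and CRC checking is not skipped.
  static boolean sameFile(long srcLen, long dstLen,
                          String srcChecksum, String dstChecksum,
                          boolean skipCrcCheck) {
    if (srcLen != dstLen) {
      return false;                 // different size: always recopy
    }
    if (skipCrcCheck || srcChecksum == null || dstChecksum == null) {
      return true;                  // no usable checksum: trust the size
    }
    return srcChecksum.equals(dstChecksum);
  }

  public static void main(String[] args) {
    // same length, different contents: -skipcrccheck keeps the stale copy
    System.out.println(sameFile(11, 11, "act", "cat", true));  // true (skipped)
    System.out.println(sameFile(11, 11, "act", "cat", false)); // false (recopied)
  }
}

That is why the skip-CRC test above writes two same-length payloads: size alone cannot tell "act act act" from "cat cat cat".]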
try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - String nnUri = FileSystem.getDefaultUri(conf).toString(); - FileSystem fs = FileSystem.get(URI.create(nnUri), conf); - - {//test preserving user - MyFile[] files = createFiles(URI.create(nnUri), "/srcdat"); - FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files); - for(int i = 0; i < srcstat.length; i++) { - fs.setOwner(srcstat[i].getPath(), "u" + i, null); - } - ToolRunner.run(new DistCpV1(conf), - new String[]{"-pu", nnUri+"/srcdat", nnUri+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, "/destdat", files)); - - FileStatus[] dststat = getFileStatus(fs, "/destdat", files); - for(int i = 0; i < dststat.length; i++) { - assertEquals("i=" + i, "u" + i, dststat[i].getOwner()); - } - deldir(fs, "/destdat"); - deldir(fs, "/srcdat"); - } - - {//test preserving group - MyFile[] files = createFiles(URI.create(nnUri), "/srcdat"); - FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files); - for(int i = 0; i < srcstat.length; i++) { - fs.setOwner(srcstat[i].getPath(), null, "g" + i); - } - ToolRunner.run(new DistCpV1(conf), - new String[]{"-pg", nnUri+"/srcdat", nnUri+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, "/destdat", files)); - - FileStatus[] dststat = getFileStatus(fs, "/destdat", files); - for(int i = 0; i < dststat.length; i++) { - assertEquals("i=" + i, "g" + i, dststat[i].getGroup()); - } - deldir(fs, "/destdat"); - deldir(fs, "/srcdat"); - } - - {//test preserving mode - MyFile[] files = createFiles(URI.create(nnUri), "/srcdat"); - FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files); - FsPermission[] permissions = new FsPermission[srcstat.length]; - for(int i = 0; i < srcstat.length; i++) { - permissions[i] = new FsPermission((short)(i & 0666)); - fs.setPermission(srcstat[i].getPath(), permissions[i]); - } - - ToolRunner.run(new DistCpV1(conf), - new String[]{"-pp", nnUri+"/srcdat", nnUri+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, "/destdat", files)); - - FileStatus[] dststat = getFileStatus(fs, "/destdat", files); - for(int i = 0; i < dststat.length; i++) { - assertEquals("i=" + i, permissions[i], dststat[i].getPermission()); - } - deldir(fs, "/destdat"); - deldir(fs, "/srcdat"); - } - - {//test preserving times - MyFile[] files = createFiles(URI.create(nnUri), "/srcdat"); - fs.mkdirs(new Path("/srcdat/tmpf1")); - fs.mkdirs(new Path("/srcdat/tmpf2")); - FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files); - FsPermission[] permissions = new FsPermission[srcstat.length]; - for(int i = 0; i < srcstat.length; i++) { - fs.setTimes(srcstat[i].getPath(), 40, 50); - } - - ToolRunner.run(new DistCpV1(conf), - new String[]{"-pt", nnUri+"/srcdat", nnUri+"/destdat"}); - - FileStatus[] dststat = getFileStatus(fs, "/destdat", files); - for(int i = 0; i < dststat.length; i++) { - assertEquals("Modif. 
Time i=" + i, 40, dststat[i].getModificationTime()); - assertEquals("Access Time i=" + i+ srcstat[i].getPath() + "-" + dststat[i].getPath(), 50, dststat[i].getAccessTime()); - } - - assertTrue("Source and destination directories do not match.", - checkFiles(fs, "/destdat", files)); - - deldir(fs, "/destdat"); - deldir(fs, "/srcdat"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - @SuppressWarnings("deprecation") - public void testMapCount() throws Exception { - String namenode = null; - MiniDFSCluster dfs = null; - MiniDFSCluster mr = null; - try { - Configuration conf = new Configuration(); - - dfs= new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).build(); - - FileSystem fs = dfs.getFileSystem(); - final FsShell shell = new FsShell(conf); - namenode = fs.getUri().toString(); - MyFile[] files = createFiles(fs.getUri(), "/srcdat"); - long totsize = 0; - for (MyFile f : files) { - totsize += f.getSize(); - } - - Configuration job = new JobConf(conf); - job.setLong("distcp.bytes.per.map", totsize / 3); - ToolRunner.run(new DistCpV1(job), - new String[] {"-m", "100", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, "/destdat", files)); - - String logdir = namenode + "/logs"; - System.out.println(execCmd(shell, "-lsr", logdir)); - FileStatus[] logs = fs.listStatus(new Path(logdir)); - // rare case where splits are exact, logs.length can be 4 - assertTrue( logs.length == 2); - - deldir(fs, "/destdat"); - deldir(fs, "/logs"); - ToolRunner.run(new DistCpV1(job), - new String[] {"-m", "1", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - - System.out.println(execCmd(shell, "-lsr", logdir)); - logs = fs.globStatus(new Path(namenode+"/logs/part*")); - assertTrue("Unexpected map count, logs.length=" + logs.length, - logs.length == 1); - } finally { - if (dfs != null) { dfs.shutdown(); } - if (mr != null) { mr.shutdown(); } - } - } - - @SuppressWarnings("deprecation") - public void testLimits() throws Exception { - Configuration conf = new Configuration(); - MiniDFSCluster cluster = null; - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - final String nnUri = FileSystem.getDefaultUri(conf).toString(); - final FileSystem fs = FileSystem.get(URI.create(nnUri), conf); - final DistCpV1 distcp = new DistCpV1(conf); - final FsShell shell = new FsShell(conf); - - final String srcrootdir = "/src_root"; - final Path srcrootpath = new Path(srcrootdir); - final String dstrootdir = "/dst_root"; - final Path dstrootpath = new Path(dstrootdir); - - {//test -filelimit - MyFile[] files = createFiles(URI.create(nnUri), srcrootdir); - int filelimit = files.length / 2; - System.out.println("filelimit=" + filelimit); - - ToolRunner.run(distcp, - new String[]{"-filelimit", ""+filelimit, nnUri+srcrootdir, nnUri+dstrootdir}); - String results = execCmd(shell, "-lsr", dstrootdir); - results = removePrefix(results, dstrootdir); - System.out.println("results=" + results); - - FileStatus[] dststat = getFileStatus(fs, dstrootdir, files, true); - assertEquals(filelimit, dststat.length); - deldir(fs, dstrootdir); - deldir(fs, srcrootdir); - } - - {//test -sizelimit - createFiles(URI.create(nnUri), srcrootdir); - long sizelimit = fs.getContentSummary(srcrootpath).getLength()/2; - System.out.println("sizelimit=" + sizelimit); - - ToolRunner.run(distcp, - new String[]{"-sizelimit", ""+sizelimit, nnUri+srcrootdir, 
nnUri+dstrootdir}); - - ContentSummary summary = fs.getContentSummary(dstrootpath); - System.out.println("summary=" + summary); - assertTrue(summary.getLength() <= sizelimit); - deldir(fs, dstrootdir); - deldir(fs, srcrootdir); - } - - {//test update - final MyFile[] srcs = createFiles(URI.create(nnUri), srcrootdir); - final long totalsize = fs.getContentSummary(srcrootpath).getLength(); - System.out.println("src.length=" + srcs.length); - System.out.println("totalsize =" + totalsize); - fs.mkdirs(dstrootpath); - final int parts = RAN.nextInt(NFILES/3 - 1) + 2; - final int filelimit = srcs.length/parts; - final long sizelimit = totalsize/parts; - System.out.println("filelimit=" + filelimit); - System.out.println("sizelimit=" + sizelimit); - System.out.println("parts =" + parts); - final String[] args = {"-filelimit", ""+filelimit, "-sizelimit", ""+sizelimit, - "-update", nnUri+srcrootdir, nnUri+dstrootdir}; - - int dstfilecount = 0; - long dstsize = 0; - for(int i = 0; i <= parts; i++) { - ToolRunner.run(distcp, args); - - FileStatus[] dststat = getFileStatus(fs, dstrootdir, srcs, true); - System.out.println(i + ") dststat.length=" + dststat.length); - assertTrue(dststat.length - dstfilecount <= filelimit); - ContentSummary summary = fs.getContentSummary(dstrootpath); - System.out.println(i + ") summary.getLength()=" + summary.getLength()); - assertTrue(summary.getLength() - dstsize <= sizelimit); - assertTrue(checkFiles(fs, dstrootdir, srcs, true)); - dstfilecount = dststat.length; - dstsize = summary.getLength(); - } - - deldir(fs, dstrootdir); - deldir(fs, srcrootdir); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - static final long now = System.currentTimeMillis(); - - static UserGroupInformation createUGI(String name, boolean issuper) { - String username = name + now; - String group = issuper? 
"supergroup": username; - return UserGroupInformation.createUserForTesting(username, - new String[]{group}); - } - - static Path createHomeDirectory(FileSystem fs, UserGroupInformation ugi - ) throws IOException { - final Path home = new Path("/user/" + ugi.getUserName()); - fs.mkdirs(home); - fs.setOwner(home, ugi.getUserName(), ugi.getGroupNames()[0]); - fs.setPermission(home, new FsPermission((short)0700)); - return home; - } - - /** test -delete */ - @SuppressWarnings("deprecation") - public void testDelete() throws Exception { - final Configuration conf = new Configuration(); - conf.setInt("fs.trash.interval", 60); - MiniDFSCluster cluster = null; - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - final URI nnURI = FileSystem.getDefaultUri(conf); - final String nnUri = nnURI.toString(); - final FileSystem fs = FileSystem.get(URI.create(nnUri), conf); - - final DistCpV1 distcp = new DistCpV1(conf); - final FsShell shell = new FsShell(conf); - - final String srcrootdir = "/src_root"; - final String dstrootdir = "/dst_root"; - - { - //create source files - createFiles(nnURI, srcrootdir); - String srcresults = execCmd(shell, "-lsr", srcrootdir); - srcresults = removePrefix(srcresults, srcrootdir); - System.out.println("srcresults=" + srcresults); - - //create some files in dst - createFiles(nnURI, dstrootdir); - System.out.println("dstrootdir=" + dstrootdir); - shell.run(new String[]{"-lsr", dstrootdir}); - - //run distcp - ToolRunner.run(distcp, - new String[]{"-delete", "-update", "-log", "/log", - nnUri+srcrootdir, nnUri+dstrootdir}); - - //make sure src and dst contains the same files - String dstresults = execCmd(shell, "-lsr", dstrootdir); - dstresults = removePrefix(dstresults, dstrootdir); - System.out.println("first dstresults=" + dstresults); - assertEquals(srcresults, dstresults); - - //create additional file in dst - create(fs, new Path(dstrootdir, "foo")); - create(fs, new Path(dstrootdir, "foobar")); - - //run distcp again - ToolRunner.run(distcp, - new String[]{"-delete", "-update", "-log", "/log2", - nnUri+srcrootdir, nnUri+dstrootdir}); - - //make sure src and dst contains the same files - dstresults = execCmd(shell, "-lsr", dstrootdir); - dstresults = removePrefix(dstresults, dstrootdir); - System.out.println("second dstresults=" + dstresults); - assertEquals(srcresults, dstresults); - // verify that files removed in -delete were moved to the trash - // regrettably, this test will break if Trash changes incompatibly - assertTrue(fs.exists(new Path(fs.getHomeDirectory(), - ".Trash/Current" + dstrootdir + "/foo"))); - assertTrue(fs.exists(new Path(fs.getHomeDirectory(), - ".Trash/Current" + dstrootdir + "/foobar"))); - - //cleanup - deldir(fs, dstrootdir); - deldir(fs, srcrootdir); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - /** - * verify that -delete option works for other {@link FileSystem} - * implementations. 
See MAPREDUCE-1285 */ - @SuppressWarnings("deprecation") - public void testDeleteLocal() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - final FileSystem localfs = FileSystem.get(LOCAL_FS, conf); - cluster = new MiniDFSCluster.Builder(conf).build(); - final FileSystem hdfs = cluster.getFileSystem(); - final String namenode = FileSystem.getDefaultUri(conf).toString(); - if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); - String destdir = TEST_ROOT_DIR + "/destdat"; - MyFile[] localFiles = createFiles(localfs, destdir); - ToolRunner.run(new DistCpV1(conf), new String[] { - "-delete", - "-update", - "-log", - "/logs", - namenode+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(localfs, destdir, files)); - assertTrue("Log directory does not exist.", - hdfs.exists(new Path("/logs"))); - deldir(localfs, destdir); - deldir(hdfs, "/logs"); - deldir(hdfs, "/srcdat"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - /** test globbing */ - @SuppressWarnings("deprecation") - public void testGlobbing() throws Exception { - String namenode = null; - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - final FileSystem hdfs = cluster.getFileSystem(); - namenode = FileSystem.getDefaultUri(conf).toString(); - if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); - ToolRunner.run(new DistCpV1(conf), new String[] { - "-log", - namenode+"/logs", - namenode+"/srcdat/*", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(hdfs, "/destdat", files)); - FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf); - assertTrue("Log directory does not exist.", - fs.exists(new Path(namenode+"/logs"))); - deldir(hdfs, "/destdat"); - deldir(hdfs, "/srcdat"); - deldir(hdfs, "/logs"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - static void create(FileSystem fs, Path f) throws IOException { - FSDataOutputStream out = fs.create(f); - try { - byte[] b = new byte[1024 + RAN.nextInt(1024)]; - RAN.nextBytes(b); - out.write(b); - } finally { - if (out != null) out.close(); - } - } - - static String execCmd(FsShell shell, String... 
args) throws Exception { - ByteArrayOutputStream baout = new ByteArrayOutputStream(); - PrintStream out = new PrintStream(baout, true); - PrintStream old = System.out; - System.setOut(out); - shell.run(args); - out.close(); - System.setOut(old); - return baout.toString(); - } - - private static String removePrefix(String lines, String prefix) { - final int prefixlen = prefix.length(); - final StringTokenizer t = new StringTokenizer(lines, "\n"); - final StringBuffer results = new StringBuffer(); - for(; t.hasMoreTokens(); ) { - String s = t.nextToken(); - results.append(s.substring(s.indexOf(prefix) + prefixlen) + "\n"); - } - return results.toString(); - } -} diff --git a/hadoop-tools/hadoop-extras/src/test/java/org/apache/hadoop/tools/TestLogalyzer.java b/hadoop-tools/hadoop-extras/src/test/java/org/apache/hadoop/tools/TestLogalyzer.java deleted file mode 100644 index 3787f6c119..0000000000 --- a/hadoop-tools/hadoop-extras/src/test/java/org/apache/hadoop/tools/TestLogalyzer.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.tools; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.EnumSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.Path; -import org.junit.Assert; -import org.junit.Test; - -public class TestLogalyzer { - private static String EL = System.getProperty("line.separator"); - private static String TAB = "\t"; - private static final Log LOG = LogFactory.getLog(TestLogalyzer.class); - - private static File workSpace = new File("target", - TestLogalyzer.class.getName() + "-workSpace"); - private static File outdir = new File(workSpace.getAbsoluteFile() - + File.separator + "out"); - - @Test - @SuppressWarnings("deprecation") - public void testLogalyzer() throws Exception { - Path f = createLogFile(); - - String[] args = new String[10]; - - args[0] = "-archiveDir"; - args[1] = f.toString(); - args[2] = "-grep"; - args[3] = "44"; - args[4] = "-sort"; - args[5] = "0"; - args[6] = "-analysis"; - args[7] = outdir.getAbsolutePath(); - args[8] = "-separator"; - args[9] = " "; - - Logalyzer.main(args); - checkResult(); - - } - - private void checkResult() throws Exception { - File result = new File(outdir.getAbsolutePath() + File.separator - + "part-00000"); - File success = new File(outdir.getAbsolutePath() + File.separator - + "_SUCCESS"); - Assert.assertTrue(success.exists()); - - FileInputStream fis = new FileInputStream(result); - BufferedReader br = new BufferedReader(new InputStreamReader(fis, "UTF-8")); - String line = br.readLine(); - Assert.assertTrue(("1 44" + TAB + "2").equals(line)); - line = br.readLine(); - - Assert.assertTrue(("3 44" + TAB + "1").equals(line)); - line = br.readLine(); - - Assert.assertTrue(("4 44" + TAB + "1").equals(line)); - - br.close(); - - } - - /** - * Create simple log file - * - * @return - * @throws IOException - */ - - private Path createLogFile() throws IOException { - - FileContext files = FileContext.getLocalFSFileContext(); - - Path ws = new Path(workSpace.getAbsoluteFile().getAbsolutePath()); - - files.delete(ws, true); - Path workSpacePath = new Path(workSpace.getAbsolutePath(), "log"); - files.mkdir(workSpacePath, null, true); - - LOG.info("create logfile.log"); - Path logfile1 = new Path(workSpacePath, "logfile.log"); - - FSDataOutputStream os = files.create(logfile1, - EnumSet.of(CreateFlag.CREATE)); - os.writeBytes("4 3" + EL + "1 3" + EL + "4 44" + EL); - os.writeBytes("2 3" + EL + "1 3" + EL + "0 45" + EL); - os.writeBytes("4 3" + EL + "1 3" + EL + "1 44" + EL); - - os.flush(); - os.close(); - LOG.info("create logfile1.log"); - - Path logfile2 = new Path(workSpacePath, "logfile1.log"); - - os = files.create(logfile2, EnumSet.of(CreateFlag.CREATE)); - os.writeBytes("4 3" + EL + "1 3" + EL + "3 44" + EL); - os.writeBytes("2 3" + EL + "1 3" + EL + "0 45" + EL); - os.writeBytes("4 3" + EL + "1 3" + EL + "1 44" + EL); - - os.flush(); - os.close(); - - return workSpacePath; - } -}
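[Editor's aside: the expected lines asserted in checkResult follow directly from the two log files created above. Grepping for "44" matches "4 44" and "1 44" in logfile.log, and "3 44" and "1 44" in logfile1.log; LongSumReducer sums the per-line counts, and LogComparator sorts on column 0. A tiny self-contained replay of that arithmetic, in plain Java and illustrative only:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReplayTestLogalyzer {
  public static void main(String[] args) {
    List<String> lines = Arrays.asList(
        "4 3", "1 3", "4 44", "2 3", "1 3", "0 45", "4 3", "1 3", "1 44", // logfile.log
        "4 3", "1 3", "3 44", "2 3", "1 3", "0 45", "4 3", "1 3", "1 44"); // logfile1.log

    Pattern grep = Pattern.compile("44");
    // TreeMap: sorting whole lines is equivalent to sorting on column 0 here.
    Map<String, Long> counts = new TreeMap<String, Long>();
    for (String line : lines) {
      Matcher m = grep.matcher(line);
      while (m.find()) {
        Long old = counts.get(line);
        counts.put(line, old == null ? 1L : old + 1);
      }
    }
    for (Map.Entry<String, Long> e : counts.entrySet()) {
      System.out.println(e.getKey() + "\t" + e.getValue());
    }
    // Prints:
    //   1 44    2
    //   3 44    1
    //   4 44    1
    // matching the three lines asserted in checkResult above.
  }
}]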