diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index fcf0f3a022..5df2075462 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -480,6 +480,8 @@ Release 0.23.3 - UNRELEASED HADOOP-8227. Allow RPC to limit ephemeral port range. (bobby) + HADOOP-8305. distcp over viewfs is broken (John George via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java index 0ba5dbff90..0919563bd7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java @@ -1050,9 +1050,9 @@ public static Option compression(CompressionType value, int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : bufferSizeOption.getValue(); short replication = replicationOption == null ? - fs.getDefaultReplication() : + fs.getDefaultReplication(p) : (short) replicationOption.getValue(); - long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize() : + long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) : blockSizeOption.getValue(); Progressable progress = progressOption == null ? null : progressOption.getValue(); diff --git a/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java b/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java index 3ff94ac0f8..8a370a5eb4 100644 --- a/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java +++ b/hadoop-tools/hadoop-archives/src/main/java/org/apache/hadoop/tools/HadoopArchives.java @@ -613,7 +613,7 @@ public void configure(JobConf conf) { destFs.delete(tmpOutput, false); } partStream = destFs.create(tmpOutput, false, conf.getInt("io.file.buffer.size", 4096), - destFs.getDefaultReplication(), blockSize); + destFs.getDefaultReplication(tmpOutput), blockSize); } catch(IOException ie) { throw new RuntimeException("Unable to open output file " + tmpOutput, ie); } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 7ba26ff481..e3c8f060cd 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -107,8 +107,8 @@ private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS, throws IOException { OutputStream outStream = new BufferedOutputStream(targetFS.create( tmpTargetPath, true, BUFFER_SIZE, - getReplicationFactor(fileAttributes, sourceFileStatus, targetFS), - getBlockSize(fileAttributes, sourceFileStatus, targetFS), context)); + getReplicationFactor(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath), + getBlockSize(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath), context)); return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, true, context); } @@ -218,16 +218,16 @@ private static ThrottledInputStream getInputStream(Path path, Configuration conf private static short getReplicationFactor( EnumSet<FileAttribute> fileAttributes, - FileStatus sourceFile, FileSystem 
targetFS) { + FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) { return fileAttributes.contains(FileAttribute.REPLICATION)? - sourceFile.getReplication() : targetFS.getDefaultReplication(); + sourceFile.getReplication() : targetFS.getDefaultReplication(tmpTargetPath); } private static long getBlockSize( EnumSet<FileAttribute> fileAttributes, - FileStatus sourceFile, FileSystem targetFS) { + FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) { return fileAttributes.contains(FileAttribute.BLOCKSIZE)? - sourceFile.getBlockSize() : targetFS.getDefaultBlockSize(); + sourceFile.getBlockSize() : targetFS.getDefaultBlockSize(tmpTargetPath); } /** diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java index d50f63ab37..64018cf96e 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java @@ -110,9 +110,9 @@ private static void touchFile(String path) throws Exception { fs = cluster.getFileSystem(); final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(), fs.getWorkingDirectory()); - final long blockSize = fs.getDefaultBlockSize() * 2; + final long blockSize = fs.getDefaultBlockSize(new Path(path)) * 2; outputStream = fs.create(qualifiedPath, true, 0, - (short)(fs.getDefaultReplication()*2), + (short)(fs.getDefaultReplication(new Path(path))*2), blockSize); outputStream.write(new byte[FILE_SIZE]); pathList.add(qualifiedPath); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpViewFs.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpViewFs.java new file mode 100644 index 0000000000..7151134c5f --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpViewFs.java @@ -0,0 +1,485 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.tools; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.fs.viewfs.*; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.util.TestDistCpUtils; +import org.apache.hadoop.fs.FsConstants; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; + +public class TestDistCpViewFs { + private static final Log LOG = LogFactory.getLog(TestDistCpViewFs.class); + + private static FileSystem fs; + + private static Path listFile; + private static Path target; + private static String root; + + private static Configuration getConf() throws URISyntaxException { + Configuration conf = new Configuration(); + conf.set("mapred.job.tracker", "local"); + conf.set("fs.default.name", "file:///"); + return conf; + } + + @BeforeClass + public static void setup() throws URISyntaxException{ + try { + Path fswd = FileSystem.get(getConf()).getWorkingDirectory(); + Configuration vConf = ViewFileSystemTestSetup.createConfig(); + ConfigUtil.addLink(vConf, "/usr", new URI(fswd.toString())); + fs = FileSystem.get(FsConstants.VIEWFS_URI, vConf); + fs.setWorkingDirectory(new Path("/usr")); + listFile = new Path("target/tmp/listing").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + target = new Path("target/tmp/target").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + root = new Path("target/tmp").makeQualified(fs.getUri(), + fs.getWorkingDirectory()).toString(); + TestDistCpUtils.delete(fs, root); + } catch (IOException e) { + LOG.error("Exception encountered ", e); + } + } + + @Test + public void testSingleFileMissingTarget() { + caseSingleFileMissingTarget(false); + caseSingleFileMissingTarget(true); + } + + + private void caseSingleFileMissingTarget(boolean sync) { + + try { + addEntries(listFile, "singlefile1/file1"); + createFiles("singlefile1/file1"); + + runTest(listFile, target, sync); + + checkResult(target, 1); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testSingleFileTargetFile() { + caseSingleFileTargetFile(false); + caseSingleFileTargetFile(true); + } + + private void caseSingleFileTargetFile(boolean sync) { + + try { + addEntries(listFile, "singlefile1/file1"); + createFiles("singlefile1/file1", target.toString()); + + runTest(listFile, target, sync); + + checkResult(target, 1); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testSingleFileTargetDir() { + caseSingleFileTargetDir(false); + caseSingleFileTargetDir(true); + } + + private void caseSingleFileTargetDir(boolean sync) { + + try { + addEntries(listFile, "singlefile2/file2"); + createFiles("singlefile2/file2"); + mkdirs(target.toString()); + + runTest(listFile, target, sync); + + checkResult(target, 1, "file2"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testSingleDirTargetMissing() { + caseSingleDirTargetMissing(false); + 
caseSingleDirTargetMissing(true); + } + + private void caseSingleDirTargetMissing(boolean sync) { + + try { + addEntries(listFile, "singledir"); + mkdirs(root + "/singledir/dir1"); + + runTest(listFile, target, sync); + + checkResult(target, 1, "dir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testSingleDirTargetPresent() { + + try { + addEntries(listFile, "singledir"); + mkdirs(root + "/singledir/dir1"); + mkdirs(target.toString()); + + runTest(listFile, target, false); + + checkResult(target, 1, "singledir/dir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testUpdateSingleDirTargetPresent() { + + try { + addEntries(listFile, "Usingledir"); + mkdirs(root + "/Usingledir/Udir1"); + mkdirs(target.toString()); + + runTest(listFile, target, true); + + checkResult(target, 1, "Udir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testMultiFileTargetPresent() { + caseMultiFileTargetPresent(false); + caseMultiFileTargetPresent(true); + } + + private void caseMultiFileTargetPresent(boolean sync) { + + try { + addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + mkdirs(target.toString()); + + runTest(listFile, target, sync); + + checkResult(target, 3, "file3", "file4", "file5"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testMultiFileTargetMissing() { + caseMultiFileTargetMissing(false); + caseMultiFileTargetMissing(true); + } + + private void caseMultiFileTargetMissing(boolean sync) { + + try { + addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + + runTest(listFile, target, sync); + + checkResult(target, 3, "file3", "file4", "file5"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testMultiDirTargetPresent() { + + try { + addEntries(listFile, "multifile", "singledir"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + mkdirs(target.toString(), root + "/singledir/dir1"); + + runTest(listFile, target, false); + + checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5", "singledir/dir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testUpdateMultiDirTargetPresent() { + + try { + addEntries(listFile, "Umultifile", "Usingledir"); + createFiles("Umultifile/Ufile3", "Umultifile/Ufile4", "Umultifile/Ufile5"); + mkdirs(target.toString(), root + "/Usingledir/Udir1"); + + runTest(listFile, target, true); + + checkResult(target, 4, "Ufile3", "Ufile4", "Ufile5", "Udir1"); + } catch (IOException e) { + 
LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testMultiDirTargetMissing() { + + try { + addEntries(listFile, "multifile", "singledir"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + mkdirs(root + "/singledir/dir1"); + + runTest(listFile, target, false); + + checkResult(target, 2, "multifile/file3", "multifile/file4", + "multifile/file5", "singledir/dir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testUpdateMultiDirTargetMissing() { + + try { + addEntries(listFile, "multifile", "singledir"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + mkdirs(root + "/singledir/dir1"); + + runTest(listFile, target, true); + + checkResult(target, 4, "file3", "file4", "file5", "dir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test + public void testGlobTargetMissingSingleLevel() { + + try { + Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + addEntries(listFile, "*"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + createFiles("singledir/dir2/file6"); + + runTest(listFile, target, false); + + checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5", + "singledir/dir2/file6"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + TestDistCpUtils.delete(fs, "target/tmp1"); + } + } + + @Test + public void testUpdateGlobTargetMissingSingleLevel() { + + try { + Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + addEntries(listFile, "*"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + createFiles("singledir/dir2/file6"); + + runTest(listFile, target, true); + + checkResult(target, 4, "file3", "file4", "file5", "dir2/file6"); + } catch (IOException e) { + LOG.error("Exception encountered while running distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + TestDistCpUtils.delete(fs, "target/tmp1"); + } + } + + @Test + public void testGlobTargetMissingMultiLevel() { + + try { + Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + addEntries(listFile, "*/*"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + createFiles("singledir1/dir3/file7", "singledir1/dir3/file8", + "singledir1/dir3/file9"); + + runTest(listFile, target, false); + + checkResult(target, 4, "file3", "file4", "file5", + "dir3/file7", "dir3/file8", "dir3/file9"); + } catch (IOException e) { + LOG.error("Exception encountered while running distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + TestDistCpUtils.delete(fs, "target/tmp1"); + } + } + + @Test + public void testUpdateGlobTargetMissingMultiLevel() { + + try { + Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + addEntries(listFile, "*/*"); + createFiles("multifile/file3", "multifile/file4", 
"multifile/file5"); + createFiles("singledir1/dir3/file7", "singledir1/dir3/file8", + "singledir1/dir3/file9"); + + runTest(listFile, target, true); + + checkResult(target, 6, "file3", "file4", "file5", + "file7", "file8", "file9"); + } catch (IOException e) { + LOG.error("Exception encountered while running distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + TestDistCpUtils.delete(fs, "target/tmp1"); + } + } + + private void addEntries(Path listFile, String... entries) throws IOException { + OutputStream out = fs.create(listFile); + try { + for (String entry : entries){ + out.write((root + "/" + entry).getBytes()); + out.write("\n".getBytes()); + } + } finally { + out.close(); + } + } + + private void createFiles(String... entries) throws IOException { + String e; + for (String entry : entries){ + if ((new Path(entry)).isAbsolute()) + { + e = entry; + } + else + { + e = root + "/" + entry; + } + OutputStream out = fs.create(new Path(e)); + try { + out.write((e).getBytes()); + out.write("\n".getBytes()); + } finally { + out.close(); + } + } + } + + private void mkdirs(String... entries) throws IOException { + for (String entry : entries){ + fs.mkdirs(new Path(entry)); + } + } + + private void runTest(Path listFile, Path target, boolean sync) throws IOException { + DistCpOptions options = new DistCpOptions(listFile, target); + options.setSyncFolder(sync); + try { + new DistCp(getConf(), options).execute(); + } catch (Exception e) { + LOG.error("Exception encountered ", e); + throw new IOException(e); + } + } + + private void checkResult(Path target, int count, String... relPaths) throws IOException { + Assert.assertEquals(count, fs.listStatus(target).length); + if (relPaths == null || relPaths.length == 0) { + Assert.assertTrue(target.toString(), fs.exists(target)); + return; + } + for (String relPath : relPaths) { + Assert.assertTrue(new Path(target, relPath).toString(), fs.exists(new Path(target, relPath))); + } + } + +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java index 5ba5eb8867..f9c2371ef2 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java @@ -127,9 +127,9 @@ private static void touchFile(String path) throws Exception { fs = cluster.getFileSystem(); final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(), fs.getWorkingDirectory()); - final long blockSize = fs.getDefaultBlockSize() * 2; + final long blockSize = fs.getDefaultBlockSize(qualifiedPath) * 2; outputStream = fs.create(qualifiedPath, true, 0, - (short)(fs.getDefaultReplication()*2), + (short)(fs.getDefaultReplication(qualifiedPath)*2), blockSize); outputStream.write(new byte[FILE_SIZE]); pathList.add(qualifiedPath); diff --git a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCp.java index 04d123af91..5c9203804a 100644 --- a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -374,9 +374,9 @@ private FSDataOutputStream create(Path f, Reporter reporter, FsPermission permission = preseved.contains(FileAttribute.PERMISSION)? 
srcstat.getPermission(): null; short replication = preseved.contains(FileAttribute.REPLICATION)? - srcstat.getReplication(): destFileSys.getDefaultReplication(); + srcstat.getReplication(): destFileSys.getDefaultReplication(f); long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)? - srcstat.getBlockSize(): destFileSys.getDefaultBlockSize(); + srcstat.getBlockSize(): destFileSys.getDefaultBlockSize(f); return destFileSys.create(f, permission, true, sizeBuf, replication, blockSize, reporter); }
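
Reviewer note (not part of the patch): the common thread in every hunk above is replacing the parameterless FileSystem#getDefaultReplication()/getDefaultBlockSize() with the Path-taking overloads. Over viewfs the parameterless variants cannot work: ViewFileSystem is only a client-side mount table, so without a path there is no mount point, and hence no underlying filesystem, to ask for defaults (current trunk throws NotInMountpointException); given a path, viewfs resolves it through the mount table and forwards the question to the filesystem backing that mount. The sketch below is a minimal illustration of the pattern, not part of this patch: the mount target URI and file name are hypothetical placeholders, while ConfigUtil.addLink and FsConstants.VIEWFS_URI are the same APIs used by TestDistCpViewFs above.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;

public class ViewFsDefaultsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical mount: client paths under /data resolve to the local
    // filesystem here; a real deployment would typically mount hdfs:// URIs.
    ConfigUtil.addLink(conf, "/data", new URI("file:///tmp/cluster"));
    FileSystem viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);

    Path f = new Path("/data/part-00000");
    // Path-aware overloads: viewfs resolves f through its mount table and
    // delegates to the mounted filesystem. This is exactly what the distcp,
    // HadoopArchives and SequenceFile call sites changed above rely on;
    // calling the no-arg variants on viewFs would fail instead.
    short replication = viewFs.getDefaultReplication(f);
    long blockSize = viewFs.getDefaultBlockSize(f);
    System.out.println("replication=" + replication
        + ", blockSize=" + blockSize);
  }
}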