From aa934616dd488bd25e65113fa8ca13c42c626f1a Mon Sep 17 00:00:00 2001
From: Thomas White <tomwhite@apache.org>
Date: Tue, 9 Oct 2012 14:37:46 +0000
Subject: [PATCH] MAPREDUCE-4654. TestDistCp is ignored. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1396047 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   2 +
 .../org/apache/hadoop/tools/TestDistCp.java   | 275 ------------------
 .../apache/hadoop/tools/TestIntegration.java  | 101 ++++++-
 3 files changed, 102 insertions(+), 276 deletions(-)
 delete mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9adc52e1d3..893096d18d 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -173,6 +173,8 @@ Release 2.0.3-alpha - Unreleased
     MAPREDUCE-4712. mr-jobhistory-daemon.sh doesn't accept --config
     (Vinod Kumar Vavilapalli via tgraves)
 
+    MAPREDUCE-4654. TestDistCp is ignored. (Sandy Ryza via tomwhite)
+
 Release 2.0.2-alpha - 2012-09-07
 
   INCOMPATIBLE CHANGES

diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java
deleted file mode 100644
index 64018cf96e..0000000000
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCp.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.tools;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.tools.mapred.CopyOutputFormat;
-import org.junit.*;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.io.*;
-
-@Ignore
-public class TestDistCp {
-  private static final Log LOG = LogFactory.getLog(TestDistCp.class);
-  private static List<Path> pathList = new ArrayList<Path>();
-  private static final int FILE_SIZE = 1024;
-
-  private static Configuration configuration;
-  private static MiniDFSCluster cluster;
-  private static MiniMRCluster mrCluster;
-
-  private static final String SOURCE_PATH = "/tmp/source";
-  private static final String TARGET_PATH = "/tmp/target";
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    configuration = getConfigurationForCluster();
-    cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(1)
-                  .format(true).build();
-    System.setProperty("org.apache.hadoop.mapred.TaskTracker", "target/tmp");
-    configuration.set("org.apache.hadoop.mapred.TaskTracker", "target/tmp");
-    System.setProperty("hadoop.log.dir", "target/tmp");
-    configuration.set("hadoop.log.dir", "target/tmp");
-    mrCluster = new MiniMRCluster(1, cluster.getFileSystem().getUri().toString(), 1);
-    Configuration mrConf = mrCluster.createJobConf();
-    final String mrJobTracker = mrConf.get("mapred.job.tracker");
-    configuration.set("mapred.job.tracker", mrJobTracker);
-    final String mrJobTrackerAddress
-        = mrConf.get("mapred.job.tracker.http.address");
-    configuration.set("mapred.job.tracker.http.address", mrJobTrackerAddress);
-  }
-
-  @AfterClass
-  public static void cleanup() {
-    if (mrCluster != null) mrCluster.shutdown();
-    if (cluster != null) cluster.shutdown();
-  }
-
-  private static Configuration getConfigurationForCluster() throws IOException {
-    Configuration configuration = new Configuration();
-    System.setProperty("test.build.data", "target/build/TEST_DISTCP/data");
-    configuration.set("hadoop.log.dir", "target/tmp");
-
-    LOG.debug("fs.default.name == " + configuration.get("fs.default.name"));
-    LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
-    return configuration;
-  }
-
-  private static void createSourceData() throws Exception {
-    mkdirs(SOURCE_PATH + "/1");
-    mkdirs(SOURCE_PATH + "/2");
-    mkdirs(SOURCE_PATH + "/2/3/4");
-    mkdirs(SOURCE_PATH + "/2/3");
-    mkdirs(SOURCE_PATH + "/5");
-    touchFile(SOURCE_PATH + "/5/6");
-    mkdirs(SOURCE_PATH + "/7");
-    mkdirs(SOURCE_PATH + "/7/8");
-    touchFile(SOURCE_PATH + "/7/8/9");
-  }
-
-  private static void mkdirs(String path) throws Exception {
-    FileSystem fileSystem = cluster.getFileSystem();
-    final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
-                                            fileSystem.getWorkingDirectory());
-    pathList.add(qualifiedPath);
-    fileSystem.mkdirs(qualifiedPath);
-  }
-
-  private static void touchFile(String path) throws Exception {
-    FileSystem fs;
-    DataOutputStream outputStream = null;
-    try {
-      fs = cluster.getFileSystem();
-      final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
-                                              fs.getWorkingDirectory());
-      final long blockSize
-          = fs.getDefaultBlockSize(new Path(path)) * 2;
-      outputStream = fs.create(qualifiedPath, true, 0,
-          (short)(fs.getDefaultReplication(new Path(path))*2),
-          blockSize);
-      outputStream.write(new byte[FILE_SIZE]);
-      pathList.add(qualifiedPath);
-    }
-    finally {
-      IOUtils.cleanup(null, outputStream);
-    }
-  }
-
-  private static void clearState() throws Exception {
-    pathList.clear();
-    cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
-    createSourceData();
-  }
-
-//  @Test
-  public void testUniformSizeDistCp() throws Exception {
-    try {
-      clearState();
-      final FileSystem fileSystem = cluster.getFileSystem();
-      Path sourcePath = new Path(SOURCE_PATH)
-          .makeQualified(fileSystem.getUri(),
-                         fileSystem.getWorkingDirectory());
-      List<Path> sources = new ArrayList<Path>();
-      sources.add(sourcePath);
-
-      Path targetPath = new Path(TARGET_PATH)
-          .makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
-      DistCpOptions options = new DistCpOptions(sources, targetPath);
-      options.setAtomicCommit(true);
-      options.setBlocking(false);
-      Job job = new DistCp(configuration, options).execute();
-      Path workDir = CopyOutputFormat.getWorkingDirectory(job);
-      Path finalDir = CopyOutputFormat.getCommitDirectory(job);
-
-      while (!job.isComplete()) {
-        if (cluster.getFileSystem().exists(workDir)) {
-          break;
-        }
-      }
-      job.waitForCompletion(true);
-      Assert.assertFalse(cluster.getFileSystem().exists(workDir));
-      Assert.assertTrue(cluster.getFileSystem().exists(finalDir));
-      Assert.assertFalse(cluster.getFileSystem().exists(
-          new Path(job.getConfiguration().get(DistCpConstants.CONF_LABEL_META_FOLDER))));
-      verifyResults();
-    }
-    catch (Exception e) {
-      LOG.error("Exception encountered", e);
-      Assert.fail("Unexpected exception: " + e.getMessage());
-    }
-  }
-
-//  @Test
-  public void testCleanup() {
-    try {
-      clearState();
-      Path sourcePath = new Path("noscheme:///file");
-      List<Path> sources = new ArrayList<Path>();
-      sources.add(sourcePath);
-
-      final FileSystem fs = cluster.getFileSystem();
-      Path targetPath = new Path(TARGET_PATH)
-          .makeQualified(fs.getUri(), fs.getWorkingDirectory());
-      DistCpOptions options = new DistCpOptions(sources, targetPath);
-
-      Path stagingDir = JobSubmissionFiles.getStagingDir(
-          new Cluster(configuration), configuration);
-      stagingDir.getFileSystem(configuration).mkdirs(stagingDir);
-
-      try {
-        new DistCp(configuration, options).execute();
-      } catch (Throwable t) {
-        Assert.assertEquals(stagingDir.getFileSystem(configuration).
-            listStatus(stagingDir).length, 0);
-      }
-    } catch (Exception e) {
-      LOG.error("Exception encountered ", e);
-      Assert.fail("testCleanup failed " + e.getMessage());
-    }
-  }
-
-  @Test
-  public void testRootPath() throws Exception {
-    try {
-      clearState();
-      List<Path> sources = new ArrayList<Path>();
-      final FileSystem fs = cluster.getFileSystem();
-      sources.add(new Path("/a")
-          .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
-      sources.add(new Path("/b")
-          .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
-      touchFile("/a/a.txt");
-      touchFile("/b/b.txt");
-
-      Path targetPath = new Path("/c")
-          .makeQualified(fs.getUri(), fs.getWorkingDirectory());
-      DistCpOptions options = new DistCpOptions(sources, targetPath);
-      new DistCp(configuration, options).execute();
-      Assert.assertTrue(fs.exists(new Path("/c/a/a.txt")));
-      Assert.assertTrue(fs.exists(new Path("/c/b/b.txt")));
-    }
-    catch (Exception e) {
-      LOG.error("Exception encountered", e);
-      Assert.fail("Unexpected exception: " + e.getMessage());
-    }
-  }
-
-  @Test
-  public void testDynamicDistCp() throws Exception {
-    try {
-      clearState();
-      final FileSystem fs = cluster.getFileSystem();
-      Path sourcePath = new Path(SOURCE_PATH)
-          .makeQualified(fs.getUri(), fs.getWorkingDirectory());
-      List<Path> sources = new ArrayList<Path>();
-      sources.add(sourcePath);
-
-      Path targetPath = new Path(TARGET_PATH)
-          .makeQualified(fs.getUri(), fs.getWorkingDirectory());
-      DistCpOptions options = new DistCpOptions(sources, targetPath);
-      options.setCopyStrategy("dynamic");
-
-      options.setAtomicCommit(true);
-      options.setAtomicWorkPath(new Path("/work"));
-      options.setBlocking(false);
-      Job job = new DistCp(configuration, options).execute();
-      Path workDir = CopyOutputFormat.getWorkingDirectory(job);
-      Path finalDir = CopyOutputFormat.getCommitDirectory(job);
-
-      while (!job.isComplete()) {
-        if (fs.exists(workDir)) {
-          break;
-        }
-      }
-      job.waitForCompletion(true);
-      Assert.assertFalse(fs.exists(workDir));
-      Assert.assertTrue(fs.exists(finalDir));
-
-      verifyResults();
-    }
-    catch (Exception e) {
-      LOG.error("Exception encountered", e);
-      Assert.fail("Unexpected exception: " + e.getMessage());
    }
-  }
-
-  private static void verifyResults() throws Exception {
-    for (Path path : pathList) {
-      FileSystem fs = cluster.getFileSystem();
-
-      Path sourcePath = path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
-      Path targetPath
-          = new Path(sourcePath.toString().replaceAll(SOURCE_PATH, TARGET_PATH));
-
-      Assert.assertTrue(fs.exists(targetPath));
-      Assert.assertEquals(fs.isFile(sourcePath), fs.isFile(targetPath));
-    }
-  }
-}
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
index 303a4e05fc..ca08e25478 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.tools;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.tools.util.TestDistCpUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -30,6 +33,8 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 public class TestIntegration {
   private static final Log LOG = LogFactory.getLog(TestIntegration.class);
@@ -317,6 +322,58 @@ public class TestIntegration {
       TestDistCpUtils.delete(fs, root);
     }
   }
+
+  @Test
+  public void testDeleteMissingInDestination() {
+
+    try {
+      addEntries(listFile, "srcdir");
+      createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2");
+
+      Path target = new Path(root + "/dstdir");
+      runTest(listFile, target, true, true, false);
+
+      checkResult(target, 1, "file1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
+
+  @Test
+  public void testOverwrite() {
+    byte[] contents1 = "contents1".getBytes();
+    byte[] contents2 = "contents2".getBytes();
+    Assert.assertEquals(contents1.length, contents2.length);
+
+    try {
+      addEntries(listFile, "srcdir");
+      createWithContents("srcdir/file1", contents1);
+      createWithContents("dstdir/file1", contents2);
+
+      Path target = new Path(root + "/dstdir");
+      runTest(listFile, target, false, false, true);
+
+      checkResult(target, 1, "file1");
+
+      // make sure dstdir/file1 has been overwritten with the contents
+      // of srcdir/file1
+      FSDataInputStream is = fs.open(new Path(root + "/dstdir/file1"));
+      byte[] dstContents = new byte[contents1.length];
+      is.readFully(dstContents);
+      is.close();
+      Assert.assertArrayEquals(contents1, dstContents);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
 
   @Test
   public void testGlobTargetMissingSingleLevel() {
@@ -410,7 +467,33 @@
       TestDistCpUtils.delete(fs, "target/tmp1");
     }
   }
+
+  @Test
+  public void testCleanup() {
+    try {
+      Path sourcePath = new Path("noscheme:///file");
+      List<Path> sources = new ArrayList<Path>();
+      sources.add(sourcePath);
+      DistCpOptions options = new DistCpOptions(sources, target);
+
+      Configuration conf = getConf();
+      Path stagingDir = JobSubmissionFiles.getStagingDir(
+          new Cluster(conf), conf);
+      stagingDir.getFileSystem(conf).mkdirs(stagingDir);
+
+      try {
+        new DistCp(conf, options).execute();
+      } catch (Throwable t) {
+        Assert.assertEquals(stagingDir.getFileSystem(conf).
+            listStatus(stagingDir).length, 0);
+      }
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("testCleanup failed " + e.getMessage());
+    }
+  }
+
   private void addEntries(Path listFile, String... entries) throws IOException {
     OutputStream out = fs.create(listFile);
     try {
@@ -434,16 +517,32 @@
       }
     }
   }
+
+  private void createWithContents(String entry, byte[] contents) throws IOException {
+    OutputStream out = fs.create(new Path(root + "/" + entry));
+    try {
+      out.write(contents);
+    } finally {
+      out.close();
+    }
+  }
 
   private void mkdirs(String... entries) throws IOException {
     for (String entry : entries){
       fs.mkdirs(new Path(entry));
     }
   }
-  
+
   private void runTest(Path listFile, Path target, boolean sync) throws IOException {
+    runTest(listFile, target, sync, false, false);
+  }
+
+  private void runTest(Path listFile, Path target, boolean sync, boolean delete,
+      boolean overwrite) throws IOException {
     DistCpOptions options = new DistCpOptions(listFile, target);
     options.setSyncFolder(sync);
+    options.setDeleteMissing(delete);
+    options.setOverwrite(overwrite);
     try {
       new DistCp(getConf(), options).execute();
     } catch (Exception e) {