MAPREDUCE-4654. TestDistCp is ignored. Contributed by Sandy Ryza.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1396047 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0aa8188d18
commit
aa934616dd
@ -173,6 +173,8 @@ Release 2.0.3-alpha - Unreleased
|
|||||||
MAPREDUCE-4712. mr-jobhistory-daemon.sh doesn't accept --config
|
MAPREDUCE-4712. mr-jobhistory-daemon.sh doesn't accept --config
|
||||||
(Vinod Kumar Vavilapalli via tgraves)
|
(Vinod Kumar Vavilapalli via tgraves)
|
||||||
|
|
||||||
|
MAPREDUCE-4654. TestDistCp is ignored. (Sandy Ryza via tomwhite)
|
||||||
|
|
||||||
Release 2.0.2-alpha - 2012-09-07
|
Release 2.0.2-alpha - 2012-09-07
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -1,275 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.tools;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
|
||||||
import org.apache.hadoop.mapred.MiniMRCluster;
|
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
|
||||||
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
|
|
||||||
import org.apache.hadoop.mapreduce.Cluster;
|
|
||||||
import org.apache.hadoop.tools.mapred.CopyOutputFormat;
|
|
||||||
import org.junit.*;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.io.*;
|
|
||||||
|
|
||||||
@Ignore
|
|
||||||
public class TestDistCp {
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestDistCp.class);
|
|
||||||
private static List<Path> pathList = new ArrayList<Path>();
|
|
||||||
private static final int FILE_SIZE = 1024;
|
|
||||||
|
|
||||||
private static Configuration configuration;
|
|
||||||
private static MiniDFSCluster cluster;
|
|
||||||
private static MiniMRCluster mrCluster;
|
|
||||||
|
|
||||||
private static final String SOURCE_PATH = "/tmp/source";
|
|
||||||
private static final String TARGET_PATH = "/tmp/target";
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setup() throws Exception {
|
|
||||||
configuration = getConfigurationForCluster();
|
|
||||||
cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(1)
|
|
||||||
.format(true).build();
|
|
||||||
System.setProperty("org.apache.hadoop.mapred.TaskTracker", "target/tmp");
|
|
||||||
configuration.set("org.apache.hadoop.mapred.TaskTracker", "target/tmp");
|
|
||||||
System.setProperty("hadoop.log.dir", "target/tmp");
|
|
||||||
configuration.set("hadoop.log.dir", "target/tmp");
|
|
||||||
mrCluster = new MiniMRCluster(1, cluster.getFileSystem().getUri().toString(), 1);
|
|
||||||
Configuration mrConf = mrCluster.createJobConf();
|
|
||||||
final String mrJobTracker = mrConf.get("mapred.job.tracker");
|
|
||||||
configuration.set("mapred.job.tracker", mrJobTracker);
|
|
||||||
final String mrJobTrackerAddress
|
|
||||||
= mrConf.get("mapred.job.tracker.http.address");
|
|
||||||
configuration.set("mapred.job.tracker.http.address", mrJobTrackerAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void cleanup() {
|
|
||||||
if (mrCluster != null) mrCluster.shutdown();
|
|
||||||
if (cluster != null) cluster.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Configuration getConfigurationForCluster() throws IOException {
|
|
||||||
Configuration configuration = new Configuration();
|
|
||||||
System.setProperty("test.build.data", "target/build/TEST_DISTCP/data");
|
|
||||||
configuration.set("hadoop.log.dir", "target/tmp");
|
|
||||||
|
|
||||||
LOG.debug("fs.default.name == " + configuration.get("fs.default.name"));
|
|
||||||
LOG.debug("dfs.http.address == " + configuration.get("dfs.http.address"));
|
|
||||||
return configuration;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void createSourceData() throws Exception {
|
|
||||||
mkdirs(SOURCE_PATH + "/1");
|
|
||||||
mkdirs(SOURCE_PATH + "/2");
|
|
||||||
mkdirs(SOURCE_PATH + "/2/3/4");
|
|
||||||
mkdirs(SOURCE_PATH + "/2/3");
|
|
||||||
mkdirs(SOURCE_PATH + "/5");
|
|
||||||
touchFile(SOURCE_PATH + "/5/6");
|
|
||||||
mkdirs(SOURCE_PATH + "/7");
|
|
||||||
mkdirs(SOURCE_PATH + "/7/8");
|
|
||||||
touchFile(SOURCE_PATH + "/7/8/9");
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void mkdirs(String path) throws Exception {
|
|
||||||
FileSystem fileSystem = cluster.getFileSystem();
|
|
||||||
final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
|
|
||||||
fileSystem.getWorkingDirectory());
|
|
||||||
pathList.add(qualifiedPath);
|
|
||||||
fileSystem.mkdirs(qualifiedPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void touchFile(String path) throws Exception {
|
|
||||||
FileSystem fs;
|
|
||||||
DataOutputStream outputStream = null;
|
|
||||||
try {
|
|
||||||
fs = cluster.getFileSystem();
|
|
||||||
final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
|
|
||||||
fs.getWorkingDirectory());
|
|
||||||
final long blockSize = fs.getDefaultBlockSize(new Path(path)) * 2;
|
|
||||||
outputStream = fs.create(qualifiedPath, true, 0,
|
|
||||||
(short)(fs.getDefaultReplication(new Path(path))*2),
|
|
||||||
blockSize);
|
|
||||||
outputStream.write(new byte[FILE_SIZE]);
|
|
||||||
pathList.add(qualifiedPath);
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
IOUtils.cleanup(null, outputStream);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void clearState() throws Exception {
|
|
||||||
pathList.clear();
|
|
||||||
cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
|
|
||||||
createSourceData();
|
|
||||||
}
|
|
||||||
|
|
||||||
// @Test
|
|
||||||
public void testUniformSizeDistCp() throws Exception {
|
|
||||||
try {
|
|
||||||
clearState();
|
|
||||||
final FileSystem fileSystem = cluster.getFileSystem();
|
|
||||||
Path sourcePath = new Path(SOURCE_PATH)
|
|
||||||
.makeQualified(fileSystem.getUri(),
|
|
||||||
fileSystem.getWorkingDirectory());
|
|
||||||
List<Path> sources = new ArrayList<Path>();
|
|
||||||
sources.add(sourcePath);
|
|
||||||
|
|
||||||
Path targetPath = new Path(TARGET_PATH)
|
|
||||||
.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
|
|
||||||
DistCpOptions options = new DistCpOptions(sources, targetPath);
|
|
||||||
options.setAtomicCommit(true);
|
|
||||||
options.setBlocking(false);
|
|
||||||
Job job = new DistCp(configuration, options).execute();
|
|
||||||
Path workDir = CopyOutputFormat.getWorkingDirectory(job);
|
|
||||||
Path finalDir = CopyOutputFormat.getCommitDirectory(job);
|
|
||||||
|
|
||||||
while (!job.isComplete()) {
|
|
||||||
if (cluster.getFileSystem().exists(workDir)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
job.waitForCompletion(true);
|
|
||||||
Assert.assertFalse(cluster.getFileSystem().exists(workDir));
|
|
||||||
Assert.assertTrue(cluster.getFileSystem().exists(finalDir));
|
|
||||||
Assert.assertFalse(cluster.getFileSystem().exists(
|
|
||||||
new Path(job.getConfiguration().get(DistCpConstants.CONF_LABEL_META_FOLDER))));
|
|
||||||
verifyResults();
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
LOG.error("Exception encountered", e);
|
|
||||||
Assert.fail("Unexpected exception: " + e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// @Test
|
|
||||||
public void testCleanup() {
|
|
||||||
try {
|
|
||||||
clearState();
|
|
||||||
Path sourcePath = new Path("noscheme:///file");
|
|
||||||
List<Path> sources = new ArrayList<Path>();
|
|
||||||
sources.add(sourcePath);
|
|
||||||
|
|
||||||
final FileSystem fs = cluster.getFileSystem();
|
|
||||||
Path targetPath = new Path(TARGET_PATH)
|
|
||||||
.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
|
||||||
DistCpOptions options = new DistCpOptions(sources, targetPath);
|
|
||||||
|
|
||||||
Path stagingDir = JobSubmissionFiles.getStagingDir(
|
|
||||||
new Cluster(configuration), configuration);
|
|
||||||
stagingDir.getFileSystem(configuration).mkdirs(stagingDir);
|
|
||||||
|
|
||||||
try {
|
|
||||||
new DistCp(configuration, options).execute();
|
|
||||||
} catch (Throwable t) {
|
|
||||||
Assert.assertEquals(stagingDir.getFileSystem(configuration).
|
|
||||||
listStatus(stagingDir).length, 0);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Exception encountered ", e);
|
|
||||||
Assert.fail("testCleanup failed " + e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRootPath() throws Exception {
|
|
||||||
try {
|
|
||||||
clearState();
|
|
||||||
List<Path> sources = new ArrayList<Path>();
|
|
||||||
final FileSystem fs = cluster.getFileSystem();
|
|
||||||
sources.add(new Path("/a")
|
|
||||||
.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
|
|
||||||
sources.add(new Path("/b")
|
|
||||||
.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
|
|
||||||
touchFile("/a/a.txt");
|
|
||||||
touchFile("/b/b.txt");
|
|
||||||
|
|
||||||
Path targetPath = new Path("/c")
|
|
||||||
.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
|
||||||
DistCpOptions options = new DistCpOptions(sources, targetPath);
|
|
||||||
new DistCp(configuration, options).execute();
|
|
||||||
Assert.assertTrue(fs.exists(new Path("/c/a/a.txt")));
|
|
||||||
Assert.assertTrue(fs.exists(new Path("/c/b/b.txt")));
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
LOG.error("Exception encountered", e);
|
|
||||||
Assert.fail("Unexpected exception: " + e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDynamicDistCp() throws Exception {
|
|
||||||
try {
|
|
||||||
clearState();
|
|
||||||
final FileSystem fs = cluster.getFileSystem();
|
|
||||||
Path sourcePath = new Path(SOURCE_PATH)
|
|
||||||
.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
|
||||||
List<Path> sources = new ArrayList<Path>();
|
|
||||||
sources.add(sourcePath);
|
|
||||||
|
|
||||||
Path targetPath = new Path(TARGET_PATH)
|
|
||||||
.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
|
||||||
DistCpOptions options = new DistCpOptions(sources, targetPath);
|
|
||||||
options.setCopyStrategy("dynamic");
|
|
||||||
|
|
||||||
options.setAtomicCommit(true);
|
|
||||||
options.setAtomicWorkPath(new Path("/work"));
|
|
||||||
options.setBlocking(false);
|
|
||||||
Job job = new DistCp(configuration, options).execute();
|
|
||||||
Path workDir = CopyOutputFormat.getWorkingDirectory(job);
|
|
||||||
Path finalDir = CopyOutputFormat.getCommitDirectory(job);
|
|
||||||
|
|
||||||
while (!job.isComplete()) {
|
|
||||||
if (fs.exists(workDir)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
job.waitForCompletion(true);
|
|
||||||
Assert.assertFalse(fs.exists(workDir));
|
|
||||||
Assert.assertTrue(fs.exists(finalDir));
|
|
||||||
|
|
||||||
verifyResults();
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
LOG.error("Exception encountered", e);
|
|
||||||
Assert.fail("Unexpected exception: " + e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void verifyResults() throws Exception {
|
|
||||||
for (Path path : pathList) {
|
|
||||||
FileSystem fs = cluster.getFileSystem();
|
|
||||||
|
|
||||||
Path sourcePath = path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
|
||||||
Path targetPath
|
|
||||||
= new Path(sourcePath.toString().replaceAll(SOURCE_PATH, TARGET_PATH));
|
|
||||||
|
|
||||||
Assert.assertTrue(fs.exists(targetPath));
|
|
||||||
Assert.assertEquals(fs.isFile(sourcePath), fs.isFile(targetPath));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -21,8 +21,11 @@ package org.apache.hadoop.tools;
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapreduce.Cluster;
|
||||||
|
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
|
||||||
import org.apache.hadoop.tools.util.TestDistCpUtils;
|
import org.apache.hadoop.tools.util.TestDistCpUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
@ -30,6 +33,8 @@ import org.junit.Test;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
public class TestIntegration {
|
public class TestIntegration {
|
||||||
private static final Log LOG = LogFactory.getLog(TestIntegration.class);
|
private static final Log LOG = LogFactory.getLog(TestIntegration.class);
|
||||||
@ -318,6 +323,58 @@ public class TestIntegration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteMissingInDestination() {
|
||||||
|
|
||||||
|
try {
|
||||||
|
addEntries(listFile, "srcdir");
|
||||||
|
createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2");
|
||||||
|
|
||||||
|
Path target = new Path(root + "/dstdir");
|
||||||
|
runTest(listFile, target, true, true, false);
|
||||||
|
|
||||||
|
checkResult(target, 1, "file1");
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Exception encountered while running distcp", e);
|
||||||
|
Assert.fail("distcp failure");
|
||||||
|
} finally {
|
||||||
|
TestDistCpUtils.delete(fs, root);
|
||||||
|
TestDistCpUtils.delete(fs, "target/tmp1");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOverwrite() {
|
||||||
|
byte[] contents1 = "contents1".getBytes();
|
||||||
|
byte[] contents2 = "contents2".getBytes();
|
||||||
|
Assert.assertEquals(contents1.length, contents2.length);
|
||||||
|
|
||||||
|
try {
|
||||||
|
addEntries(listFile, "srcdir");
|
||||||
|
createWithContents("srcdir/file1", contents1);
|
||||||
|
createWithContents("dstdir/file1", contents2);
|
||||||
|
|
||||||
|
Path target = new Path(root + "/dstdir");
|
||||||
|
runTest(listFile, target, false, false, true);
|
||||||
|
|
||||||
|
checkResult(target, 1, "file1");
|
||||||
|
|
||||||
|
// make sure dstdir/file1 has been overwritten with the contents
|
||||||
|
// of srcdir/file1
|
||||||
|
FSDataInputStream is = fs.open(new Path(root + "/dstdir/file1"));
|
||||||
|
byte[] dstContents = new byte[contents1.length];
|
||||||
|
is.readFully(dstContents);
|
||||||
|
is.close();
|
||||||
|
Assert.assertArrayEquals(contents1, dstContents);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Exception encountered while running distcp", e);
|
||||||
|
Assert.fail("distcp failure");
|
||||||
|
} finally {
|
||||||
|
TestDistCpUtils.delete(fs, root);
|
||||||
|
TestDistCpUtils.delete(fs, "target/tmp1");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGlobTargetMissingSingleLevel() {
|
public void testGlobTargetMissingSingleLevel() {
|
||||||
|
|
||||||
@ -411,6 +468,32 @@ public class TestIntegration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCleanup() {
|
||||||
|
try {
|
||||||
|
Path sourcePath = new Path("noscheme:///file");
|
||||||
|
List<Path> sources = new ArrayList<Path>();
|
||||||
|
sources.add(sourcePath);
|
||||||
|
|
||||||
|
DistCpOptions options = new DistCpOptions(sources, target);
|
||||||
|
|
||||||
|
Configuration conf = getConf();
|
||||||
|
Path stagingDir = JobSubmissionFiles.getStagingDir(
|
||||||
|
new Cluster(conf), conf);
|
||||||
|
stagingDir.getFileSystem(conf).mkdirs(stagingDir);
|
||||||
|
|
||||||
|
try {
|
||||||
|
new DistCp(conf, options).execute();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
Assert.assertEquals(stagingDir.getFileSystem(conf).
|
||||||
|
listStatus(stagingDir).length, 0);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Exception encountered ", e);
|
||||||
|
Assert.fail("testCleanup failed " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void addEntries(Path listFile, String... entries) throws IOException {
|
private void addEntries(Path listFile, String... entries) throws IOException {
|
||||||
OutputStream out = fs.create(listFile);
|
OutputStream out = fs.create(listFile);
|
||||||
try {
|
try {
|
||||||
@ -435,6 +518,15 @@ public class TestIntegration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createWithContents(String entry, byte[] contents) throws IOException {
|
||||||
|
OutputStream out = fs.create(new Path(root + "/" + entry));
|
||||||
|
try {
|
||||||
|
out.write(contents);
|
||||||
|
} finally {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void mkdirs(String... entries) throws IOException {
|
private void mkdirs(String... entries) throws IOException {
|
||||||
for (String entry : entries){
|
for (String entry : entries){
|
||||||
fs.mkdirs(new Path(entry));
|
fs.mkdirs(new Path(entry));
|
||||||
@ -442,8 +534,15 @@ public class TestIntegration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void runTest(Path listFile, Path target, boolean sync) throws IOException {
|
private void runTest(Path listFile, Path target, boolean sync) throws IOException {
|
||||||
|
runTest(listFile, target, sync, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runTest(Path listFile, Path target, boolean sync, boolean delete,
|
||||||
|
boolean overwrite) throws IOException {
|
||||||
DistCpOptions options = new DistCpOptions(listFile, target);
|
DistCpOptions options = new DistCpOptions(listFile, target);
|
||||||
options.setSyncFolder(sync);
|
options.setSyncFolder(sync);
|
||||||
|
options.setDeleteMissing(delete);
|
||||||
|
options.setOverwrite(overwrite);
|
||||||
try {
|
try {
|
||||||
new DistCp(getConf(), options).execute();
|
new DistCp(getConf(), options).execute();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user