MAPREDUCE-6682. TestMRCJCFileOutputCommitter fails intermittently Contributed by Akira Ajisaka.
This commit is contained in:
parent
a1f6564d31
commit
8f1c374bec
@ -25,6 +25,8 @@
|
|||||||
import org.apache.hadoop.io.NullWritable;
|
import org.apache.hadoop.io.NullWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.JobStatus;
|
import org.apache.hadoop.mapreduce.JobStatus;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
@ -37,8 +39,7 @@
|
|||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class TestMRCJCFileOutputCommitter {
|
public class TestMRCJCFileOutputCommitter {
|
||||||
private static Path outDir = new Path(
|
private static Path outDir = new Path(GenericTestUtils.getTempPath("output"));
|
||||||
System.getProperty("test.build.data", "/tmp"), "output");
|
|
||||||
|
|
||||||
// A random task attempt id for testing.
|
// A random task attempt id for testing.
|
||||||
private static String attempt = "attempt_200707121733_0001_m_000000_0";
|
private static String attempt = "attempt_200707121733_0001_m_000000_0";
|
||||||
@ -112,12 +113,11 @@ public void testCommitter() throws Exception {
|
|||||||
expectedOutput.append(key2).append('\t').append(val2).append("\n");
|
expectedOutput.append(key2).append('\t').append(val2).append("\n");
|
||||||
String output = UtilsForTests.slurp(expectedFile);
|
String output = UtilsForTests.slurp(expectedFile);
|
||||||
assertEquals(output, expectedOutput.toString());
|
assertEquals(output, expectedOutput.toString());
|
||||||
|
|
||||||
FileUtil.fullyDelete(new File(outDir.toString()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAbort() throws IOException {
|
public void testAbort() throws IOException {
|
||||||
|
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||||
JobConf job = new JobConf();
|
JobConf job = new JobConf();
|
||||||
setConfForFileOutputCommitter(job);
|
setConfForFileOutputCommitter(job);
|
||||||
JobContext jContext = new JobContextImpl(job, taskID.getJobID());
|
JobContext jContext = new JobContextImpl(job, taskID.getJobID());
|
||||||
@ -152,7 +152,6 @@ public void testAbort() throws IOException {
|
|||||||
assertFalse("job temp dir "+expectedFile+" still exists", expectedFile.exists());
|
assertFalse("job temp dir "+expectedFile+" still exists", expectedFile.exists());
|
||||||
assertEquals("Output directory not empty", 0, new File(outDir.toString())
|
assertEquals("Output directory not empty", 0, new File(outDir.toString())
|
||||||
.listFiles().length);
|
.listFiles().length);
|
||||||
FileUtil.fullyDelete(new File(outDir.toString()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class FakeFileSystem extends RawLocalFileSystem {
|
public static class FakeFileSystem extends RawLocalFileSystem {
|
||||||
@ -223,4 +222,9 @@ public void testFailAbort() throws IOException {
|
|||||||
assertTrue(th.getMessage().contains("fake delete failed"));
|
assertTrue(th.getMessage().contains("fake delete failed"));
|
||||||
assertTrue("job temp dir does not exists", jobTmpDir.exists());
|
assertTrue("job temp dir does not exists", jobTmpDir.exists());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown() {
|
||||||
|
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,12 +34,14 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.EventType;
|
import org.apache.hadoop.mapreduce.jobhistory.EventType;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
|
import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
|
||||||
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
@ -460,21 +462,21 @@ public void testMapreduceJobTimelineServiceEnabled()
|
|||||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
|
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
|
||||||
MiniMRYarnCluster cluster = null;
|
MiniMRYarnCluster cluster = null;
|
||||||
|
FileSystem fs = null;
|
||||||
|
Path inDir = new Path(GenericTestUtils.getTempPath("input"));
|
||||||
|
Path outDir = new Path(GenericTestUtils.getTempPath("output"));
|
||||||
try {
|
try {
|
||||||
|
fs = FileSystem.get(conf);
|
||||||
cluster = new MiniMRYarnCluster(
|
cluster = new MiniMRYarnCluster(
|
||||||
TestMRTimelineEventHandling.class.getSimpleName(), 1);
|
TestMRTimelineEventHandling.class.getSimpleName(), 1);
|
||||||
cluster.init(conf);
|
cluster.init(conf);
|
||||||
cluster.start();
|
cluster.start();
|
||||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
||||||
MiniYARNCluster.getHostname() + ":"
|
MiniYARNCluster.getHostname() + ":"
|
||||||
+ cluster.getApplicationHistoryServer().getPort());
|
+ cluster.getApplicationHistoryServer().getPort());
|
||||||
TimelineStore ts = cluster.getApplicationHistoryServer()
|
TimelineStore ts = cluster.getApplicationHistoryServer()
|
||||||
.getTimelineStore();
|
.getTimelineStore();
|
||||||
|
|
||||||
String localPathRoot = System.getProperty("test.build.data",
|
|
||||||
"build/test/data");
|
|
||||||
Path inDir = new Path(localPathRoot, "input");
|
|
||||||
Path outDir = new Path(localPathRoot, "output");
|
|
||||||
RunningJob job =
|
RunningJob job =
|
||||||
UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
|
UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
|
||||||
Assert.assertEquals(JobStatus.SUCCEEDED,
|
Assert.assertEquals(JobStatus.SUCCEEDED,
|
||||||
@ -496,6 +498,7 @@ public void testMapreduceJobTimelineServiceEnabled()
|
|||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.stop();
|
cluster.stop();
|
||||||
}
|
}
|
||||||
|
deletePaths(fs, inDir, outDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
conf = new YarnConfiguration();
|
conf = new YarnConfiguration();
|
||||||
@ -509,15 +512,10 @@ public void testMapreduceJobTimelineServiceEnabled()
|
|||||||
cluster.start();
|
cluster.start();
|
||||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
||||||
MiniYARNCluster.getHostname() + ":"
|
MiniYARNCluster.getHostname() + ":"
|
||||||
+ cluster.getApplicationHistoryServer().getPort());
|
+ cluster.getApplicationHistoryServer().getPort());
|
||||||
TimelineStore ts = cluster.getApplicationHistoryServer()
|
TimelineStore ts = cluster.getApplicationHistoryServer()
|
||||||
.getTimelineStore();
|
.getTimelineStore();
|
||||||
|
|
||||||
String localPathRoot = System.getProperty("test.build.data",
|
|
||||||
"build/test/data");
|
|
||||||
Path inDir = new Path(localPathRoot, "input");
|
|
||||||
Path outDir = new Path(localPathRoot, "output");
|
|
||||||
|
|
||||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
|
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
|
||||||
RunningJob job =
|
RunningJob job =
|
||||||
UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
|
UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
|
||||||
@ -540,6 +538,20 @@ public void testMapreduceJobTimelineServiceEnabled()
|
|||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.stop();
|
cluster.stop();
|
||||||
}
|
}
|
||||||
|
deletePaths(fs, inDir, outDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Delete input paths recursively. Paths should not be null. */
|
||||||
|
private void deletePaths(FileSystem fs, Path... paths) {
|
||||||
|
if (fs == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (Path path : paths) {
|
||||||
|
try {
|
||||||
|
fs.delete(path, true);
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user