MAPREDUCE-6731. TestMRTimelineEventHandling#testMRNewTimelineServiceEventHandling fails randomly for concurrent tests. (Sangjin Lee via Varun Saxena).

This commit is contained in:
Varun Saxena 2016-07-12 20:45:53 +05:30
parent a3ac1c1943
commit d14e729a55

View File

@ -173,6 +173,13 @@ public void testMRTimelineEventHandling() throws Exception {
@Test
public void testMRNewTimelineServiceEventHandling() throws Exception {
LOG.info("testMRNewTimelineServiceEventHandling start.");
String testDir =
new File("target", getClass().getSimpleName() +
"-test_dir").getAbsolutePath();
String storageDir =
testDir + File.separator + "timeline_service_data";
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
// enable new timeline service
@ -180,6 +187,9 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
// set the file system root directory
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
storageDir);
// enable aux-service based timeline collectors
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
@ -196,8 +206,8 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
cluster.start();
LOG.info("A MiniMRYarnCluster get start.");
Path inDir = new Path("input");
Path outDir = new Path("output");
Path inDir = new Path(testDir, "input");
Path outDir = new Path(testDir, "output");
LOG.info("Run 1st job which should be successful.");
JobConf successConf = new JobConf(conf);
successConf.set("dummy_conf1",
@ -225,7 +235,7 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
ApplicationReport appReport = apps.get(0);
firstAppId = appReport.getApplicationId();
UtilsForTests.waitForAppFinished(job, cluster);
checkNewTimelineEvent(firstAppId, appReport);
checkNewTimelineEvent(firstAppId, appReport, storageDir);
LOG.info("Run 2nd job which should be failed.");
job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
@ -238,41 +248,38 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
appReport = apps.get(0).getApplicationId().equals(firstAppId) ?
apps.get(0) : apps.get(1);
checkNewTimelineEvent(firstAppId, appReport);
checkNewTimelineEvent(firstAppId, appReport, storageDir);
} finally {
if (cluster != null) {
cluster.stop();
}
// Cleanup test file
String testRoot =
FileSystemTimelineWriterImpl.
DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
File testRootFolder = new File(testRoot);
if(testRootFolder.isDirectory()) {
FileUtils.deleteDirectory(testRootFolder);
File testDirFolder = new File(testDir);
if(testDirFolder.isDirectory()) {
FileUtils.deleteDirectory(testDirFolder);
}
}
}
private void checkNewTimelineEvent(ApplicationId appId,
ApplicationReport appReport) throws IOException {
String tmpRoot =
FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+ "/entities/";
ApplicationReport appReport, String storageDir) throws IOException {
String tmpRoot = storageDir + File.separator + "entities" + File.separator;
File tmpRootFolder = new File(tmpRoot);
Assert.assertTrue(tmpRootFolder.isDirectory());
String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID +
"/" + UserGroupInformation.getCurrentUser().getShortUserName() +
"/" + appReport.getName() +
"/" + TimelineUtils.DEFAULT_FLOW_VERSION +
"/" + appReport.getStartTime() +
"/" + appId.toString();
File.separator +
UserGroupInformation.getCurrentUser().getShortUserName() +
File.separator + appReport.getName() +
File.separator + TimelineUtils.DEFAULT_FLOW_VERSION +
File.separator + appReport.getStartTime() +
File.separator + appId.toString();
// for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs
String outputDirJob = basePath + "/MAPREDUCE_JOB/";
String outputDirJob =
basePath + File.separator + "MAPREDUCE_JOB" + File.separator;
File entityFolder = new File(outputDirJob);
Assert.assertTrue("Job output directory: " + outputDirJob +
@ -295,7 +302,8 @@ private void checkNewTimelineEvent(ApplicationId appId,
verifyEntity(jobEventFile, null, false, true, cfgsToCheck);
// for this test, we expect MR job metrics are published in YARN_APPLICATION
String outputAppDir = basePath + "/YARN_APPLICATION/";
String outputAppDir =
basePath + File.separator + "YARN_APPLICATION" + File.separator;
entityFolder = new File(outputAppDir);
Assert.assertTrue(
"Job output directory: " + outputAppDir +
@ -316,7 +324,8 @@ private void checkNewTimelineEvent(ApplicationId appId,
verifyEntity(appEventFile, null, false, true, cfgsToCheck);
// check for task event file
String outputDirTask = basePath + "/MAPREDUCE_TASK/";
String outputDirTask =
basePath + File.separator + "MAPREDUCE_TASK" + File.separator;
File taskFolder = new File(outputDirTask);
Assert.assertTrue("Task output directory: " + outputDirTask +
" does not exist.",
@ -336,7 +345,8 @@ private void checkNewTimelineEvent(ApplicationId appId,
true, false, null);
// check for task attempt event file
String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
String outputDirTaskAttempt =
basePath + File.separator + "MAPREDUCE_TASK_ATTEMPT" + File.separator;
File taskAttemptFolder = new File(outputDirTaskAttempt);
Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt +
" does not exist.", taskAttemptFolder.isDirectory());