From 000a4d8e133920d71af304f2015d3674daed74fa Mon Sep 17 00:00:00 2001
From: Sangjin Lee
Date: Tue, 3 May 2016 09:19:36 -0700
Subject: [PATCH] MAPREDUCE-6688. Store job configurations in Timeline Service v2 (Varun Saxena via sjlee)

---
 .../jobhistory/JobHistoryEventHandler.java    | 57 +++++++++++-
 .../mapreduce/v2/app/job/impl/JobImpl.java    |  2 +-
 .../jobhistory/JobSubmittedEvent.java         | 38 +++++++-
 .../mapreduce/util/JobHistoryEventUtils.java  |  3 +
 .../mapred/TestMRTimelineEventHandling.java   | 92 ++++++++++++++++---
 .../apache/hadoop/mapred/UtilsForTests.java   |  8 ++
 6 files changed, 181 insertions(+), 19 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 528b45002d..887533d7fd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -1074,7 +1074,16 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
     entity.setId(jobId.toString());
     return entity;
   }
-
+
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+      createJobEntity(JobId jobId) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
+    entity.setId(jobId.toString());
+    entity.setType(MAPREDUCE_JOB_ENTITY_TYPE);
+    return entity;
+  }
+
   // create ApplicationEntity with job finished Metrics from HistoryEvent
   private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
       createAppEntityWithJobMetrics(HistoryEvent event, JobId jobId) {
@@ -1133,6 +1142,46 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
     return entity;
   }
 
+  private void publishConfigsOnJobSubmittedEvent(JobSubmittedEvent event,
+      JobId jobId) {
+    if (event.getJobConf() == null) {
+      return;
+    }
+    // Publish job configurations both as job and app entity.
+    // Configs are split into multiple entities if they exceed 10 KB in size.
+    org.apache.hadoop.yarn.api.records.timelineservice.
+        TimelineEntity jobEntityForConfigs = createJobEntity(jobId);
+    ApplicationEntity appEntityForConfigs = new ApplicationEntity();
+    String appId = jobId.getAppId().toString();
+    appEntityForConfigs.setId(appId);
+    try {
+      int configSize = 0;
+      for (Map.Entry<String, String> entry : event.getJobConf()) {
+        int size = entry.getKey().length() + entry.getValue().length();
+        configSize += size;
+        if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) {
+          if (jobEntityForConfigs.getConfigs().size() > 0) {
+            timelineClient.putEntities(jobEntityForConfigs);
+            timelineClient.putEntities(appEntityForConfigs);
+            jobEntityForConfigs = createJobEntity(jobId);
+            appEntityForConfigs = new ApplicationEntity();
+            appEntityForConfigs.setId(appId);
+          }
+          configSize = size;
+        }
+        jobEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
+        appEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
+      }
+      if (configSize > 0) {
+        timelineClient.putEntities(jobEntityForConfigs);
+        timelineClient.putEntities(appEntityForConfigs);
+      }
+    } catch (IOException | YarnException e) {
+      LOG.error("Exception while publishing configs on JOB_SUBMITTED Event "
+          + "for the job : " + jobId, e);
+    }
+  }
+
   private void processEventForNewTimelineService(HistoryEvent event,
       JobId jobId, long timestamp) {
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity =
@@ -1252,8 +1301,12 @@ private void processEventForNewTimelineService(HistoryEvent event,
     } catch (IOException | YarnException e) {
       LOG.error("Failed to process Event " + event.getEventType()
           + " for the job : " + jobId, e);
+      return;
+    }
+    if (event.getEventType() == EventType.JOB_SUBMITTED) {
+      // Publish configs after main job submitted event has been posted.
+      publishConfigsOnJobSubmittedEvent((JobSubmittedEvent)event, jobId);
+    }
   }
 
   private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
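Read in isolation, the loop above implements a size-capped batching policy: it accumulates key+value lengths, flushes the current job/app entity pair whenever the running total would cross ATS_CONFIG_PUBLISH_SIZE_BYTES, and lets a single oversized config go out alone in its own entity. A minimal standalone sketch of the same policy, for desk-checking the split boundaries (plain Java; the ConfigBatcher name and CAP constant are illustrative, not part of this patch):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ConfigBatcher {
      // Stands in for JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES.
      static final int CAP = 10 * 1024;

      // Splits configs into batches whose key+value sizes stay within CAP;
      // an entry larger than CAP ends up alone in its own batch, mirroring
      // publishConfigsOnJobSubmittedEvent above.
      static List<Map<String, String>> batch(Map<String, String> configs) {
        List<Map<String, String>> batches = new ArrayList<>();
        Map<String, String> current = new LinkedHashMap<>();
        int size = 0;
        for (Map.Entry<String, String> e : configs.entrySet()) {
          int entrySize = e.getKey().length() + e.getValue().length();
          size += entrySize;
          if (size > CAP) {
            if (!current.isEmpty()) {
              batches.add(current);            // flush the batch built so far
              current = new LinkedHashMap<>();
            }
            size = entrySize;                  // start a new batch at this entry
          }
          current.put(e.getKey(), e.getValue());
        }
        if (!current.isEmpty()) {
          batches.add(current);                // flush the trailing batch
        }
        return batches;
      }
    }

With CAP at 10 KB, four entries of 51 KB and 101 KB (the sizes the new test below uses) come back as four single-entry batches.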
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index b7036a5363..5127a43c2c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -1445,7 +1445,7 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
           job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
           job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
           getWorkflowAdjacencies(job.conf),
-          job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
+          job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""), job.conf);
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
       //TODO JH Verify jobACLs, UserName via UGI?
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
index 07edb58c2b..7d05571fff 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
@@ -26,6 +26,7 @@
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -41,6 +42,7 @@
 @InterfaceStability.Unstable
 public class JobSubmittedEvent implements HistoryEvent {
   private JobSubmitted datum = new JobSubmitted();
+  private JobConf jobConf = null;
 
   /**
    * Create an event to record job submission
@@ -83,6 +85,31 @@ public JobSubmittedEvent(JobID id, String jobName, String userName,
         workflowAdjacencies, "");
   }
 
+  /**
+   * Create an event to record job submission.
+   * @param id The job Id of the job
+   * @param jobName Name of the job
+   * @param userName Name of the user who submitted the job
+   * @param submitTime Time of submission
+   * @param jobConfPath Path of the Job Configuration file
+   * @param jobACLs The configured acls for the job
+   * @param jobQueueName The job-queue to which this job was submitted
+   * @param workflowId The Id of the workflow
+   * @param workflowName The name of the workflow
+   * @param workflowNodeName The node name of the workflow
+   * @param workflowAdjacencies The adjacencies of the workflow
+   * @param workflowTags Comma-separated tags for the workflow
+   */
+  public JobSubmittedEvent(JobID id, String jobName, String userName,
+      long submitTime, String jobConfPath,
+      Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
+      String workflowId, String workflowName, String workflowNodeName,
+      String workflowAdjacencies, String workflowTags) {
+    this(id, jobName, userName, submitTime, jobConfPath, jobACLs,
+        jobQueueName, workflowId, workflowName, workflowNodeName,
+        workflowAdjacencies, workflowTags, null);
+  }
+
   /**
    * Create an event to record job submission
    * @param id The job Id of the job
@@ -97,12 +124,13 @@ public JobSubmittedEvent(JobID id, String jobName, String userName,
    * @param workflowNodeName The node name of the workflow
    * @param workflowAdjacencies The adjacencies of the workflow
    * @param workflowTags Comma-separated tags for the workflow
+   * @param conf Job configuration
    */
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath,
       Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
       String workflowId, String workflowName, String workflowNodeName,
-      String workflowAdjacencies, String workflowTags) {
+      String workflowAdjacencies, String workflowTags, JobConf conf) {
     datum.setJobid(new Utf8(id.toString()));
     datum.setJobName(new Utf8(jobName));
     datum.setUserName(new Utf8(userName));
@@ -132,6 +160,7 @@ public JobSubmittedEvent(JobID id, String jobName, String userName,
     if (workflowTags != null) {
       datum.setWorkflowTags(new Utf8(workflowTags));
     }
+    jobConf = conf;
   }
 
   JobSubmittedEvent() {}
@@ -208,7 +237,11 @@ public String getWorkflowTags() {
   }
   /** Get the event type */
   public EventType getEventType() { return EventType.JOB_SUBMITTED; }
-
+
+  public JobConf getJobConf() {
+    return jobConf;
+  }
+
   @Override
   public TimelineEvent toTimelineEvent() {
     TimelineEvent tEvent = new TimelineEvent();
@@ -234,5 +267,4 @@ public TimelineEvent toTimelineEvent() {
   public Set<TimelineMetric> getTimelineMetrics() {
     return null;
   }
-
 }
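The delegating constructor keeps existing call sites source-compatible: the old 12-argument form forwards a null JobConf, so getJobConf() returns null and JobHistoryEventHandler skips config publishing for such events. A small illustration of both paths (the values are made up; only the JobSubmittedEvent API comes from this patch):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobACL;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
    import org.apache.hadoop.security.authorize.AccessControlList;

    public class JobSubmittedEventCompat {
      public static void main(String[] args) {
        JobID id = new JobID("1462292376000", 1);
        Map<JobACL, AccessControlList> acls = Collections.emptyMap();

        // Old 12-arg constructor: compiles unchanged, carries no conf,
        // so config publishing is skipped for this event.
        JobSubmittedEvent legacy = new JobSubmittedEvent(id, "wordcount",
            "alice", 1462292376000L, "/tmp/job.xml", acls, "default",
            "wf-1", "wf", "node-1", "", "tag1,tag2");
        System.out.println(legacy.getJobConf());            // null

        // New 13-arg constructor: carries the configuration on the event,
        // as JobImpl now does with job.conf.
        JobSubmittedEvent withConf = new JobSubmittedEvent(id, "wordcount",
            "alice", 1462292376000L, "/tmp/job.xml", acls, "default",
            "wf-1", "wf", "node-1", "", "tag1,tag2", new JobConf());
        System.out.println(withConf.getJobConf() != null);  // true
      }
    }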
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
index 225d5176ea..35d066c214 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
@@ -36,6 +36,9 @@ public final class JobHistoryEventUtils {
   private JobHistoryEventUtils() {
   }
 
+  // Number of bytes of config which can be published in one shot to ATSv2.
+  public static final int ATS_CONFIG_PUBLISH_SIZE_BYTES = 10 * 1024;
+
   public static JsonNode countersToJSON(Counters counters) {
     ObjectMapper mapper = new ObjectMapper();
     ArrayNode nodes = mapper.createArrayNode();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index 5915a43ffa..fde9e64212 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -26,7 +26,9 @@
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -55,6 +57,8 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 public class TestMRTimelineEventHandling {
 
   private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
@@ -191,8 +195,17 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
       Path inDir = new Path("input");
       Path outDir = new Path("output");
       LOG.info("Run 1st job which should be successful.");
+      JobConf successConf = new JobConf(conf);
+      successConf.set("dummy_conf1",
+          UtilsForTests.createConfigValue(51 * 1024));
+      successConf.set("dummy_conf2",
+          UtilsForTests.createConfigValue(51 * 1024));
+      successConf.set("huge_dummy_conf1",
+          UtilsForTests.createConfigValue(101 * 1024));
+      successConf.set("huge_dummy_conf2",
+          UtilsForTests.createConfigValue(101 * 1024));
       RunningJob job =
-          UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+          UtilsForTests.runJobSucceed(successConf, inDir, outDir);
       Assert.assertEquals(JobStatus.SUCCEEDED,
           job.getJobStatus().getState().getValue());
@@ -270,7 +283,11 @@ private void checkNewTimelineEvent(ApplicationId appId,
Assert.assertTrue("jobEventFilePath: " + jobEventFilePath + " does not exist.", jobEventFile.exists()); - verifyMetricsWhenEvent(jobEventFile, EventType.JOB_FINISHED.name()); + verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(), + true, false, null); + Set cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2", + "huge_dummy_conf1", "huge_dummy_conf2"); + verifyEntity(jobEventFile, null, false, true, cfgsToCheck); // for this test, we expect MR job metrics are published in YARN_APPLICATION String outputAppDir = basePath + "/YARN_APPLICATION/"; @@ -290,7 +307,8 @@ private void checkNewTimelineEvent(ApplicationId appId, "appEventFilePath: " + appEventFilePath + " does not exist.", appEventFile.exists()); - verifyMetricsWhenEvent(appEventFile, null); + verifyEntity(appEventFile, null, true, false, null); + verifyEntity(appEventFile, null, false, true, cfgsToCheck); // check for task event file String outputDirTask = basePath + "/MAPREDUCE_TASK/"; @@ -307,7 +325,8 @@ private void checkNewTimelineEvent(ApplicationId appId, Assert.assertTrue("taskEventFileName: " + taskEventFilePath + " does not exist.", taskEventFile.exists()); - verifyMetricsWhenEvent(taskEventFile, EventType.TASK_FINISHED.name()); + verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(), + true, false, null); // check for task attempt event file String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/"; @@ -324,17 +343,30 @@ private void checkNewTimelineEvent(ApplicationId appId, File taskAttemptEventFile = new File(taskAttemptEventFilePath); Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath + " does not exist.", taskAttemptEventFile.exists()); - verifyMetricsWhenEvent(taskAttemptEventFile, - EventType.MAP_ATTEMPT_FINISHED.name()); + verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(), + true, false, null); } - private void verifyMetricsWhenEvent(File entityFile, String eventId) + /** + * Verifies entity by reading the entity file written via FS impl. + * @param entityFile File to be read. + * @param eventId Event to be checked. + * @param chkMetrics If event is not null, this flag determines if metrics + * exist when the event is encountered. If event is null, we merely check + * if metrics exist in the entity file. + * @param chkCfg If event is not null, this flag determines if configs + * exist when the event is encountered. If event is null, we merely check + * if configs exist in the entity file. + * @param cfgsToVerify a set of configs which should exist in the entity file. + * @throws IOException + */ + private void verifyEntity(File entityFile, String eventId, + boolean chkMetrics, boolean chkCfg, Set cfgsToVerify) throws IOException { BufferedReader reader = null; String strLine; try { reader = new BufferedReader(new FileReader(entityFile)); - boolean jobMetricsFoundForAppEntity = false; while ((strLine = reader.readLine()) != null) { if (strLine.trim().length() > 0) { org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = @@ -344,23 +376,57 @@ private void verifyMetricsWhenEvent(File entityFile, String eventId) if (eventId == null) { // Job metrics are published without any events for // ApplicationEntity. 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
index cb494db697..2fb6828e92 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
@@ -156,6 +156,14 @@ public static String regexpEscape(String plain) {
     return buf.toString();
   }
 
+  public static String createConfigValue(int msgSize) {
+    StringBuilder sb = new StringBuilder(msgSize);
+    for (int i = 0; i < msgSize; i++) {
+      sb.append('a');
+    }
+    return sb.toString();
+  }
+
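createConfigValue manufactures a filler string of the requested length; the loop body shown here assumes a fixed repeated character, which is all the callers need, since both the publish path and the test assertions look only at lengths. Usage mirrors the test setup (the demo class name is illustrative; UtilsForTests lives in org.apache.hadoop.mapred on the test classpath):

    public class ConfigValueDemo {
      public static void main(String[] args) {
        String value =
            org.apache.hadoop.mapred.UtilsForTests.createConfigValue(51 * 1024);
        // 52224 characters, well above the 10 KB publish cap.
        System.out.println(value.length());
      }
    }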