diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java new file mode 100644 index 0000000000..9d051dfde4 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; + +class JobHistoryFileParser { + private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class); + + private final FileSystem fs; + + public JobHistoryFileParser(FileSystem fs) { + LOG.info("JobHistoryFileParser created with " + fs); + this.fs = fs; + } + + public JobInfo parseHistoryFile(Path path) throws IOException { + LOG.info("parsing job history file " + path); + JobHistoryParser parser = new JobHistoryParser(fs, path); + return parser.parse(); + } + + public Configuration parseConfiguration(Path path) throws IOException { + LOG.info("parsing job configuration file " + path); + Configuration conf = new Configuration(false); + conf.addResource(fs.open(path)); + return conf; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java new file mode 100644 index 0000000000..802b78ff46 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter; +import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; + +/** + * Mapper for TimelineServicePerformanceV2 that replays job history files to the + * timeline service. + * + */ +class JobHistoryFileReplayMapper extends EntityWriter { + private static final Log LOG = + LogFactory.getLog(JobHistoryFileReplayMapper.class); + + static final String PROCESSING_PATH = "processing path"; + static final String REPLAY_MODE = "replay mode"; + static final int WRITE_ALL_AT_ONCE = 1; + static final int WRITE_PER_ENTITY = 2; + static final int REPLAY_MODE_DEFAULT = WRITE_ALL_AT_ONCE; + + private static final Pattern JOB_ID_PARSER = + Pattern.compile("^(job_[0-9]+_([0-9]+)).*"); + + public static class JobFiles { + private final String jobId; + private Path jobHistoryFilePath; + private Path jobConfFilePath; + + public JobFiles(String jobId) { + this.jobId = jobId; + } + + public String getJobId() { + return jobId; + } + + public Path getJobHistoryFilePath() { + return jobHistoryFilePath; + } + + public void setJobHistoryFilePath(Path jobHistoryFilePath) { + this.jobHistoryFilePath = jobHistoryFilePath; + } + + public Path getJobConfFilePath() { + return jobConfFilePath; + } + + public void setJobConfFilePath(Path jobConfFilePath) { + this.jobConfFilePath = jobConfFilePath; + } + + @Override + public int hashCode() { + return jobId.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + JobFiles other = (JobFiles) obj; + return jobId.equals(other.jobId); + } + } + + private enum FileType { JOB_HISTORY_FILE, JOB_CONF_FILE, UNKNOWN } + + + @Override + protected void writeEntities(Configuration tlConf, + TimelineCollectorManager manager, Context context) throws IOException { + // collect the apps it needs to process + Configuration conf = context.getConfiguration(); + int taskId = context.getTaskAttemptID().getTaskID().getId(); + int size = conf.getInt(MRJobConfig.NUM_MAPS, + TimelineServicePerformanceV2.NUM_MAPS_DEFAULT); + String processingDir = + conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH); + int replayMode = + conf.getInt(JobHistoryFileReplayMapper.REPLAY_MODE, + JobHistoryFileReplayMapper.REPLAY_MODE_DEFAULT); + Path processingPath = new Path(processingDir); + FileSystem processingFs = processingPath.getFileSystem(conf); + JobHistoryFileParser parser = new JobHistoryFileParser(processingFs); + TimelineEntityConverter converter = new TimelineEntityConverter(); + + Collection jobs = + selectJobFiles(processingFs, processingPath, taskId, size); + if (jobs.isEmpty()) { + LOG.info(context.getTaskAttemptID().getTaskID() + + " will process no jobs"); + } else { + LOG.info(context.getTaskAttemptID().getTaskID() + " will process " + + jobs.size() + " jobs"); + } + for (JobFiles job: jobs) { + // process each job + String jobIdStr = job.getJobId(); + LOG.info("processing " + jobIdStr + "..."); + JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr)); + ApplicationId appId = jobId.getAppId(); + + // create the app level timeline collector and start it + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + manager.putIfAbsent(appId, collector); + try { + // parse the job info and configuration + JobInfo jobInfo = + parser.parseHistoryFile(job.getJobHistoryFilePath()); + Configuration jobConf = + parser.parseConfiguration(job.getJobConfFilePath()); + LOG.info("parsed the job history file and the configuration file for job" + + jobIdStr); + + // set the context + // flow id: job name, flow run id: timestamp, user id + TimelineCollectorContext tlContext = + collector.getTimelineEntityContext(); + tlContext.setFlowName(jobInfo.getJobname()); + tlContext.setFlowRunId(jobInfo.getSubmitTime()); + tlContext.setUserId(jobInfo.getUsername()); + + // create entities from job history and write them + long totalTime = 0; + Set entitySet = + converter.createTimelineEntities(jobInfo, jobConf); + LOG.info("converted them into timeline entities for job " + jobIdStr); + // use the current user for this purpose + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + long startWrite = System.nanoTime(); + try { + switch (replayMode) { + case JobHistoryFileReplayMapper.WRITE_ALL_AT_ONCE: + writeAllEntities(collector, entitySet, ugi); + break; + case JobHistoryFileReplayMapper.WRITE_PER_ENTITY: + writePerEntity(collector, entitySet, ugi); + break; + default: + break; + } + } catch (Exception e) { + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES). + increment(1); + LOG.error("writing to the timeline service failed", e); + } + long endWrite = System.nanoTime(); + totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite); + int numEntities = entitySet.size(); + LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms"); + + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME). + increment(totalTime); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). + increment(numEntities); + } finally { + manager.remove(appId); + context.progress(); // move it along + } + } + } + + private void writeAllEntities(AppLevelTimelineCollector collector, + Set entitySet, UserGroupInformation ugi) + throws IOException { + TimelineEntities entities = new TimelineEntities(); + entities.setEntities(entitySet); + collector.putEntities(entities, ugi); + } + + private void writePerEntity(AppLevelTimelineCollector collector, + Set entitySet, UserGroupInformation ugi) + throws IOException { + for (TimelineEntity entity : entitySet) { + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + collector.putEntities(entities, ugi); + LOG.info("wrote entity " + entity.getId()); + } + } + + private Collection selectJobFiles(FileSystem fs, + Path processingRoot, int i, int size) throws IOException { + Map jobs = new HashMap<>(); + RemoteIterator it = fs.listFiles(processingRoot, true); + while (it.hasNext()) { + LocatedFileStatus status = it.next(); + Path path = status.getPath(); + String fileName = path.getName(); + Matcher m = JOB_ID_PARSER.matcher(fileName); + if (!m.matches()) { + continue; + } + String jobId = m.group(1); + int lastId = Integer.parseInt(m.group(2)); + int mod = lastId % size; + if (mod != i) { + continue; + } + LOG.info("this mapper will process file " + fileName); + // it's mine + JobFiles jobFiles = jobs.get(jobId); + if (jobFiles == null) { + jobFiles = new JobFiles(jobId); + jobs.put(jobId, jobFiles); + } + setFilePath(fileName, path, jobFiles); + } + return jobs.values(); + } + + private void setFilePath(String fileName, Path path, + JobFiles jobFiles) { + // determine if we're dealing with a job history file or a job conf file + FileType type = getFileType(fileName); + switch (type) { + case JOB_HISTORY_FILE: + if (jobFiles.getJobHistoryFilePath() == null) { + jobFiles.setJobHistoryFilePath(path); + } else { + LOG.warn("we already have the job history file " + + jobFiles.getJobHistoryFilePath() + ": skipping " + path); + } + break; + case JOB_CONF_FILE: + if (jobFiles.getJobConfFilePath() == null) { + jobFiles.setJobConfFilePath(path); + } else { + LOG.warn("we already have the job conf file " + + jobFiles.getJobConfFilePath() + ": skipping " + path); + } + break; + case UNKNOWN: + LOG.warn("unknown type: " + path); + } + } + + private FileType getFileType(String fileName) { + if (fileName.endsWith(".jhist")) { + return FileType.JOB_HISTORY_FILE; + } + if (fileName.endsWith("_conf.xml")) { + return FileType.JOB_CONF_FILE; + } + return FileType.UNKNOWN; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java new file mode 100644 index 0000000000..4ef0a14bfe --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter; +import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; + +/** + * Adds simple entities with random string payload, events, metrics, and + * configuration. + */ +class SimpleEntityWriter extends EntityWriter { + private static final Log LOG = LogFactory.getLog(SimpleEntityWriter.class); + + // constants for mtype = 1 + static final String KBS_SENT = "kbs sent"; + static final int KBS_SENT_DEFAULT = 1; + static final String TEST_TIMES = "testtimes"; + static final int TEST_TIMES_DEFAULT = 100; + static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID = + "timeline.server.performance.run.id"; + + protected void writeEntities(Configuration tlConf, + TimelineCollectorManager manager, Context context) throws IOException { + Configuration conf = context.getConfiguration(); + // simulate the app id with the task id + int taskId = context.getTaskAttemptID().getTaskID().getId(); + long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0); + ApplicationId appId = ApplicationId.newInstance(timestamp, taskId); + + // create the app level timeline collector + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + manager.putIfAbsent(appId, collector); + + try { + // set the context + // flow id: job name, flow run id: timestamp, user id + TimelineCollectorContext tlContext = + collector.getTimelineEntityContext(); + tlContext.setFlowName(context.getJobName()); + tlContext.setFlowRunId(timestamp); + tlContext.setUserId(context.getUser()); + + final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT); + + long totalTime = 0; + final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT); + final Random rand = new Random(); + final TaskAttemptID taskAttemptId = context.getTaskAttemptID(); + final char[] payLoad = new char[kbs * 1024]; + + for (int i = 0; i < testtimes; i++) { + // Generate a fixed length random payload + for (int xx = 0; xx < kbs * 1024; xx++) { + int alphaNumIdx = + rand.nextInt(TimelineServicePerformanceV2.alphaNums.length); + payLoad[xx] = TimelineServicePerformanceV2.alphaNums[alphaNumIdx]; + } + String entId = taskAttemptId + "_" + Integer.toString(i); + final TimelineEntity entity = new TimelineEntity(); + entity.setId(entId); + entity.setType("FOO_ATTEMPT"); + entity.addInfo("PERF_TEST", payLoad); + // add an event + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.addInfo("foo_event", "test"); + entity.addEvent(event); + // add a metric + TimelineMetric metric = new TimelineMetric(); + metric.setId("foo_metric"); + metric.addValue(System.currentTimeMillis(), 123456789L); + entity.addMetric(metric); + // add a config + entity.addConfig("foo", "bar"); + + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + // use the current user for this purpose + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + long startWrite = System.nanoTime(); + try { + collector.putEntities(entities, ugi); + } catch (Exception e) { + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES). + increment(1); + LOG.error("writing to the timeline service failed", e); + } + long endWrite = System.nanoTime(); + totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite); + } + LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes + + " kB) in " + totalTime + " ms"); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME). + increment(totalTime); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). + increment(testtimes); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS). + increment(kbs*testtimes); + } finally { + // clean up + manager.remove(appId); + } + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java new file mode 100644 index 0000000000..80928dc572 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; + +class TimelineEntityConverter { + private static final Log LOG = + LogFactory.getLog(TimelineEntityConverter.class); + + static final String JOB = "MAPREDUCE_JOB"; + static final String TASK = "MAPREDUCE_TASK"; + static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT"; + + /** + * Creates job, task, and task attempt entities based on the job history info + * and configuration. + * + * Note: currently these are plan timeline entities created for mapreduce + * types. These are not meant to be the complete and accurate entity set-up + * for mapreduce jobs. We do not leverage hierarchical timeline entities. If + * we create canonical mapreduce hierarchical timeline entities with proper + * parent-child relationship, we could modify this to use that instead. + * + * Note that we also do not add info to the YARN application entity, which + * would be needed for aggregation. + */ + public Set createTimelineEntities(JobInfo jobInfo, + Configuration conf) { + Set entities = new HashSet<>(); + + // create the job entity + TimelineEntity job = createJobEntity(jobInfo, conf); + entities.add(job); + + // create the task and task attempt entities + Set tasksAndAttempts = + createTaskAndTaskAttemptEntities(jobInfo); + entities.addAll(tasksAndAttempts); + + return entities; + } + + private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) { + TimelineEntity job = new TimelineEntity(); + job.setType(JOB); + job.setId(jobInfo.getJobId().toString()); + job.setCreatedTime(jobInfo.getSubmitTime()); + + job.addInfo("JOBNAME", jobInfo.getJobname()); + job.addInfo("USERNAME", jobInfo.getUsername()); + job.addInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName()); + job.addInfo("SUBMIT_TIME", jobInfo.getSubmitTime()); + job.addInfo("LAUNCH_TIME", jobInfo.getLaunchTime()); + job.addInfo("FINISH_TIME", jobInfo.getFinishTime()); + job.addInfo("JOB_STATUS", jobInfo.getJobStatus()); + job.addInfo("PRIORITY", jobInfo.getPriority()); + job.addInfo("TOTAL_MAPS", jobInfo.getTotalMaps()); + job.addInfo("TOTAL_REDUCES", jobInfo.getTotalReduces()); + job.addInfo("UBERIZED", jobInfo.getUberized()); + job.addInfo("ERROR_INFO", jobInfo.getErrorInfo()); + + // add metrics from total counters + // we omit the map counters and reduce counters for now as it's kind of + // awkward to put them (map/reduce/total counters are really a group of + // related counters) + Counters totalCounters = jobInfo.getTotalCounters(); + if (totalCounters != null) { + addMetrics(job, totalCounters); + } + // finally add configuration to the job + addConfiguration(job, conf); + LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity"); + return job; + } + + private void addConfiguration(TimelineEntity job, Configuration conf) { + for (Map.Entry e: conf) { + job.addConfig(e.getKey(), e.getValue()); + } + } + + private void addMetrics(TimelineEntity entity, Counters counters) { + for (CounterGroup g: counters) { + String groupName = g.getName(); + for (Counter c: g) { + String name = groupName + ":" + c.getName(); + TimelineMetric metric = new TimelineMetric(); + metric.setId(name); + metric.addValue(System.currentTimeMillis(), c.getValue()); + entity.addMetric(metric); + } + } + } + + private Set createTaskAndTaskAttemptEntities(JobInfo jobInfo) { + Set entities = new HashSet<>(); + Map taskInfoMap = jobInfo.getAllTasks(); + LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() + + " tasks"); + for (TaskInfo taskInfo: taskInfoMap.values()) { + TimelineEntity task = createTaskEntity(taskInfo); + entities.add(task); + // add the task attempts from this task + Set taskAttempts = createTaskAttemptEntities(taskInfo); + entities.addAll(taskAttempts); + } + return entities; + } + + private TimelineEntity createTaskEntity(TaskInfo taskInfo) { + TimelineEntity task = new TimelineEntity(); + task.setType(TASK); + task.setId(taskInfo.getTaskId().toString()); + task.setCreatedTime(taskInfo.getStartTime()); + + task.addInfo("START_TIME", taskInfo.getStartTime()); + task.addInfo("FINISH_TIME", taskInfo.getFinishTime()); + task.addInfo("TASK_TYPE", taskInfo.getTaskType()); + task.addInfo("TASK_STATUS", taskInfo.getTaskStatus()); + task.addInfo("ERROR_INFO", taskInfo.getError()); + + // add metrics from counters + Counters counters = taskInfo.getCounters(); + if (counters != null) { + addMetrics(task, counters); + } + LOG.info("converted task " + taskInfo.getTaskId() + + " to a timeline entity"); + return task; + } + + private Set createTaskAttemptEntities(TaskInfo taskInfo) { + Set taskAttempts = new HashSet(); + Map taskAttemptInfoMap = + taskInfo.getAllTaskAttempts(); + LOG.info("task " + taskInfo.getTaskId() + " has " + + taskAttemptInfoMap.size() + " task attempts"); + for (TaskAttemptInfo taskAttemptInfo: taskAttemptInfoMap.values()) { + TimelineEntity taskAttempt = createTaskAttemptEntity(taskAttemptInfo); + taskAttempts.add(taskAttempt); + } + return taskAttempts; + } + + private TimelineEntity createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) { + TimelineEntity taskAttempt = new TimelineEntity(); + taskAttempt.setType(TASK_ATTEMPT); + taskAttempt.setId(taskAttemptInfo.getAttemptId().toString()); + taskAttempt.setCreatedTime(taskAttemptInfo.getStartTime()); + + taskAttempt.addInfo("START_TIME", taskAttemptInfo.getStartTime()); + taskAttempt.addInfo("FINISH_TIME", taskAttemptInfo.getFinishTime()); + taskAttempt.addInfo("MAP_FINISH_TIME", + taskAttemptInfo.getMapFinishTime()); + taskAttempt.addInfo("SHUFFLE_FINISH_TIME", + taskAttemptInfo.getShuffleFinishTime()); + taskAttempt.addInfo("SORT_FINISH_TIME", + taskAttemptInfo.getSortFinishTime()); + taskAttempt.addInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus()); + taskAttempt.addInfo("STATE", taskAttemptInfo.getState()); + taskAttempt.addInfo("ERROR", taskAttemptInfo.getError()); + taskAttempt.addInfo("CONTAINER_ID", + taskAttemptInfo.getContainerId().toString()); + + // add metrics from counters + Counters counters = taskAttemptInfo.getCounters(); + if (counters != null) { + addMetrics(taskAttempt, counters); + } + LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() + + " to a timeline entity"); + return taskAttempt; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java index 1c2e28df44..f674ae1be0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java @@ -20,10 +20,7 @@ import java.io.IOException; import java.util.Date; -import java.util.Random; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.IntWritable; @@ -31,49 +28,35 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; public class TimelineServicePerformanceV2 extends Configured implements Tool { - private static final Log LOG = - LogFactory.getLog(TimelineServicePerformanceV2.class); - static final int NUM_MAPS_DEFAULT = 1; static final int SIMPLE_ENTITY_WRITER = 1; - // constants for mtype = 1 - static final String KBS_SENT = "kbs sent"; - static final int KBS_SENT_DEFAULT = 1; - static final String TEST_TIMES = "testtimes"; - static final int TEST_TIMES_DEFAULT = 100; - static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID = - "timeline.server.performance.run.id"; - + static final int JOB_HISTORY_FILE_REPLAY_MAPPER = 2; static int mapperType = SIMPLE_ENTITY_WRITER; protected static int printUsage() { - // TODO is there a way to handle mapper-specific options more gracefully? System.err.println( "Usage: [-m ] number of mappers (default: " + NUM_MAPS_DEFAULT + ")\n" + - " [-mtype ] \n" + + " [-mtype ]\n" + " 1. simple entity write mapper\n" + - " [-s <(KBs)test>] number of KB per put (default: " + - KBS_SENT_DEFAULT + " KB)\n" + - " [-t] package sending iterations per mapper (default: " + - TEST_TIMES_DEFAULT + ")\n"); + " 2. job history file replay mapper\n" + + " [-s <(KBs)test>] number of KB per put (mtype=1, default: " + + SimpleEntityWriter.KBS_SENT_DEFAULT + " KB)\n" + + " [-t] package sending iterations per mapper (mtype=1, default: " + + SimpleEntityWriter.TEST_TIMES_DEFAULT + ")\n" + + " [-d ] root path of job history files (mtype=2)\n" + + " [-r ] (mtype=2)\n" + + " 1. write all entities for a job in one put (default)\n" + + " 2. write one entity at a time\n"); GenericOptionsParser.printGenericCommandUsage(System.err); return -1; } @@ -82,11 +65,9 @@ protected static int printUsage() { * Configure a job given argv. */ public static boolean parseArgs(String[] args, Job job) throws IOException { - // set the defaults + // set the common defaults Configuration conf = job.getConfiguration(); conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT); - conf.setInt(KBS_SENT, KBS_SENT_DEFAULT); - conf.setInt(TEST_TIMES, TEST_TIMES_DEFAULT); for (int i = 0; i < args.length; i++) { if (args.length == i + 1) { @@ -97,25 +78,24 @@ public static boolean parseArgs(String[] args, Job job) throws IOException { if ("-m".equals(args[i])) { if (Integer.parseInt(args[++i]) > 0) { job.getConfiguration() - .setInt(MRJobConfig.NUM_MAPS, (Integer.parseInt(args[i]))); + .setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i])); } } else if ("-mtype".equals(args[i])) { mapperType = Integer.parseInt(args[++i]); - switch (mapperType) { - case SIMPLE_ENTITY_WRITER: - job.setMapperClass(SimpleEntityWriter.class); - break; - default: - job.setMapperClass(SimpleEntityWriter.class); - } } else if ("-s".equals(args[i])) { if (Integer.parseInt(args[++i]) > 0) { - conf.setInt(KBS_SENT, (Integer.parseInt(args[i]))); + conf.setInt(SimpleEntityWriter.KBS_SENT, Integer.parseInt(args[i])); } } else if ("-t".equals(args[i])) { if (Integer.parseInt(args[++i]) > 0) { - conf.setInt(TEST_TIMES, (Integer.parseInt(args[i]))); + conf.setInt(SimpleEntityWriter.TEST_TIMES, + Integer.parseInt(args[i])); } + } else if ("-d".equals(args[i])) { + conf.set(JobHistoryFileReplayMapper.PROCESSING_PATH, args[++i]); + } else if ("-r".equals(args[i])) { + conf.setInt(JobHistoryFileReplayMapper.REPLAY_MODE, + Integer.parseInt(args[++i])); } else { System.out.println("Unexpected argument: " + args[i]); return printUsage() == 0; @@ -128,6 +108,27 @@ public static boolean parseArgs(String[] args, Job job) throws IOException { } } + // handle mapper-specific settings + switch (mapperType) { + case JOB_HISTORY_FILE_REPLAY_MAPPER: + job.setMapperClass(JobHistoryFileReplayMapper.class); + String processingPath = + conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH); + if (processingPath == null || processingPath.isEmpty()) { + System.out.println("processing path is missing while mtype = 2"); + return printUsage() == 0; + } + break; + case SIMPLE_ENTITY_WRITER: + default: + job.setMapperClass(SimpleEntityWriter.class); + // use the current timestamp as the "run id" of the test: this will + // be used as simulating the cluster timestamp for apps + conf.setLong(SimpleEntityWriter.TIMELINE_SERVICE_PERFORMANCE_RUN_ID, + System.currentTimeMillis()); + break; + } + return true; } @@ -153,13 +154,6 @@ public int run(String[] args) throws Exception { return -1; } - // for mtype = 1 - // use the current timestamp as the "run id" of the test: this will be used - // as simulating the cluster timestamp for apps - Configuration conf = job.getConfiguration(); - conf.setLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, - System.currentTimeMillis()); - Date startTime = new Date(); System.out.println("Job started: " + startTime); int ret = job.waitForCompletion(true) ? 0 : 1; @@ -172,7 +166,8 @@ public int run(String[] args) throws Exception { counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue(); double transacrate = writecounts * 1000 / (double)writetime; double iorate = writesize * 1000 / (double)writetime; - int numMaps = Integer.parseInt(conf.get(MRJobConfig.NUM_MAPS)); + int numMaps = + Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS)); System.out.println("TRANSACTION RATE (per mapper): " + transacrate + " ops/s"); @@ -204,95 +199,31 @@ public static void main(String[] args) throws Exception { '3', '4', '5', '6', '7', '8', '9', '0', ' ' }; /** - * Adds simple entities with random string payload, events, metrics, and - * configuration. + * Base mapper for writing entities to the timeline service. Subclasses + * override {@link #writeEntities(Configuration, TimelineCollectorManager, + * org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities + * to the timeline service. */ - public static class SimpleEntityWriter + public static abstract class EntityWriter extends org.apache.hadoop.mapreduce.Mapper { + @Override public void map(IntWritable key, IntWritable val, Context context) throws IOException { - Configuration conf = context.getConfiguration(); - // simulate the app id with the task id - int taskId = context.getTaskAttemptID().getTaskID().getId(); - long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0); - ApplicationId appId = ApplicationId.newInstance(timestamp, taskId); - - // create the app level timeline collector + // create the timeline collector manager wired with the writer Configuration tlConf = new YarnConfiguration(); - AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId); - collector.init(tlConf); - collector.start(); - + TimelineCollectorManager manager = new TimelineCollectorManager("test"); + manager.init(tlConf); + manager.start(); try { - // set the context - // flow id: job name, flow run id: timestamp, user id - TimelineCollectorContext tlContext = - collector.getTimelineEntityContext(); - tlContext.setFlowName(context.getJobName()); - tlContext.setFlowRunId(timestamp); - tlContext.setUserId(context.getUser()); - - final int kbs = Integer.parseInt(conf.get(KBS_SENT)); - - long totalTime = 0; - final int testtimes = Integer.parseInt(conf.get(TEST_TIMES)); - final Random rand = new Random(); - final TaskAttemptID taskAttemptId = context.getTaskAttemptID(); - final char[] payLoad = new char[kbs * 1024]; - - for (int i = 0; i < testtimes; i++) { - // Generate a fixed length random payload - for (int xx = 0; xx < kbs * 1024; xx++) { - int alphaNumIdx = rand.nextInt(alphaNums.length); - payLoad[xx] = alphaNums[alphaNumIdx]; - } - String entId = taskAttemptId + "_" + Integer.toString(i); - final TimelineEntity entity = new TimelineEntity(); - entity.setId(entId); - entity.setType("FOO_ATTEMPT"); - entity.addInfo("PERF_TEST", payLoad); - // add an event - TimelineEvent event = new TimelineEvent(); - event.setTimestamp(System.currentTimeMillis()); - event.addInfo("foo_event", "test"); - entity.addEvent(event); - // add a metric - TimelineMetric metric = new TimelineMetric(); - metric.setId("foo_metric"); - metric.addValue(System.currentTimeMillis(), 123456789L); - entity.addMetric(metric); - // add a config - entity.addConfig("foo", "bar"); - - TimelineEntities entities = new TimelineEntities(); - entities.addEntity(entity); - // use the current user for this purpose - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - long startWrite = System.nanoTime(); - try { - collector.putEntities(entities, ugi); - } catch (Exception e) { - context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES). - increment(1); - e.printStackTrace(); - } - long endWrite = System.nanoTime(); - totalTime += (endWrite-startWrite)/1000000L; - } - LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes + - " kB) in " + totalTime + " ms"); - context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME). - increment(totalTime); - context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). - increment(testtimes); - context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS). - increment(kbs*testtimes); + // invoke the method to have the subclass write entities + writeEntities(tlConf, manager, context); } finally { - // clean up - collector.close(); + manager.close(); } } + + protected abstract void writeEntities(Configuration tlConf, + TimelineCollectorManager manager, Context context) throws IOException; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index e9f2085d70..165754d375 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -48,7 +48,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Unstable -public abstract class TimelineCollectorManager extends AbstractService { +public class TimelineCollectorManager extends AbstractService { private static final Log LOG = LogFactory.getLog(TimelineCollectorManager.class); @@ -90,10 +90,14 @@ protected void serviceStart() throws Exception { Collections.synchronizedMap( new HashMap()); - protected TimelineCollectorManager(String name) { + public TimelineCollectorManager(String name) { super(name); } + protected TimelineWriter getWriter() { + return writer; + } + /** * Put the collector into the collection if an collector mapped by id does * not exist. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index b22b39f45c..4385bbc3ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -47,17 +47,17 @@ public class FileSystemTimelineWriterImpl extends AbstractService private String outputRoot; - /** Config param for timeline service storage tmp root for FILE YARN-3264 */ + /** Config param for timeline service storage tmp root for FILE YARN-3264. */ public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT - = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir"; + = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir"; - /** default value for storage location on local disk */ + /** default value for storage location on local disk. */ public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT - = "/tmp/timeline_service_data"; + = "/tmp/timeline_service_data"; public static final String ENTITIES_DIR = "entities"; - /** Default extension for output files */ + /** Default extension for output files. */ public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; FileSystemTimelineWriterImpl() { @@ -81,9 +81,11 @@ private synchronized void write(String clusterId, String userId, String flowName TimelineWriteResponse response) throws IOException { PrintWriter out = null; try { - String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowName, - flowVersion, String.valueOf(flowRun), appId, entity.getType()); - String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION; + String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId, + escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId, + entity.getType()); + String fileName = dir + entity.getId() + + TIMELINE_SERVICE_STORAGE_EXTENSION; out = new PrintWriter(new BufferedWriter(new OutputStreamWriter( new FileOutputStream(fileName, true), "UTF-8"))); @@ -145,4 +147,9 @@ private static String mkdirs(String... dirStrs) throws IOException { } return path.toString(); } + + // specifically escape the separator character + private static String escape(String str) { + return str.replace(File.separatorChar, '_'); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java new file mode 100644 index 0000000000..f652ffd2b6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability;