YARN-2556. Tool to measure the performance of the timeline server (Chang Li via sjlee)
This commit is contained in:
parent
f9da5cdb2b
commit
58590fef49
@ -0,0 +1,53 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||
|
||||
class JobHistoryFileParser {
|
||||
private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class);
|
||||
|
||||
private final FileSystem fs;
|
||||
|
||||
public JobHistoryFileParser(FileSystem fs) {
|
||||
LOG.info("JobHistoryFileParser created with " + fs);
|
||||
this.fs = fs;
|
||||
}
|
||||
|
||||
public JobInfo parseHistoryFile(Path path) throws IOException {
|
||||
LOG.info("parsing job history file " + path);
|
||||
JobHistoryParser parser = new JobHistoryParser(fs, path);
|
||||
return parser.parse();
|
||||
}
|
||||
|
||||
public Configuration parseConfiguration(Path path) throws IOException {
|
||||
LOG.info("parsing job configuration file " + path);
|
||||
Configuration conf = new Configuration(false);
|
||||
conf.addResource(fs.open(path));
|
||||
return conf;
|
||||
}
|
||||
}
|
@ -0,0 +1,196 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.mapreduce.Mapper.Context;
|
||||
|
||||
class JobHistoryFileReplayHelper {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(JobHistoryFileReplayHelper.class);
|
||||
static final String PROCESSING_PATH = "processing path";
|
||||
static final String REPLAY_MODE = "replay mode";
|
||||
static final int WRITE_ALL_AT_ONCE = 1;
|
||||
static final int WRITE_PER_ENTITY = 2;
|
||||
static final int REPLAY_MODE_DEFAULT = WRITE_ALL_AT_ONCE;
|
||||
|
||||
private static Pattern JOB_ID_PARSER =
|
||||
Pattern.compile("^(job_[0-9]+_([0-9]+)).*");
|
||||
private enum FileType { JOB_HISTORY_FILE, JOB_CONF_FILE, UNKNOWN };
|
||||
JobHistoryFileParser parser;
|
||||
int replayMode;
|
||||
Collection<JobFiles> jobFiles;
|
||||
|
||||
JobHistoryFileReplayHelper(Context context) throws IOException {
|
||||
Configuration conf = context.getConfiguration();
|
||||
int taskId = context.getTaskAttemptID().getTaskID().getId();
|
||||
int size = conf.getInt(MRJobConfig.NUM_MAPS,
|
||||
TimelineServicePerformance.NUM_MAPS_DEFAULT);
|
||||
replayMode = conf.getInt(JobHistoryFileReplayHelper.REPLAY_MODE,
|
||||
JobHistoryFileReplayHelper.REPLAY_MODE_DEFAULT);
|
||||
String processingDir =
|
||||
conf.get(JobHistoryFileReplayHelper.PROCESSING_PATH);
|
||||
|
||||
Path processingPath = new Path(processingDir);
|
||||
FileSystem processingFs = processingPath.getFileSystem(conf);
|
||||
parser = new JobHistoryFileParser(processingFs);
|
||||
jobFiles = selectJobFiles(processingFs, processingPath, taskId, size);
|
||||
}
|
||||
|
||||
public int getReplayMode() {
|
||||
return replayMode;
|
||||
}
|
||||
|
||||
public Collection<JobFiles> getJobFiles() {
|
||||
return jobFiles;
|
||||
}
|
||||
|
||||
public JobHistoryFileParser getParser() {
|
||||
return parser;
|
||||
}
|
||||
|
||||
public static class JobFiles {
|
||||
private final String jobId;
|
||||
private Path jobHistoryFilePath;
|
||||
private Path jobConfFilePath;
|
||||
|
||||
public JobFiles(String jobId) {
|
||||
this.jobId = jobId;
|
||||
}
|
||||
|
||||
public String getJobId() {
|
||||
return jobId;
|
||||
}
|
||||
|
||||
public Path getJobHistoryFilePath() {
|
||||
return jobHistoryFilePath;
|
||||
}
|
||||
|
||||
public void setJobHistoryFilePath(Path jobHistoryFilePath) {
|
||||
this.jobHistoryFilePath = jobHistoryFilePath;
|
||||
}
|
||||
|
||||
public Path getJobConfFilePath() {
|
||||
return jobConfFilePath;
|
||||
}
|
||||
|
||||
public void setJobConfFilePath(Path jobConfFilePath) {
|
||||
this.jobConfFilePath = jobConfFilePath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return jobId.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
JobFiles other = (JobFiles) obj;
|
||||
return jobId.equals(other.jobId);
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<JobFiles> selectJobFiles(FileSystem fs,
|
||||
Path processingRoot, int i, int size) throws IOException {
|
||||
Map<String, JobFiles> jobs = new HashMap<>();
|
||||
RemoteIterator<LocatedFileStatus> it = fs.listFiles(processingRoot, true);
|
||||
while (it.hasNext()) {
|
||||
LocatedFileStatus status = it.next();
|
||||
Path path = status.getPath();
|
||||
String fileName = path.getName();
|
||||
Matcher m = JOB_ID_PARSER.matcher(fileName);
|
||||
if (!m.matches()) {
|
||||
continue;
|
||||
}
|
||||
String jobId = m.group(1);
|
||||
int lastId = Integer.parseInt(m.group(2));
|
||||
int mod = lastId % size;
|
||||
if (mod != i) {
|
||||
continue;
|
||||
}
|
||||
LOG.info("this mapper will process file " + fileName);
|
||||
// it's mine
|
||||
JobFiles jobFiles = jobs.get(jobId);
|
||||
if (jobFiles == null) {
|
||||
jobFiles = new JobFiles(jobId);
|
||||
jobs.put(jobId, jobFiles);
|
||||
}
|
||||
setFilePath(fileName, path, jobFiles);
|
||||
}
|
||||
return jobs.values();
|
||||
}
|
||||
|
||||
private void setFilePath(String fileName, Path path,
|
||||
JobFiles jobFiles) {
|
||||
// determine if we're dealing with a job history file or a job conf file
|
||||
FileType type = getFileType(fileName);
|
||||
switch (type) {
|
||||
case JOB_HISTORY_FILE:
|
||||
if (jobFiles.getJobHistoryFilePath() == null) {
|
||||
jobFiles.setJobHistoryFilePath(path);
|
||||
} else {
|
||||
LOG.warn("we already have the job history file " +
|
||||
jobFiles.getJobHistoryFilePath() + ": skipping " + path);
|
||||
}
|
||||
break;
|
||||
case JOB_CONF_FILE:
|
||||
if (jobFiles.getJobConfFilePath() == null) {
|
||||
jobFiles.setJobConfFilePath(path);
|
||||
} else {
|
||||
LOG.warn("we already have the job conf file " +
|
||||
jobFiles.getJobConfFilePath() + ": skipping " + path);
|
||||
}
|
||||
break;
|
||||
case UNKNOWN:
|
||||
LOG.warn("unknown type: " + path);
|
||||
}
|
||||
}
|
||||
|
||||
private FileType getFileType(String fileName) {
|
||||
if (fileName.endsWith(".jhist")) {
|
||||
return FileType.JOB_HISTORY_FILE;
|
||||
}
|
||||
if (fileName.endsWith("_conf.xml")) {
|
||||
return FileType.JOB_CONF_FILE;
|
||||
}
|
||||
return FileType.UNKNOWN;
|
||||
}
|
||||
}
|
@ -0,0 +1,158 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.mapreduce.Mapper.Context;
|
||||
import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
|
||||
import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper;
|
||||
import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
|
||||
/**
|
||||
* Mapper for TimelineServicePerformanceV1 that replays job history files to the
|
||||
* timeline service.
|
||||
*
|
||||
*/
|
||||
class JobHistoryFileReplayMapperV1 extends
|
||||
org.apache.hadoop.mapreduce.
|
||||
Mapper<IntWritable,IntWritable,Writable,Writable> {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(JobHistoryFileReplayMapperV1.class);
|
||||
|
||||
public void map(IntWritable key, IntWritable val, Context context) throws IOException {
|
||||
// collect the apps it needs to process
|
||||
TimelineClient tlc = new TimelineClientImpl();
|
||||
TimelineEntityConverterV1 converter = new TimelineEntityConverterV1();
|
||||
JobHistoryFileReplayHelper helper = new JobHistoryFileReplayHelper(context);
|
||||
int replayMode = helper.getReplayMode();
|
||||
Collection<JobFiles> jobs =
|
||||
helper.getJobFiles();
|
||||
JobHistoryFileParser parser = helper.getParser();
|
||||
|
||||
if (jobs.isEmpty()) {
|
||||
LOG.info(context.getTaskAttemptID().getTaskID() +
|
||||
" will process no jobs");
|
||||
} else {
|
||||
LOG.info(context.getTaskAttemptID().getTaskID() + " will process " +
|
||||
jobs.size() + " jobs");
|
||||
}
|
||||
for (JobFiles job: jobs) {
|
||||
// process each job
|
||||
String jobIdStr = job.getJobId();
|
||||
LOG.info("processing " + jobIdStr + "...");
|
||||
JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr));
|
||||
ApplicationId appId = jobId.getAppId();
|
||||
|
||||
try {
|
||||
// parse the job info and configuration
|
||||
Path historyFilePath = job.getJobHistoryFilePath();
|
||||
Path confFilePath = job.getJobConfFilePath();
|
||||
if ((historyFilePath == null) || (confFilePath == null)) {
|
||||
continue;
|
||||
}
|
||||
JobInfo jobInfo =
|
||||
parser.parseHistoryFile(historyFilePath);
|
||||
Configuration jobConf =
|
||||
parser.parseConfiguration(confFilePath);
|
||||
LOG.info("parsed the job history file and the configuration file for job "
|
||||
+ jobIdStr);
|
||||
|
||||
// create entities from job history and write them
|
||||
long totalTime = 0;
|
||||
Set<TimelineEntity> entitySet =
|
||||
converter.createTimelineEntities(jobInfo, jobConf);
|
||||
LOG.info("converted them into timeline entities for job " + jobIdStr);
|
||||
// use the current user for this purpose
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
long startWrite = System.nanoTime();
|
||||
try {
|
||||
switch (replayMode) {
|
||||
case JobHistoryFileReplayHelper.WRITE_ALL_AT_ONCE:
|
||||
writeAllEntities(tlc, entitySet, ugi);
|
||||
break;
|
||||
case JobHistoryFileReplayHelper.WRITE_PER_ENTITY:
|
||||
writePerEntity(tlc, entitySet, ugi);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
|
||||
increment(1);
|
||||
LOG.error("writing to the timeline service failed", e);
|
||||
}
|
||||
long endWrite = System.nanoTime();
|
||||
totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
|
||||
int numEntities = entitySet.size();
|
||||
LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms");
|
||||
|
||||
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
|
||||
increment(totalTime);
|
||||
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
|
||||
increment(numEntities);
|
||||
} finally {
|
||||
context.progress(); // move it along
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void writeAllEntities(TimelineClient tlc,
|
||||
Set<TimelineEntity> entitySet, UserGroupInformation ugi)
|
||||
throws IOException, YarnException {
|
||||
tlc.putEntities((TimelineEntity[])entitySet.toArray());
|
||||
}
|
||||
|
||||
private void writePerEntity(TimelineClient tlc,
|
||||
Set<TimelineEntity> entitySet, UserGroupInformation ugi)
|
||||
throws IOException, YarnException {
|
||||
for (TimelineEntity entity : entitySet) {
|
||||
tlc.putEntities(entity);
|
||||
LOG.info("wrote entity " + entity.getEntityId());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,120 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.mapreduce.Mapper.Context;
|
||||
import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
/**
|
||||
* Adds simple entities with random string payload, events, metrics, and
|
||||
* configuration.
|
||||
*/
|
||||
class SimpleEntityWriterV1 extends
|
||||
org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
|
||||
private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV1.class);
|
||||
|
||||
// constants for mtype = 1
|
||||
static final String KBS_SENT = "kbs sent";
|
||||
static final int KBS_SENT_DEFAULT = 1;
|
||||
static final String TEST_TIMES = "testtimes";
|
||||
static final int TEST_TIMES_DEFAULT = 100;
|
||||
static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
|
||||
"timeline.server.performance.run.id";
|
||||
/**
|
||||
* To ensure that the compression really gets exercised, generate a
|
||||
* random alphanumeric fixed length payload
|
||||
*/
|
||||
private static char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
|
||||
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
|
||||
's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
|
||||
'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
|
||||
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
|
||||
'3', '4', '5', '6', '7', '8', '9', '0', ' ' };
|
||||
|
||||
public void map(IntWritable key, IntWritable val, Context context) throws IOException {
|
||||
TimelineClient tlc = new TimelineClientImpl();
|
||||
Configuration conf = context.getConfiguration();
|
||||
|
||||
final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT);
|
||||
|
||||
long totalTime = 0;
|
||||
final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT);
|
||||
final Random rand = new Random();
|
||||
final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
|
||||
final char[] payLoad = new char[kbs * 1024];
|
||||
|
||||
for (int i = 0; i < testtimes; i++) {
|
||||
// Generate a fixed length random payload
|
||||
for (int xx = 0; xx < kbs * 1024; xx++) {
|
||||
int alphaNumIdx =
|
||||
rand.nextInt(ALPHA_NUMS.length);
|
||||
payLoad[xx] = ALPHA_NUMS[alphaNumIdx];
|
||||
}
|
||||
String entId = taskAttemptId + "_" + Integer.toString(i);
|
||||
final TimelineEntity entity = new TimelineEntity();
|
||||
entity.setEntityId(entId);
|
||||
entity.setEntityType("FOO_ATTEMPT");
|
||||
entity.addOtherInfo("PERF_TEST", payLoad);
|
||||
// add an event
|
||||
TimelineEvent event = new TimelineEvent();
|
||||
event.setTimestamp(System.currentTimeMillis());
|
||||
event.setEventType("foo_event");
|
||||
entity.addEvent(event);
|
||||
|
||||
// use the current user for this purpose
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
long startWrite = System.nanoTime();
|
||||
try {
|
||||
tlc.putEntities(entity);
|
||||
} catch (Exception e) {
|
||||
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
|
||||
increment(1);
|
||||
LOG.error("writing to the timeline service failed", e);
|
||||
}
|
||||
long endWrite = System.nanoTime();
|
||||
totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite);
|
||||
}
|
||||
LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes +
|
||||
" kB) in " + totalTime + " ms");
|
||||
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
|
||||
increment(totalTime);
|
||||
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
|
||||
increment(testtimes);
|
||||
context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
|
||||
increment(kbs*testtimes);
|
||||
}
|
||||
}
|
@ -0,0 +1,167 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.Counter;
|
||||
import org.apache.hadoop.mapreduce.CounterGroup;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.TaskID;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
|
||||
class TimelineEntityConverterV1 {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TimelineEntityConverterV1.class);
|
||||
|
||||
static final String JOB = "MAPREDUCE_JOB";
|
||||
static final String TASK = "MAPREDUCE_TASK";
|
||||
static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT";
|
||||
|
||||
/**
|
||||
* Creates job, task, and task attempt entities based on the job history info
|
||||
* and configuration.
|
||||
*
|
||||
* Note: currently these are plan timeline entities created for mapreduce
|
||||
* types. These are not meant to be the complete and accurate entity set-up
|
||||
* for mapreduce jobs. We do not leverage hierarchical timeline entities. If
|
||||
* we create canonical mapreduce hierarchical timeline entities with proper
|
||||
* parent-child relationship, we could modify this to use that instead.
|
||||
*
|
||||
* Note that we also do not add info to the YARN application entity, which
|
||||
* would be needed for aggregation.
|
||||
*/
|
||||
public Set<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
|
||||
Configuration conf) {
|
||||
Set<TimelineEntity> entities = new HashSet<>();
|
||||
|
||||
// create the job entity
|
||||
TimelineEntity job = createJobEntity(jobInfo, conf);
|
||||
entities.add(job);
|
||||
|
||||
// create the task and task attempt entities
|
||||
Set<TimelineEntity> tasksAndAttempts =
|
||||
createTaskAndTaskAttemptEntities(jobInfo);
|
||||
entities.addAll(tasksAndAttempts);
|
||||
|
||||
return entities;
|
||||
}
|
||||
|
||||
private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) {
|
||||
TimelineEntity job = new TimelineEntity();
|
||||
job.setEntityType(JOB);
|
||||
job.setEntityId(jobInfo.getJobId().toString());
|
||||
job.setStartTime(jobInfo.getSubmitTime());
|
||||
|
||||
job.addPrimaryFilter("JOBNAME", jobInfo.getJobname());
|
||||
job.addPrimaryFilter("USERNAME", jobInfo.getUsername());
|
||||
job.addOtherInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName());
|
||||
job.addOtherInfo("SUBMIT_TIME", jobInfo.getSubmitTime());
|
||||
job.addOtherInfo("LAUNCH_TIME", jobInfo.getLaunchTime());
|
||||
job.addOtherInfo("FINISH_TIME", jobInfo.getFinishTime());
|
||||
job.addOtherInfo("JOB_STATUS", jobInfo.getJobStatus());
|
||||
job.addOtherInfo("PRIORITY", jobInfo.getPriority());
|
||||
job.addOtherInfo("TOTAL_MAPS", jobInfo.getTotalMaps());
|
||||
job.addOtherInfo("TOTAL_REDUCES", jobInfo.getTotalReduces());
|
||||
job.addOtherInfo("UBERIZED", jobInfo.getUberized());
|
||||
job.addOtherInfo("ERROR_INFO", jobInfo.getErrorInfo());
|
||||
|
||||
LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity");
|
||||
return job;
|
||||
}
|
||||
|
||||
private Set<TimelineEntity> createTaskAndTaskAttemptEntities(JobInfo jobInfo) {
|
||||
Set<TimelineEntity> entities = new HashSet<>();
|
||||
Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
|
||||
LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
|
||||
" tasks");
|
||||
for (TaskInfo taskInfo: taskInfoMap.values()) {
|
||||
TimelineEntity task = createTaskEntity(taskInfo);
|
||||
entities.add(task);
|
||||
// add the task attempts from this task
|
||||
Set<TimelineEntity> taskAttempts = createTaskAttemptEntities(taskInfo);
|
||||
entities.addAll(taskAttempts);
|
||||
}
|
||||
return entities;
|
||||
}
|
||||
|
||||
private TimelineEntity createTaskEntity(TaskInfo taskInfo) {
|
||||
TimelineEntity task = new TimelineEntity();
|
||||
task.setEntityType(TASK);
|
||||
task.setEntityId(taskInfo.getTaskId().toString());
|
||||
task.setStartTime(taskInfo.getStartTime());
|
||||
|
||||
task.addOtherInfo("START_TIME", taskInfo.getStartTime());
|
||||
task.addOtherInfo("FINISH_TIME", taskInfo.getFinishTime());
|
||||
task.addOtherInfo("TASK_TYPE", taskInfo.getTaskType());
|
||||
task.addOtherInfo("TASK_STATUS", taskInfo.getTaskStatus());
|
||||
task.addOtherInfo("ERROR_INFO", taskInfo.getError());
|
||||
|
||||
LOG.info("converted task " + taskInfo.getTaskId() +
|
||||
" to a timeline entity");
|
||||
return task;
|
||||
}
|
||||
|
||||
private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) {
|
||||
Set<TimelineEntity> taskAttempts = new HashSet<TimelineEntity>();
|
||||
Map<TaskAttemptID,TaskAttemptInfo> taskAttemptInfoMap =
|
||||
taskInfo.getAllTaskAttempts();
|
||||
LOG.info("task " + taskInfo.getTaskId() + " has " +
|
||||
taskAttemptInfoMap.size() + " task attempts");
|
||||
for (TaskAttemptInfo taskAttemptInfo: taskAttemptInfoMap.values()) {
|
||||
TimelineEntity taskAttempt = createTaskAttemptEntity(taskAttemptInfo);
|
||||
taskAttempts.add(taskAttempt);
|
||||
}
|
||||
return taskAttempts;
|
||||
}
|
||||
|
||||
private TimelineEntity createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) {
|
||||
TimelineEntity taskAttempt = new TimelineEntity();
|
||||
taskAttempt.setEntityType(TASK_ATTEMPT);
|
||||
taskAttempt.setEntityId(taskAttemptInfo.getAttemptId().toString());
|
||||
taskAttempt.setStartTime(taskAttemptInfo.getStartTime());
|
||||
|
||||
taskAttempt.addOtherInfo("START_TIME", taskAttemptInfo.getStartTime());
|
||||
taskAttempt.addOtherInfo("FINISH_TIME", taskAttemptInfo.getFinishTime());
|
||||
taskAttempt.addOtherInfo("MAP_FINISH_TIME",
|
||||
taskAttemptInfo.getMapFinishTime());
|
||||
taskAttempt.addOtherInfo("SHUFFLE_FINISH_TIME",
|
||||
taskAttemptInfo.getShuffleFinishTime());
|
||||
taskAttempt.addOtherInfo("SORT_FINISH_TIME",
|
||||
taskAttemptInfo.getSortFinishTime());
|
||||
taskAttempt.addOtherInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus());
|
||||
taskAttempt.addOtherInfo("STATE", taskAttemptInfo.getState());
|
||||
taskAttempt.addOtherInfo("ERROR", taskAttemptInfo.getError());
|
||||
taskAttempt.addOtherInfo("CONTAINER_ID",
|
||||
taskAttemptInfo.getContainerId().toString());
|
||||
|
||||
LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() +
|
||||
" to a timeline entity");
|
||||
return taskAttempt;
|
||||
}
|
||||
}
|
@ -0,0 +1,197 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
|
||||
public class TimelineServicePerformance extends Configured implements Tool {
|
||||
static final int NUM_MAPS_DEFAULT = 1;
|
||||
|
||||
static final int SIMPLE_ENTITY_WRITER = 1;
|
||||
static final int JOB_HISTORY_FILE_REPLAY_MAPPER = 2;
|
||||
static int mapperType = SIMPLE_ENTITY_WRITER;
|
||||
static final int TIMELINE_SERVICE_VERSION_1 = 1;
|
||||
static final int TIMELINE_SERVICE_VERSION_2 = 2;
|
||||
static int timeline_service_version = TIMELINE_SERVICE_VERSION_1;
|
||||
|
||||
protected static int printUsage() {
|
||||
System.err.println(
|
||||
"Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
|
||||
")\n" +
|
||||
" [-v] timeline service version\n" +
|
||||
" [-mtype <mapper type in integer>]\n" +
|
||||
" 1. simple entity write mapper\n" +
|
||||
" 2. jobhistory files replay mapper\n" +
|
||||
" [-s <(KBs)test>] number of KB per put (mtype=1, default: " +
|
||||
SimpleEntityWriterV1.KBS_SENT_DEFAULT + " KB)\n" +
|
||||
" [-t] package sending iterations per mapper (mtype=1, default: " +
|
||||
SimpleEntityWriterV1.TEST_TIMES_DEFAULT + ")\n" +
|
||||
" [-d <path>] root path of job history files (mtype=2)\n" +
|
||||
" [-r <replay mode>] (mtype=2)\n" +
|
||||
" 1. write all entities for a job in one put (default)\n" +
|
||||
" 2. write one entity at a time\n");
|
||||
GenericOptionsParser.printGenericCommandUsage(System.err);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure a job given argv.
|
||||
*/
|
||||
public static boolean parseArgs(String[] args, Job job) throws IOException {
|
||||
// set the common defaults
|
||||
Configuration conf = job.getConfiguration();
|
||||
conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT);
|
||||
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
if (args.length == i + 1) {
|
||||
System.out.println("ERROR: Required parameter missing from " + args[i]);
|
||||
return printUsage() == 0;
|
||||
}
|
||||
try {
|
||||
if ("-v".equals(args[i])) {
|
||||
timeline_service_version = Integer.parseInt(args[++i]);
|
||||
}
|
||||
if ("-m".equals(args[i])) {
|
||||
if (Integer.parseInt(args[++i]) > 0) {
|
||||
job.getConfiguration()
|
||||
.setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i]));
|
||||
}
|
||||
} else if ("-mtype".equals(args[i])) {
|
||||
mapperType = Integer.parseInt(args[++i]);
|
||||
} else if ("-s".equals(args[i])) {
|
||||
if (Integer.parseInt(args[++i]) > 0) {
|
||||
conf.setInt(SimpleEntityWriterV1.KBS_SENT, Integer.parseInt(args[i]));
|
||||
}
|
||||
} else if ("-t".equals(args[i])) {
|
||||
if (Integer.parseInt(args[++i]) > 0) {
|
||||
conf.setInt(SimpleEntityWriterV1.TEST_TIMES,
|
||||
Integer.parseInt(args[i]));
|
||||
}
|
||||
} else if ("-d".equals(args[i])) {
|
||||
conf.set(JobHistoryFileReplayHelper.PROCESSING_PATH, args[++i]);
|
||||
} else if ("-r".equals(args[i])) {
|
||||
conf.setInt(JobHistoryFileReplayHelper.REPLAY_MODE,
|
||||
Integer.parseInt(args[++i]));
|
||||
} else {
|
||||
System.out.println("Unexpected argument: " + args[i]);
|
||||
return printUsage() == 0;
|
||||
}
|
||||
} catch (NumberFormatException except) {
|
||||
System.out.println("ERROR: Integer expected instead of " + args[i]);
|
||||
return printUsage() == 0;
|
||||
} catch (Exception e) {
|
||||
throw (IOException)new IOException().initCause(e);
|
||||
}
|
||||
}
|
||||
|
||||
// handle mapper-specific settings
|
||||
switch (timeline_service_version) {
|
||||
case TIMELINE_SERVICE_VERSION_1:
|
||||
default:
|
||||
switch (mapperType) {
|
||||
case JOB_HISTORY_FILE_REPLAY_MAPPER:
|
||||
job.setMapperClass(JobHistoryFileReplayMapperV1.class);
|
||||
String processingPath =
|
||||
conf.get(JobHistoryFileReplayHelper.PROCESSING_PATH);
|
||||
if (processingPath == null || processingPath.isEmpty()) {
|
||||
System.out.println("processing path is missing while mtype = 2");
|
||||
return printUsage() == 0;
|
||||
}
|
||||
break;
|
||||
case SIMPLE_ENTITY_WRITER:
|
||||
default:
|
||||
job.setMapperClass(SimpleEntityWriterV1.class);
|
||||
// use the current timestamp as the "run id" of the test: this will
|
||||
// be used as simulating the cluster timestamp for apps
|
||||
conf.setLong(SimpleEntityWriterV1.TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
|
||||
System.currentTimeMillis());
|
||||
break;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* TimelineServer Performance counters
|
||||
*/
|
||||
static enum PerfCounters {
|
||||
TIMELINE_SERVICE_WRITE_TIME,
|
||||
TIMELINE_SERVICE_WRITE_COUNTER,
|
||||
TIMELINE_SERVICE_WRITE_FAILURES,
|
||||
TIMELINE_SERVICE_WRITE_KBS,
|
||||
}
|
||||
|
||||
public int run(String[] args) throws Exception {
|
||||
|
||||
Job job = Job.getInstance(getConf());
|
||||
job.setJarByClass(TimelineServicePerformance.class);
|
||||
job.setMapperClass(SimpleEntityWriterV1.class);
|
||||
job.setInputFormatClass(SleepInputFormat.class);
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
job.setNumReduceTasks(0);
|
||||
if (!parseArgs(args, job)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
Date startTime = new Date();
|
||||
System.out.println("Job started: " + startTime);
|
||||
int ret = job.waitForCompletion(true) ? 0 : 1;
|
||||
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
|
||||
long writetime =
|
||||
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue();
|
||||
long writecounts =
|
||||
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue();
|
||||
long writesize =
|
||||
counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
|
||||
double transacrate = writecounts * 1000 / (double)writetime;
|
||||
double iorate = writesize * 1000 / (double)writetime;
|
||||
int numMaps =
|
||||
Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS));
|
||||
|
||||
System.out.println("TRANSACTION RATE (per mapper): " + transacrate +
|
||||
" ops/s");
|
||||
System.out.println("IO RATE (per mapper): " + iorate + " KB/s");
|
||||
|
||||
System.out.println("TRANSACTION RATE (total): " + transacrate*numMaps +
|
||||
" ops/s");
|
||||
System.out.println("IO RATE (total): " + iorate*numMaps + " KB/s");
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
int res =
|
||||
ToolRunner.run(new Configuration(), new TimelineServicePerformance(),
|
||||
args);
|
||||
System.exit(res);
|
||||
}
|
||||
|
||||
}
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
|
||||
import org.apache.hadoop.mapred.TestTextInputFormat;
|
||||
import org.apache.hadoop.mapred.ThreadedMapBenchmark;
|
||||
import org.apache.hadoop.mapreduce.TimelineServicePerformance;
|
||||
import org.apache.hadoop.mapreduce.FailJob;
|
||||
import org.apache.hadoop.mapreduce.LargeSorter;
|
||||
import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;
|
||||
@ -90,6 +91,8 @@ public MapredTestDriver(ProgramDriver pgd) {
|
||||
pgd.addClass("fail", FailJob.class, "a job that always fails");
|
||||
pgd.addClass("sleep", SleepJob.class,
|
||||
"A job that sleeps at each map and reduce task.");
|
||||
pgd.addClass("timelineperformance", TimelineServicePerformance.class,
|
||||
"A job that launches mappers to test timlineserver performance.");
|
||||
pgd.addClass("nnbench", NNBench.class,
|
||||
"A benchmark that stresses the namenode w/ MR.");
|
||||
pgd.addClass("nnbenchWithoutMR", NNBenchWithoutMR.class,
|
||||
|
@ -232,6 +232,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-3964. Support NodeLabelsProvider at Resource Manager side.
|
||||
(Dian Fu via devaraj)
|
||||
|
||||
YARN-2556. Tool to measure the performance of the timeline server (Chang Li
|
||||
via sjlee)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
Loading…
Reference in New Issue
Block a user