MAPREDUCE-2676. MR-279: JobHistory Job page needs reformatted. (Robert Evans via mahadev)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1170379 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4d90df82a9
commit
a3e8f6836b
@ -286,6 +286,9 @@ Release 0.23.0 - Unreleased
|
||||
org.apache.hadoop.yarn.api.records.* to be get/set only. Added javadocs to
|
||||
all public records. (acmurthy)
|
||||
|
||||
MAPREDUCE-2676. MR-279: JobHistory Job page needs reformatted. (Robert Evans via
|
||||
mahadev)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
@ -30,6 +31,7 @@
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
|
||||
|
||||
/**
|
||||
@ -52,6 +54,16 @@ public interface Job {
|
||||
int getCompletedReduces();
|
||||
boolean isUber();
|
||||
String getUserName();
|
||||
|
||||
/**
|
||||
* @return a path to where the config file for this job is located.
|
||||
*/
|
||||
Path getConfFile();
|
||||
|
||||
/**
|
||||
* @return the ACLs for this job for each type of JobACL given.
|
||||
*/
|
||||
Map<JobACL, AccessControlList> getJobACLs();
|
||||
|
||||
TaskAttemptCompletionEvent[]
|
||||
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
|
||||
|
@ -772,6 +772,15 @@ public String getUserName() {
|
||||
return userName;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getConfFile()
|
||||
*/
|
||||
@Override
|
||||
public Path getConfFile() {
|
||||
return remoteJobConfFile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return jobName;
|
||||
@ -787,6 +796,15 @@ public int getTotalMaps() {
|
||||
public int getTotalReduces() {
|
||||
return reduceTasks.size(); //FIXME: why indirection? return numReduceTasks
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getJobACLs()
|
||||
*/
|
||||
@Override
|
||||
public Map<JobACL, AccessControlList> getJobACLs() {
|
||||
return Collections.unmodifiableMap(jobACLs);
|
||||
}
|
||||
|
||||
public static class InitTransition
|
||||
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
|
||||
|
@ -177,11 +177,12 @@ public void attempts() {
|
||||
}
|
||||
setTitle(join(attemptState, " ",
|
||||
MRApps.taskType(taskType).toString(), " attempts in ", $(JOB_ID)));
|
||||
|
||||
render(attemptsPage());
|
||||
} catch (Exception e) {
|
||||
badRequest(e.getMessage());
|
||||
}
|
||||
}
|
||||
render(attemptsPage());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -205,7 +206,7 @@ void notFound(String s) {
|
||||
/**
|
||||
* Ensure that a JOB_ID was passed into the page.
|
||||
*/
|
||||
void requireJob() {
|
||||
public void requireJob() {
|
||||
try {
|
||||
if ($(JOB_ID).isEmpty()) {
|
||||
throw new RuntimeException("missing job ID");
|
||||
@ -216,14 +217,15 @@ void requireJob() {
|
||||
notFound($(JOB_ID));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
badRequest(e.getMessage() == null ? e.getClass().getName() : e.getMessage());
|
||||
badRequest(e.getMessage() == null ?
|
||||
e.getClass().getName() : e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure that a TASK_ID was passed into the page.
|
||||
*/
|
||||
void requireTask() {
|
||||
public void requireTask() {
|
||||
try {
|
||||
if ($(TASK_ID).isEmpty()) {
|
||||
throw new RuntimeException("missing task ID");
|
||||
|
@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app.webapp;
|
||||
|
||||
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._TH;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.InputType;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
/**
|
||||
* Render the configuration for this job.
|
||||
*/
|
||||
public class ConfBlock extends HtmlBlock {
|
||||
final AppContext appContext;
|
||||
final Configuration conf;
|
||||
|
||||
@Inject ConfBlock(AppContext appctx, Configuration conf) {
|
||||
appContext = appctx;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.apache.hadoop.yarn.webapp.view.HtmlBlock#render(org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block)
|
||||
*/
|
||||
@Override protected void render(Block html) {
|
||||
String jid = $(JOB_ID);
|
||||
if (jid.isEmpty()) {
|
||||
html.
|
||||
p()._("Sorry, can't do anything without a JobID.")._();
|
||||
return;
|
||||
}
|
||||
JobId jobID = MRApps.toJobID(jid);
|
||||
Job job = appContext.getJob(jobID);
|
||||
if (job == null) {
|
||||
html.
|
||||
p()._("Sorry, ", jid, " not found.")._();
|
||||
return;
|
||||
}
|
||||
Path confPath = job.getConfFile();
|
||||
try {
|
||||
//Read in the configuration file and put it in a key/value table.
|
||||
FileContext fc = FileContext.getFileContext(confPath.toUri(), conf);
|
||||
Configuration jobConf = new Configuration(false);
|
||||
jobConf.addResource(fc.open(confPath));
|
||||
|
||||
html.div().h3(confPath.toString())._();
|
||||
TBODY<TABLE<Hamlet>> tbody = html.
|
||||
// Tasks table
|
||||
table("#conf").
|
||||
thead().
|
||||
tr().
|
||||
th(_TH, "key").
|
||||
th(_TH, "value").
|
||||
_().
|
||||
_().
|
||||
tbody();
|
||||
for(Map.Entry<String, String> entry : jobConf) {
|
||||
tbody.
|
||||
tr().
|
||||
td(entry.getKey()).
|
||||
td(entry.getValue()).
|
||||
_();
|
||||
}
|
||||
tbody._().
|
||||
tfoot().
|
||||
tr().
|
||||
th().input("search_init").$type(InputType.text).$name("key").$value("key")._()._().
|
||||
th().input("search_init").$type(InputType.text).$name("value").$value("value")._()._().
|
||||
_().
|
||||
_().
|
||||
_();
|
||||
} catch(IOException e) {
|
||||
LOG.error("Error while reading "+confPath, e);
|
||||
html.p()._("Sorry got an error while reading conf file. ",confPath);
|
||||
}
|
||||
}
|
||||
}
|
@ -26,6 +26,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.ShuffleHandler;
|
||||
import org.apache.hadoop.mapreduce.FileSystemCounter;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
@ -50,6 +51,7 @@
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -465,6 +467,16 @@ public boolean checkAccess(UserGroupInformation callerUGI,
|
||||
public String getUserName() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getConfFile() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<JobACL, AccessControlList> getJobACLs() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -31,6 +31,7 @@
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
@ -58,6 +59,7 @@
|
||||
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.Clock;
|
||||
import org.apache.hadoop.yarn.SystemClock;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
@ -461,6 +463,16 @@ public boolean checkAccess(UserGroupInformation callerUGI,
|
||||
public String getUserName() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getConfFile() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<JobACL, AccessControlList> getJobACLs() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -70,15 +70,17 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
|
||||
private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
|
||||
private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
|
||||
private final String user;
|
||||
private final Path confFile;
|
||||
|
||||
private List<TaskAttemptCompletionEvent> completionEvents = null;
|
||||
private JobInfo jobInfo;
|
||||
|
||||
public CompletedJob(Configuration conf, JobId jobId, Path historyFile,
|
||||
boolean loadTasks, String userName) throws IOException {
|
||||
boolean loadTasks, String userName, Path confFile) throws IOException {
|
||||
LOG.info("Loading job: " + jobId + " from file: " + historyFile);
|
||||
this.conf = conf;
|
||||
this.jobId = jobId;
|
||||
this.confFile = confFile;
|
||||
|
||||
loadFullHistoryData(loadTasks, historyFile);
|
||||
|
||||
@ -304,8 +306,26 @@ public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation)
|
||||
jobInfo.getUsername(), jobACL);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getJobACLs()
|
||||
*/
|
||||
@Override
|
||||
public Map<JobACL, AccessControlList> getJobACLs() {
|
||||
return jobInfo.getJobACLs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUserName() {
|
||||
return user;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getConfFile()
|
||||
*/
|
||||
@Override
|
||||
public Path getConfFile() {
|
||||
return confFile;
|
||||
}
|
||||
}
|
||||
|
@ -21,7 +21,6 @@
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
@ -36,8 +35,6 @@
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -87,18 +84,18 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
||||
|
||||
private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
|
||||
|
||||
private static final Pattern DATE_PATTERN = Pattern
|
||||
.compile("([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])[0-9][0-9])");
|
||||
|
||||
/*
|
||||
* TODO Get rid of this once JobId has it's own comparator
|
||||
*/
|
||||
private static final Comparator<JobId> JOB_ID_COMPARATOR = new Comparator<JobId>() {
|
||||
private static final Comparator<JobId> JOB_ID_COMPARATOR =
|
||||
new Comparator<JobId>() {
|
||||
@Override
|
||||
public int compare(JobId o1, JobId o2) {
|
||||
if (o1.getAppId().getClusterTimestamp() > o2.getAppId().getClusterTimestamp()) {
|
||||
if (o1.getAppId().getClusterTimestamp() >
|
||||
o2.getAppId().getClusterTimestamp()) {
|
||||
return 1;
|
||||
} else if (o1.getAppId().getClusterTimestamp() < o2.getAppId().getClusterTimestamp()) {
|
||||
} else if (o1.getAppId().getClusterTimestamp() <
|
||||
o2.getAppId().getClusterTimestamp()) {
|
||||
return -1;
|
||||
} else {
|
||||
return o1.getId() - o2.getId();
|
||||
@ -106,7 +103,8 @@ public int compare(JobId o1, JobId o2) {
|
||||
}
|
||||
};
|
||||
|
||||
private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils.doneSubdirsBeforeSerialTail();
|
||||
private static String DONE_BEFORE_SERIAL_TAIL =
|
||||
JobHistoryUtils.doneSubdirsBeforeSerialTail();
|
||||
|
||||
/**
|
||||
* Maps between a serial number (generated based on jobId) and the timestamp
|
||||
@ -114,29 +112,32 @@ public int compare(JobId o1, JobId o2) {
|
||||
* Facilitates jobId based searches.
|
||||
* If a jobId is not found in this list - it will not be found.
|
||||
*/
|
||||
private final SortedMap<String, Set<String>> idToDateString = new ConcurrentSkipListMap<String, Set<String>>();
|
||||
private final SortedMap<String, Set<String>> idToDateString =
|
||||
new ConcurrentSkipListMap<String, Set<String>>();
|
||||
|
||||
//Maintains minimal details for recent jobs (parsed from history file name).
|
||||
//Sorted on Job Completion Time.
|
||||
private final SortedMap<JobId, MetaInfo> jobListCache = new ConcurrentSkipListMap<JobId, MetaInfo>(
|
||||
JOB_ID_COMPARATOR);
|
||||
private final SortedMap<JobId, MetaInfo> jobListCache =
|
||||
new ConcurrentSkipListMap<JobId, MetaInfo>(JOB_ID_COMPARATOR);
|
||||
|
||||
|
||||
// Re-use exisiting MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
|
||||
// Check for existance of the object when using iterators.
|
||||
private final SortedMap<JobId, MetaInfo> intermediateListCache = new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>(
|
||||
JOB_ID_COMPARATOR);
|
||||
private final SortedMap<JobId, MetaInfo> intermediateListCache =
|
||||
new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>(JOB_ID_COMPARATOR);
|
||||
|
||||
//Maintains a list of known done subdirectories. Not currently used.
|
||||
private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
|
||||
|
||||
private final SortedMap<JobId, Job> loadedJobCache = new ConcurrentSkipListMap<JobId, Job>(
|
||||
JOB_ID_COMPARATOR);
|
||||
private final SortedMap<JobId, Job> loadedJobCache =
|
||||
new ConcurrentSkipListMap<JobId, Job>(JOB_ID_COMPARATOR);
|
||||
|
||||
/**
|
||||
* Maintains a mapping between intermediate user directories and the last known modification time.
|
||||
* Maintains a mapping between intermediate user directories and the last
|
||||
* known modification time.
|
||||
*/
|
||||
private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
|
||||
private Map<String, Long> userDirModificationTimeMap =
|
||||
new HashMap<String, Long>();
|
||||
|
||||
//The number of jobs to maintain in the job list cache.
|
||||
private int jobListCacheSize;
|
||||
@ -187,7 +188,8 @@ public void init(Configuration conf) throws YarnException {
|
||||
debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
|
||||
serialNumberLowDigits = debugMode ? 1 : 3;
|
||||
serialNumberFormat = ("%0"
|
||||
+ (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) + "d");
|
||||
+ (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS
|
||||
+ serialNumberLowDigits) + "d");
|
||||
|
||||
String doneDirPrefix = null;
|
||||
doneDirPrefix = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
|
||||
@ -195,9 +197,11 @@ public void init(Configuration conf) throws YarnException {
|
||||
doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
|
||||
new Path(doneDirPrefix));
|
||||
doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
|
||||
mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
|
||||
mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
|
||||
JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
|
||||
} catch (IOException e) {
|
||||
throw new YarnException("Error creating done directory: [" + doneDirPrefixPath + "]", e);
|
||||
throw new YarnException("Error creating done directory: [" +
|
||||
doneDirPrefixPath + "]", e);
|
||||
}
|
||||
|
||||
String intermediateDoneDirPrefix = null;
|
||||
@ -208,21 +212,27 @@ public void init(Configuration conf) throws YarnException {
|
||||
.makeQualified(new Path(intermediateDoneDirPrefix));
|
||||
intermediateDoneDirFc = FileContext.getFileContext(
|
||||
intermediateDoneDirPath.toUri(), conf);
|
||||
mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
|
||||
mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
|
||||
JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
|
||||
} catch (IOException e) {
|
||||
LOG.info("error creating done directory on dfs " + e);
|
||||
throw new YarnException("Error creating intermediate done directory: [" + intermediateDoneDirPath + "]", e);
|
||||
throw new YarnException("Error creating intermediate done directory: ["
|
||||
+ intermediateDoneDirPath + "]", e);
|
||||
}
|
||||
|
||||
|
||||
|
||||
jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, DEFAULT_JOBLIST_CACHE_SIZE);
|
||||
loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, DEFAULT_LOADEDJOB_CACHE_SIZE);
|
||||
dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, DEFAULT_DATESTRING_CACHE_SIZE);
|
||||
jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
|
||||
DEFAULT_JOBLIST_CACHE_SIZE);
|
||||
loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
|
||||
DEFAULT_LOADEDJOB_CACHE_SIZE);
|
||||
dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
|
||||
DEFAULT_DATESTRING_CACHE_SIZE);
|
||||
moveThreadInterval =
|
||||
conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
|
||||
DEFAULT_MOVE_THREAD_INTERVAL);
|
||||
numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, DEFAULT_MOVE_THREAD_COUNT);
|
||||
numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
|
||||
DEFAULT_MOVE_THREAD_COUNT);
|
||||
try {
|
||||
initExisting();
|
||||
} catch (IOException e) {
|
||||
@ -254,19 +264,21 @@ private void mkdir(FileContext fc, Path path, FsPermission fsp)
|
||||
@Override
|
||||
public void start() {
|
||||
//Start moveIntermediatToDoneThread
|
||||
moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
|
||||
moveIntermediateToDoneRunnable =
|
||||
new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
|
||||
moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
|
||||
moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
|
||||
moveIntermediateToDoneThread.start();
|
||||
|
||||
//Start historyCleaner
|
||||
boolean startCleanerService = conf.getBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
|
||||
boolean startCleanerService = conf.getBoolean(
|
||||
JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
|
||||
if (startCleanerService) {
|
||||
long maxAgeOfHistoryFiles = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
|
||||
DEFAULT_HISTORY_MAX_AGE);
|
||||
long maxAgeOfHistoryFiles = conf.getLong(
|
||||
JHAdminConfig.MR_HISTORY_MAX_AGE_MS, DEFAULT_HISTORY_MAX_AGE);
|
||||
cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1);
|
||||
long runInterval = conf.getLong(JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
|
||||
DEFAULT_RUN_INTERVAL);
|
||||
long runInterval = conf.getLong(
|
||||
JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, DEFAULT_RUN_INTERVAL);
|
||||
cleanerScheduledExecutor
|
||||
.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
|
||||
30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
|
||||
@ -331,13 +343,16 @@ private void initExisting() throws IOException {
|
||||
|
||||
private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
|
||||
String serialPart = serialDirPath.getName();
|
||||
String timeStampPart = JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
|
||||
String timeStampPart =
|
||||
JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
|
||||
if (timeStampPart == null) {
|
||||
LOG.warn("Could not find timestamp portion from path: " + serialDirPath.toString() +". Continuing with next");
|
||||
LOG.warn("Could not find timestamp portion from path: " +
|
||||
serialDirPath.toString() +". Continuing with next");
|
||||
return;
|
||||
}
|
||||
if (serialPart == null) {
|
||||
LOG.warn("Could not find serial portion from path: " + serialDirPath.toString() + ". Continuing with next");
|
||||
LOG.warn("Could not find serial portion from path: " +
|
||||
serialDirPath.toString() + ". Continuing with next");
|
||||
return;
|
||||
}
|
||||
if (idToDateString.containsKey(serialPart)) {
|
||||
@ -355,13 +370,16 @@ private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
|
||||
LOG.debug("Adding "+serialDirPath+" to serial index");
|
||||
}
|
||||
String serialPart = serialDirPath.getName();
|
||||
String timestampPart = JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
|
||||
String timestampPart =
|
||||
JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
|
||||
if (timestampPart == null) {
|
||||
LOG.warn("Could not find timestamp portion from path: " + serialDirPath.toString() +". Continuing with next");
|
||||
LOG.warn("Could not find timestamp portion from path: " +
|
||||
serialDirPath.toString() +". Continuing with next");
|
||||
return;
|
||||
}
|
||||
if (serialPart == null) {
|
||||
LOG.warn("Could not find serial portion from path: " + serialDirPath.toString() + ". Continuing with next");
|
||||
LOG.warn("Could not find serial portion from path: " +
|
||||
serialDirPath.toString() + ". Continuing with next");
|
||||
}
|
||||
addToSerialNumberIndex(serialPart, timestampPart);
|
||||
}
|
||||
@ -400,7 +418,8 @@ private void addDirectoryToJobListCache(Path path) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
private static List<FileStatus> scanDirectory(Path path, FileContext fc, PathFilter pathFilter) throws IOException {
|
||||
private static List<FileStatus> scanDirectory(Path path, FileContext fc,
|
||||
PathFilter pathFilter) throws IOException {
|
||||
path = fc.makeQualified(path);
|
||||
List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
|
||||
RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
|
||||
@ -414,7 +433,8 @@ private static List<FileStatus> scanDirectory(Path path, FileContext fc, PathFil
|
||||
return jhStatusList;
|
||||
}
|
||||
|
||||
private static List<FileStatus> scanDirectoryForHistoryFiles(Path path, FileContext fc) throws IOException {
|
||||
private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
|
||||
FileContext fc) throws IOException {
|
||||
return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
|
||||
}
|
||||
|
||||
@ -425,7 +445,8 @@ private static List<FileStatus> scanDirectoryForHistoryFiles(Path path, FileCont
|
||||
* @return
|
||||
*/
|
||||
private List<FileStatus> findTimestampedDirectories() throws IOException {
|
||||
List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
|
||||
List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
|
||||
doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
|
||||
return fsList;
|
||||
}
|
||||
|
||||
@ -434,7 +455,8 @@ private List<FileStatus> findTimestampedDirectories() throws IOException {
|
||||
*/
|
||||
private void addToJobListCache(JobId jobId, MetaInfo metaInfo) {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Adding "+jobId+" to job list cache with "+metaInfo.getJobIndexInfo());
|
||||
LOG.debug("Adding "+jobId+" to job list cache with "
|
||||
+metaInfo.getJobIndexInfo());
|
||||
}
|
||||
jobListCache.put(jobId, metaInfo);
|
||||
if (jobListCache.size() > jobListCacheSize) {
|
||||
@ -462,14 +484,16 @@ private void addToLoadedJobCache(Job job) {
|
||||
* @throws IOException
|
||||
*/
|
||||
private void scanIntermediateDirectory() throws IOException {
|
||||
List<FileStatus> userDirList = JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
|
||||
List<FileStatus> userDirList =
|
||||
JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
|
||||
|
||||
for (FileStatus userDir : userDirList) {
|
||||
String name = userDir.getPath().getName();
|
||||
long newModificationTime = userDir.getModificationTime();
|
||||
boolean shouldScan = false;
|
||||
synchronized (userDirModificationTimeMap) {
|
||||
if (!userDirModificationTimeMap.containsKey(name) || newModificationTime > userDirModificationTimeMap.get(name)) {
|
||||
if (!userDirModificationTimeMap.containsKey(name) || newModificationTime
|
||||
> userDirModificationTimeMap.get(name)) {
|
||||
shouldScan = true;
|
||||
userDirModificationTimeMap.put(name, newModificationTime);
|
||||
}
|
||||
@ -514,9 +538,11 @@ private void scanIntermediateDirectory(final Path absPath)
|
||||
* @return A MetaInfo object for the jobId, null if not found.
|
||||
* @throws IOException
|
||||
*/
|
||||
private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId) throws IOException {
|
||||
private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
|
||||
throws IOException {
|
||||
for (FileStatus fs : fileStatusList) {
|
||||
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
|
||||
JobIndexInfo jobIndexInfo =
|
||||
FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
|
||||
if (jobIndexInfo.getJobId().equals(jobId)) {
|
||||
String confFileName = JobHistoryUtils
|
||||
.getIntermediateConfFileName(jobIndexInfo.getJobId());
|
||||
@ -549,7 +575,8 @@ private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
|
||||
}
|
||||
for (String timestampPart : dateStringSet) {
|
||||
Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
|
||||
List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, doneDirFc);
|
||||
List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
|
||||
doneDirFc);
|
||||
MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
|
||||
if (metaInfo != null) {
|
||||
return metaInfo;
|
||||
@ -559,7 +586,8 @@ private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks for the existence of the job history file in the interemediate directory.
|
||||
* Checks for the existence of the job history file in the intermediate
|
||||
* directory.
|
||||
* @param jobId
|
||||
* @return
|
||||
* @throws IOException
|
||||
@ -586,7 +614,8 @@ public void stop() {
|
||||
|
||||
MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
|
||||
this.sleepTime = sleepTime;
|
||||
moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
|
||||
moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
|
||||
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
|
||||
running = true;
|
||||
}
|
||||
|
||||
@ -604,7 +633,8 @@ public void run() {
|
||||
try {
|
||||
moveToDone(metaInfo);
|
||||
} catch (IOException e) {
|
||||
LOG.info("Failed to process metaInfo for job: " + metaInfo.jobIndexInfo.getJobId(), e);
|
||||
LOG.info("Failed to process metaInfo for job: " +
|
||||
metaInfo.jobIndexInfo.getJobId(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -629,38 +659,17 @@ private Job loadJob(MetaInfo metaInfo) {
|
||||
synchronized(metaInfo) {
|
||||
try {
|
||||
Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
|
||||
metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser());
|
||||
metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser(),
|
||||
metaInfo.getConfFile());
|
||||
addToLoadedJobCache(job);
|
||||
return job;
|
||||
} catch (IOException e) {
|
||||
throw new YarnException("Could not find/load job: " + metaInfo.getJobIndexInfo().getJobId(), e);
|
||||
throw new YarnException("Could not find/load job: " +
|
||||
metaInfo.getJobIndexInfo().getJobId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private SortedMap<JobId, JobIndexInfo> getAllJobsMetaInfo() {
|
||||
SortedMap<JobId, JobIndexInfo> result = new TreeMap<JobId, JobIndexInfo>(JOB_ID_COMPARATOR);
|
||||
try {
|
||||
scanIntermediateDirectory();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to scan intermediate directory", e);
|
||||
throw new YarnException(e);
|
||||
}
|
||||
for (JobId jobId : intermediateListCache.keySet()) {
|
||||
MetaInfo mi = intermediateListCache.get(jobId);
|
||||
if (mi != null) {
|
||||
result.put(jobId, mi.getJobIndexInfo());
|
||||
}
|
||||
}
|
||||
for (JobId jobId : jobListCache.keySet()) {
|
||||
MetaInfo mi = jobListCache.get(jobId);
|
||||
if (mi != null) {
|
||||
result.put(jobId, mi.getJobIndexInfo());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private Map<JobId, Job> getAllJobsInternal() {
|
||||
//TODO This should ideally be using getAllJobsMetaInfo
|
||||
// or get rid of that method once Job has APIs for user, finishTime etc.
|
||||
@ -746,108 +755,6 @@ private Job findJob(JobId jobId) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Searches cached jobs for the specified criteria (AND). Ignores the criteria if null.
|
||||
* @param soughtUser
|
||||
* @param soughtJobNameSubstring
|
||||
* @param soughtDateStrings
|
||||
* @return
|
||||
*/
|
||||
private Map<JobId, Job> findJobs(String soughtUser, String soughtJobNameSubstring, String[] soughtDateStrings) {
|
||||
boolean searchUser = true;
|
||||
boolean searchJobName = true;
|
||||
boolean searchDates = true;
|
||||
List<Calendar> soughtCalendars = null;
|
||||
|
||||
if (soughtUser == null) {
|
||||
searchUser = false;
|
||||
}
|
||||
if (soughtJobNameSubstring == null) {
|
||||
searchJobName = false;
|
||||
}
|
||||
if (soughtDateStrings == null) {
|
||||
searchDates = false;
|
||||
} else {
|
||||
soughtCalendars = getSoughtDateAsCalendar(soughtDateStrings);
|
||||
}
|
||||
|
||||
Map<JobId, Job> resultMap = new TreeMap<JobId, Job>();
|
||||
|
||||
SortedMap<JobId, JobIndexInfo> allJobs = getAllJobsMetaInfo();
|
||||
for (Map.Entry<JobId, JobIndexInfo> entry : allJobs.entrySet()) {
|
||||
JobId jobId = entry.getKey();
|
||||
JobIndexInfo indexInfo = entry.getValue();
|
||||
String jobName = indexInfo.getJobName();
|
||||
String jobUser = indexInfo.getUser();
|
||||
long finishTime = indexInfo.getFinishTime();
|
||||
|
||||
if (searchUser) {
|
||||
if (!soughtUser.equals(jobUser)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (searchJobName) {
|
||||
if (!jobName.contains(soughtJobNameSubstring)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (searchDates) {
|
||||
boolean matchedDate = false;
|
||||
Calendar jobCal = Calendar.getInstance();
|
||||
jobCal.setTimeInMillis(finishTime);
|
||||
for (Calendar cal : soughtCalendars) {
|
||||
if (jobCal.get(Calendar.YEAR) == cal.get(Calendar.YEAR) &&
|
||||
jobCal.get(Calendar.MONTH) == cal.get(Calendar.MONTH) &&
|
||||
jobCal.get(Calendar.DAY_OF_MONTH) == cal.get(Calendar.DAY_OF_MONTH)) {
|
||||
matchedDate = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!matchedDate) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
resultMap.put(jobId, new PartialJob(indexInfo, jobId));
|
||||
}
|
||||
return resultMap;
|
||||
}
|
||||
|
||||
private List<Calendar> getSoughtDateAsCalendar(String [] soughtDateStrings) {
|
||||
List<Calendar> soughtCalendars = new ArrayList<Calendar>();
|
||||
for (int i = 0 ; i < soughtDateStrings.length ; i++) {
|
||||
String soughtDate = soughtDateStrings[i];
|
||||
if (soughtDate.length() != 0) {
|
||||
Matcher m = DATE_PATTERN.matcher(soughtDate);
|
||||
if (m.matches()) {
|
||||
String yyyyPart = m.group(3);
|
||||
String mmPart = m.group(1);
|
||||
String ddPart = m.group(2);
|
||||
|
||||
if (yyyyPart.length() == 2) {
|
||||
yyyyPart = "20" + yyyyPart;
|
||||
}
|
||||
if (mmPart.length() == 1) {
|
||||
mmPart = "0" + mmPart;
|
||||
}
|
||||
if (ddPart.length() == 1) {
|
||||
ddPart = "0" + ddPart;
|
||||
}
|
||||
Calendar soughtCal = Calendar.getInstance();
|
||||
soughtCal.set(Calendar.YEAR, Integer.parseInt(yyyyPart));
|
||||
soughtCal.set(Calendar.MONTH, Integer.parseInt(mmPart) - 1);
|
||||
soughtCal.set(Calendar.DAY_OF_MONTH, Integer.parseInt(ddPart) -1);
|
||||
soughtCalendars.add(soughtCal);
|
||||
}
|
||||
}
|
||||
}
|
||||
return soughtCalendars;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private void moveToDone(MetaInfo metaInfo) throws IOException {
|
||||
long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
|
||||
if (completeTime == 0) completeTime = System.currentTimeMillis();
|
||||
@ -890,26 +797,31 @@ private void moveToDone(MetaInfo metaInfo) throws IOException {
|
||||
try {
|
||||
maybeMakeSubdirectory(targetDir);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed creating subdirectory: " + targetDir + " while attempting to move files for jobId: " + jobId);
|
||||
LOG.warn("Failed creating subdirectory: " + targetDir +
|
||||
" while attempting to move files for jobId: " + jobId);
|
||||
throw e;
|
||||
}
|
||||
synchronized (metaInfo) {
|
||||
if (historyFile != null) {
|
||||
Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile.getName()));
|
||||
Path toPath = doneDirFc.makeQualified(new Path(targetDir,
|
||||
historyFile.getName()));
|
||||
try {
|
||||
moveToDoneNow(historyFile, toPath);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to move file: " + historyFile + " for jobId: " + jobId);
|
||||
LOG.warn("Failed to move file: " + historyFile + " for jobId: "
|
||||
+ jobId);
|
||||
throw e;
|
||||
}
|
||||
metaInfo.setHistoryFile(toPath);
|
||||
}
|
||||
if (confFile != null) {
|
||||
Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile.getName()));
|
||||
Path toPath = doneDirFc.makeQualified(new Path(targetDir,
|
||||
confFile.getName()));
|
||||
try {
|
||||
moveToDoneNow(confFile, toPath);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to move file: " + historyFile + " for jobId: " + jobId);
|
||||
LOG.warn("Failed to move file: " + historyFile + " for jobId: "
|
||||
+ jobId);
|
||||
throw e;
|
||||
}
|
||||
metaInfo.setConfFile(toPath);
|
||||
@ -953,7 +865,8 @@ private void maybeMakeSubdirectory(Path path) throws IOException {
|
||||
}
|
||||
} catch (FileNotFoundException fnfE) {
|
||||
try {
|
||||
FsPermission fsp = new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
|
||||
FsPermission fsp =
|
||||
new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
|
||||
doneDirFc.mkdir(path, fsp, true);
|
||||
FileStatus fsStatus = doneDirFc.getFileStatus(path);
|
||||
LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
|
||||
@ -972,12 +885,15 @@ private void maybeMakeSubdirectory(Path path) throws IOException {
|
||||
}
|
||||
|
||||
private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
|
||||
return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
|
||||
return new Path(doneDirPrefixPath,
|
||||
JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
|
||||
}
|
||||
|
||||
private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
|
||||
String timestampComponent = JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
|
||||
return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
|
||||
String timestampComponent =
|
||||
JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
|
||||
return new Path(doneDirPrefixPath,
|
||||
JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
|
||||
}
|
||||
|
||||
|
||||
@ -1033,12 +949,13 @@ static class MetaInfo {
|
||||
private Path summaryFile;
|
||||
JobIndexInfo jobIndexInfo;
|
||||
|
||||
MetaInfo(Path historyFile, Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo) {
|
||||
MetaInfo(Path historyFile, Path confFile, Path summaryFile,
|
||||
JobIndexInfo jobIndexInfo) {
|
||||
this.historyFile = historyFile;
|
||||
this.confFile = confFile;
|
||||
this.summaryFile = summaryFile;
|
||||
this.jobIndexInfo = jobIndexInfo;
|
||||
}
|
||||
}
|
||||
|
||||
Path getHistoryFile() { return historyFile; }
|
||||
Path getConfFile() { return confFile; }
|
||||
@ -1073,13 +990,19 @@ public void run() {
|
||||
//Sort in ascending order. Relies on YYYY/MM/DD/Serial
|
||||
Collections.sort(serialDirList);
|
||||
for (FileStatus serialDir : serialDirList) {
|
||||
List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
|
||||
List<FileStatus> historyFileList =
|
||||
scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
|
||||
for (FileStatus historyFile : historyFileList) {
|
||||
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
|
||||
long effectiveTimestamp = getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
|
||||
JobIndexInfo jobIndexInfo =
|
||||
FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
|
||||
long effectiveTimestamp =
|
||||
getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
|
||||
if (shouldDelete(effectiveTimestamp)) {
|
||||
String confFileName = JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
|
||||
MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(historyFile.getPath().getParent(), confFileName), null, jobIndexInfo);
|
||||
String confFileName =
|
||||
JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
|
||||
MetaInfo metaInfo = new MetaInfo(historyFile.getPath(),
|
||||
new Path(historyFile.getPath().getParent(), confFileName),
|
||||
null, jobIndexInfo);
|
||||
delete(metaInfo);
|
||||
} else {
|
||||
halted = true;
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
@ -32,6 +33,7 @@
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
|
||||
import clover.org.apache.log4j.Logger;
|
||||
@ -147,4 +149,14 @@ public String getUserName() {
|
||||
return jobIndexInfo.getUser();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getConfFile() {
|
||||
throw new IllegalStateException("Not implemented yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<JobACL, AccessControlList> getJobACLs() {
|
||||
throw new IllegalStateException("Not implemented yet");
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,97 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs.webapp;
|
||||
|
||||
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.ATTEMPT_STATE;
|
||||
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
|
||||
import org.apache.hadoop.yarn.webapp.SubView;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
/**
|
||||
* Render a page showing the attempts made of a given type and a given job.
|
||||
*/
|
||||
public class HsAttemptsPage extends HsTaskPage {
|
||||
static class FewAttemptsBlock extends HsTaskPage.AttemptsBlock {
|
||||
@Inject
|
||||
FewAttemptsBlock(App ctx) {
|
||||
super(ctx);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.apache.hadoop.mapreduce.v2.hs.webapp.HsTaskPage.AttemptsBlock#isValidRequest()
|
||||
* Verify that a job is given.
|
||||
*/
|
||||
@Override
|
||||
protected boolean isValidRequest() {
|
||||
return app.getJob() != null;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.apache.hadoop.mapreduce.v2.hs.webapp.HsTaskPage.AttemptsBlock#getTaskAttempts()
|
||||
* @return the attempts that are for a given job and a specific type/state.
|
||||
*/
|
||||
@Override
|
||||
protected Collection<TaskAttempt> getTaskAttempts() {
|
||||
List<TaskAttempt> fewTaskAttemps = new ArrayList<TaskAttempt>();
|
||||
String taskTypeStr = $(TASK_TYPE);
|
||||
TaskType taskType = MRApps.taskType(taskTypeStr);
|
||||
String attemptStateStr = $(ATTEMPT_STATE);
|
||||
TaskAttemptStateUI neededState = MRApps
|
||||
.taskAttemptState(attemptStateStr);
|
||||
Job j = app.getJob();
|
||||
Map<TaskId, Task> tasks = j.getTasks(taskType);
|
||||
for (Task task : tasks.values()) {
|
||||
Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
|
||||
for (TaskAttempt attempt : attempts.values()) {
|
||||
if (neededState.correspondsTo(attempt.getState())) {
|
||||
fewTaskAttemps.add(attempt);
|
||||
}
|
||||
}
|
||||
}
|
||||
return fewTaskAttemps;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The content will render a different set of task attempts.
|
||||
* @return FewAttemptsBlock.class
|
||||
*/
|
||||
@Override
|
||||
protected Class<? extends SubView> content() {
|
||||
return FewAttemptsBlock.class;
|
||||
}
|
||||
}
|
@ -0,0 +1,99 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs.webapp;
|
||||
|
||||
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
|
||||
import static org.apache.hadoop.yarn.util.StringHelper.join;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.postInitID;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
|
||||
|
||||
import org.apache.hadoop.mapreduce.v2.app.webapp.ConfBlock;
|
||||
import org.apache.hadoop.yarn.webapp.SubView;
|
||||
|
||||
/**
|
||||
* Render a page with the configuration for a give job in it.
|
||||
*/
|
||||
public class HsConfPage extends HsView {
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.apache.hadoop.mapreduce.v2.hs.webapp.HsView#preHead(org.apache.hadoop.yarn.webapp.hamlet.Hamlet.HTML)
|
||||
*/
|
||||
@Override protected void preHead(Page.HTML<_> html) {
|
||||
String jobID = $(JOB_ID);
|
||||
set(TITLE, jobID.isEmpty() ? "Bad request: missing job ID"
|
||||
: join("Configuration for MapReduce Job ", $(JOB_ID)));
|
||||
commonPreHead(html);
|
||||
set(DATATABLES_ID, "conf");
|
||||
set(initID(DATATABLES, "conf"), confTableInit());
|
||||
set(postInitID(DATATABLES, "conf"), confPostTableInit());
|
||||
setTableStyles(html, "conf");
|
||||
|
||||
//Override the default nav config
|
||||
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}");
|
||||
}
|
||||
|
||||
/**
|
||||
* The body of this block is the configuration block.
|
||||
* @return HsConfBlock.class
|
||||
*/
|
||||
@Override protected Class<? extends SubView> content() {
|
||||
return ConfBlock.class;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the end of the JS map that is the jquery datatable config for the
|
||||
* conf table.
|
||||
*/
|
||||
private String confTableInit() {
|
||||
return tableInit().append("}").toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the java script code to allow the jquery conf datatable to filter
|
||||
* by column.
|
||||
*/
|
||||
private String confPostTableInit() {
|
||||
return "var confInitVals = new Array();\n" +
|
||||
"$('tfoot input').keyup( function () \n{"+
|
||||
" confDataTable.fnFilter( this.value, $('tfoot input').index(this) );\n"+
|
||||
"} );\n"+
|
||||
"$('tfoot input').each( function (i) {\n"+
|
||||
" confInitVals[i] = this.value;\n"+
|
||||
"} );\n"+
|
||||
"$('tfoot input').focus( function () {\n"+
|
||||
" if ( this.className == 'search_init' )\n"+
|
||||
" {\n"+
|
||||
" this.className = '';\n"+
|
||||
" this.value = '';\n"+
|
||||
" }\n"+
|
||||
"} );\n"+
|
||||
"$('tfoot input').blur( function (i) {\n"+
|
||||
" if ( this.value == '' )\n"+
|
||||
" {\n"+
|
||||
" this.className = 'search_init';\n"+
|
||||
" this.value = confInitVals[$('tfoot input').index(this)];\n"+
|
||||
" }\n"+
|
||||
"} );\n";
|
||||
}
|
||||
}
|
@ -78,7 +78,16 @@ protected Class<? extends View> tasksPage() {
|
||||
protected Class<? extends View> taskPage() {
|
||||
return HsTaskPage.class;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.apache.hadoop.mapreduce.v2.app.webapp.AppController#attemptsPage()
|
||||
*/
|
||||
@Override
|
||||
protected Class<? extends View> attemptsPage() {
|
||||
return HsAttemptsPage.class;
|
||||
}
|
||||
|
||||
// Need all of these methods here also as Guice doesn't look into parent
|
||||
// classes.
|
||||
|
||||
@ -127,6 +136,21 @@ public void attempts() {
|
||||
super.attempts();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the page that will be used to render the /conf page
|
||||
*/
|
||||
protected Class<? extends View> confPage() {
|
||||
return HsConfPage.class;
|
||||
}
|
||||
|
||||
/**
|
||||
* Render the /conf page
|
||||
*/
|
||||
public void conf() {
|
||||
requireJob();
|
||||
render(confPage());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the page about the current server.
|
||||
*/
|
||||
|
@ -20,8 +20,10 @@
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
@ -32,12 +34,13 @@
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.webapp.ResponseInfo;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||
import org.apache.hadoop.yarn.webapp.view.InfoBlock;
|
||||
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
|
||||
import static org.apache.hadoop.yarn.util.StringHelper.*;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
|
||||
|
||||
/**
|
||||
@ -46,18 +49,9 @@
|
||||
public class HsJobBlock extends HtmlBlock {
|
||||
final AppContext appContext;
|
||||
|
||||
int runningMapTasks = 0;
|
||||
int pendingMapTasks = 0;
|
||||
int runningReduceTasks = 0;
|
||||
int pendingReduceTasks = 0;
|
||||
|
||||
int newMapAttempts = 0;
|
||||
int runningMapAttempts = 0;
|
||||
int killedMapAttempts = 0;
|
||||
int failedMapAttempts = 0;
|
||||
int successfulMapAttempts = 0;
|
||||
int newReduceAttempts = 0;
|
||||
int runningReduceAttempts = 0;
|
||||
int killedReduceAttempts = 0;
|
||||
int failedReduceAttempts = 0;
|
||||
int successfulReduceAttempts = 0;
|
||||
@ -84,9 +78,9 @@ public class HsJobBlock extends HtmlBlock {
|
||||
p()._("Sorry, ", jid, " not found.")._();
|
||||
return;
|
||||
}
|
||||
Map<JobACL, AccessControlList> acls = job.getJobACLs();
|
||||
|
||||
JobReport jobReport = job.getReport();
|
||||
String mapPct = percent(jobReport.getMapProgress());
|
||||
String reducePct = percent(jobReport.getReduceProgress());
|
||||
int mapTasks = job.getTotalMaps();
|
||||
int mapTasksComplete = job.getCompletedMaps();
|
||||
int reduceTasks = job.getTotalReduces();
|
||||
@ -94,13 +88,29 @@ public class HsJobBlock extends HtmlBlock {
|
||||
long startTime = jobReport.getStartTime();
|
||||
long finishTime = jobReport.getFinishTime();
|
||||
countTasksAndAttempts(job);
|
||||
info("Job Overview").
|
||||
ResponseInfo infoBlock = info("Job Overview").
|
||||
_("Job Name:", job.getName()).
|
||||
_("User Name:", job.getUserName()).
|
||||
_("State:", job.getState()).
|
||||
_("Uberized:", job.isUber()).
|
||||
_("Started:", new Date(startTime)).
|
||||
_("Finished:", new Date(finishTime)).
|
||||
_("Elapsed:", StringUtils.formatTime(
|
||||
Times.elapsed(startTime, finishTime)));
|
||||
|
||||
List<String> diagnostics = job.getDiagnostics();
|
||||
if(diagnostics != null && !diagnostics.isEmpty()) {
|
||||
StringBuffer b = new StringBuffer();
|
||||
for(String diag: diagnostics) {
|
||||
b.append(diag);
|
||||
}
|
||||
infoBlock._("Diagnostics:", b.toString());
|
||||
}
|
||||
|
||||
for(Map.Entry<JobACL, AccessControlList> entry : acls.entrySet()) {
|
||||
infoBlock._("ACL "+entry.getKey().getAclName()+":",
|
||||
entry.getValue().getAclString());
|
||||
}
|
||||
html.
|
||||
_(InfoBlock.class).
|
||||
div(_INFO_WRAP).
|
||||
@ -109,34 +119,17 @@ public class HsJobBlock extends HtmlBlock {
|
||||
table("#job").
|
||||
tr().
|
||||
th(_TH, "Task Type").
|
||||
th(_TH, "Progress").
|
||||
th(_TH, "Total").
|
||||
th(_TH, "Pending").
|
||||
th(_TH, "Running").
|
||||
th(_TH, "Complete")._().
|
||||
tr(_ODD).
|
||||
th().
|
||||
a(url("tasks", jid, "m"), "Map")._().
|
||||
td().
|
||||
div(_PROGRESSBAR).
|
||||
$title(join(mapPct, '%')). // tooltip
|
||||
div(_PROGRESSBAR_VALUE).
|
||||
$style(join("width:", mapPct, '%'))._()._()._().
|
||||
td(String.valueOf(mapTasks)).
|
||||
td(String.valueOf(pendingMapTasks)).
|
||||
td(String.valueOf(runningMapTasks)).
|
||||
td(String.valueOf(mapTasksComplete))._().
|
||||
tr(_EVEN).
|
||||
th().
|
||||
a(url("tasks", jid, "r"), "Reduce")._().
|
||||
td().
|
||||
div(_PROGRESSBAR).
|
||||
$title(join(reducePct, '%')). // tooltip
|
||||
div(_PROGRESSBAR_VALUE).
|
||||
$style(join("width:", reducePct, '%'))._()._()._().
|
||||
td(String.valueOf(reduceTasks)).
|
||||
td(String.valueOf(pendingReduceTasks)).
|
||||
td(String.valueOf(runningReduceTasks)).
|
||||
td(String.valueOf(reducesTasksComplete))._()
|
||||
._().
|
||||
|
||||
@ -144,19 +137,11 @@ public class HsJobBlock extends HtmlBlock {
|
||||
table("#job").
|
||||
tr().
|
||||
th(_TH, "Attempt Type").
|
||||
th(_TH, "New").
|
||||
th(_TH, "Running").
|
||||
th(_TH, "Failed").
|
||||
th(_TH, "Killed").
|
||||
th(_TH, "Successful")._().
|
||||
tr(_ODD).
|
||||
th("Maps").
|
||||
td().a(url("attempts", jid, "m",
|
||||
TaskAttemptStateUI.NEW.toString()),
|
||||
String.valueOf(newMapAttempts))._().
|
||||
td().a(url("attempts", jid, "m",
|
||||
TaskAttemptStateUI.RUNNING.toString()),
|
||||
String.valueOf(runningMapAttempts))._().
|
||||
td().a(url("attempts", jid, "m",
|
||||
TaskAttemptStateUI.FAILED.toString()),
|
||||
String.valueOf(failedMapAttempts))._().
|
||||
@ -169,12 +154,6 @@ public class HsJobBlock extends HtmlBlock {
|
||||
_().
|
||||
tr(_EVEN).
|
||||
th("Reduces").
|
||||
td().a(url("attempts", jid, "r",
|
||||
TaskAttemptStateUI.NEW.toString()),
|
||||
String.valueOf(newReduceAttempts))._().
|
||||
td().a(url("attempts", jid, "r",
|
||||
TaskAttemptStateUI.RUNNING.toString()),
|
||||
String.valueOf(runningReduceAttempts))._().
|
||||
td().a(url("attempts", jid, "r",
|
||||
TaskAttemptStateUI.FAILED.toString()),
|
||||
String.valueOf(failedReduceAttempts))._().
|
||||
@ -197,42 +176,17 @@ public class HsJobBlock extends HtmlBlock {
|
||||
private void countTasksAndAttempts(Job job) {
|
||||
Map<TaskId, Task> tasks = job.getTasks();
|
||||
for (Task task : tasks.values()) {
|
||||
switch (task.getType()) {
|
||||
case MAP:
|
||||
// Task counts
|
||||
switch (task.getState()) {
|
||||
case RUNNING:
|
||||
++runningMapTasks;
|
||||
break;
|
||||
case SCHEDULED:
|
||||
++pendingMapTasks;
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case REDUCE:
|
||||
// Task counts
|
||||
switch (task.getState()) {
|
||||
case RUNNING:
|
||||
++runningReduceTasks;
|
||||
break;
|
||||
case SCHEDULED:
|
||||
++pendingReduceTasks;
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// Attempts counts
|
||||
Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
|
||||
for (TaskAttempt attempt : attempts.values()) {
|
||||
|
||||
int newAttempts = 0, running = 0, successful = 0, failed = 0, killed =0;
|
||||
int successful = 0, failed = 0, killed =0;
|
||||
|
||||
if (TaskAttemptStateUI.NEW.correspondsTo(attempt.getState())) {
|
||||
++newAttempts;
|
||||
//Do Nothing
|
||||
} else if (TaskAttemptStateUI.RUNNING.correspondsTo(attempt
|
||||
.getState())) {
|
||||
++running;
|
||||
//Do Nothing
|
||||
} else if (TaskAttemptStateUI.SUCCESSFUL.correspondsTo(attempt
|
||||
.getState())) {
|
||||
++successful;
|
||||
@ -246,15 +200,11 @@ private void countTasksAndAttempts(Job job) {
|
||||
|
||||
switch (task.getType()) {
|
||||
case MAP:
|
||||
newMapAttempts += newAttempts;
|
||||
runningMapAttempts += running;
|
||||
successfulMapAttempts += successful;
|
||||
failedMapAttempts += failed;
|
||||
killedMapAttempts += killed;
|
||||
break;
|
||||
case REDUCE:
|
||||
newReduceAttempts += newAttempts;
|
||||
runningReduceAttempts += running;
|
||||
successfulReduceAttempts += successful;
|
||||
failedReduceAttempts += failed;
|
||||
killedReduceAttempts += killed;
|
||||
|
@ -52,6 +52,7 @@ public class HsNavBlock extends HtmlBlock {
|
||||
ul().
|
||||
li().a(url("job", jobid), "Overview")._().
|
||||
li().a(url("jobcounters", jobid), "Counters")._().
|
||||
li().a(url("conf", jobid), "Configuration")._().
|
||||
li().a(url("tasks", jobid, "m"), "Map tasks")._().
|
||||
li().a(url("tasks", jobid, "r"), "Reduce tasks")._()._();
|
||||
}
|
||||
|
@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs.webapp;
|
||||
|
||||
import static org.apache.hadoop.yarn.util.StringHelper.percent;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
|
||||
@ -73,7 +72,6 @@ protected void render(Block html) {
|
||||
thead().
|
||||
tr().
|
||||
th(".id", "Attempt").
|
||||
th(".progress", "Progress").
|
||||
th(".state", "State").
|
||||
th(".node", "node").
|
||||
th(".tsh", "Started").
|
||||
@ -83,7 +81,6 @@ protected void render(Block html) {
|
||||
tbody();
|
||||
for (TaskAttempt ta : getTaskAttempts()) {
|
||||
String taid = MRApps.toString(ta.getID());
|
||||
String progress = percent(ta.getProgress());
|
||||
ContainerId containerId = ta.getAssignedContainerID();
|
||||
|
||||
String nodeHttpAddr = ta.getNodeHttpAddress();
|
||||
@ -93,7 +90,6 @@ protected void render(Block html) {
|
||||
TD<TR<TBODY<TABLE<Hamlet>>>> nodeTd = tbody.
|
||||
tr().
|
||||
td(".id", taid).
|
||||
td(".progress", progress).
|
||||
td(".state", ta.getState().toString()).
|
||||
td().
|
||||
a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr);
|
||||
|
@ -0,0 +1,99 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.hs.webapp;
|
||||
|
||||
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
|
||||
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
/**
|
||||
* Render the a table of tasks for a given type.
|
||||
*/
|
||||
public class HsTasksBlock extends HtmlBlock {
|
||||
final App app;
|
||||
|
||||
@Inject HsTasksBlock(App app) {
|
||||
this.app = app;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.apache.hadoop.yarn.webapp.view.HtmlBlock#render(org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block)
|
||||
*/
|
||||
@Override protected void render(Block html) {
|
||||
if (app.getJob() == null) {
|
||||
html.
|
||||
h2($(TITLE));
|
||||
return;
|
||||
}
|
||||
TaskType type = null;
|
||||
String symbol = $(TASK_TYPE);
|
||||
if (!symbol.isEmpty()) {
|
||||
type = MRApps.taskType(symbol);
|
||||
}
|
||||
TBODY<TABLE<Hamlet>> tbody = html.
|
||||
table("#tasks").
|
||||
thead().
|
||||
tr().
|
||||
th("Task").
|
||||
th("State").
|
||||
th("Start Time").
|
||||
th("Finish Time").
|
||||
th("Elapsed Time")._()._().
|
||||
tbody();
|
||||
for (Task task : app.getJob().getTasks().values()) {
|
||||
if (type != null && task.getType() != type) {
|
||||
continue;
|
||||
}
|
||||
String tid = MRApps.toString(task.getID());
|
||||
TaskReport report = task.getReport();
|
||||
long startTime = report.getStartTime();
|
||||
long finishTime = report.getFinishTime();
|
||||
long elapsed = Times.elapsed(startTime, finishTime);
|
||||
tbody.
|
||||
tr().
|
||||
td().
|
||||
br().$title(String.valueOf(task.getID().getId()))._(). // sorting
|
||||
a(url("task", tid), tid)._().
|
||||
td(report.getTaskState().toString()).
|
||||
td().
|
||||
br().$title(String.valueOf(startTime))._().
|
||||
_(Times.format(startTime))._().
|
||||
td().
|
||||
br().$title(String.valueOf(finishTime))._().
|
||||
_(Times.format(finishTime))._().
|
||||
td().
|
||||
br().$title(String.valueOf(elapsed))._().
|
||||
_(StringUtils.formatTime(elapsed))._()._();
|
||||
}
|
||||
tbody._()._();
|
||||
}
|
||||
}
|
@ -24,7 +24,6 @@
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
|
||||
|
||||
import org.apache.hadoop.mapreduce.v2.app.webapp.TasksBlock;
|
||||
import org.apache.hadoop.yarn.webapp.SubView;
|
||||
|
||||
/**
|
||||
@ -46,10 +45,10 @@ public class HsTasksPage extends HsView {
|
||||
|
||||
/**
|
||||
* The content of this page is the TasksBlock
|
||||
* @return TasksBlock.class
|
||||
* @return HsTasksBlock.class
|
||||
*/
|
||||
@Override protected Class<? extends SubView> content() {
|
||||
return TasksBlock.class;
|
||||
return HsTasksBlock.class;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -39,6 +39,7 @@ public void setup() {
|
||||
route("/", HsController.class);
|
||||
route("/app", HsController.class);
|
||||
route(pajoin("/job", JOB_ID), HsController.class, "job");
|
||||
route(pajoin("/conf", JOB_ID), HsController.class, "conf");
|
||||
route(pajoin("/jobcounters", JOB_ID), HsController.class, "jobCounters");
|
||||
route(pajoin("/tasks", JOB_ID, TASK_TYPE), HsController.class, "tasks");
|
||||
route(pajoin("/attempts", JOB_ID, TASK_TYPE, ATTEMPT_STATE),
|
||||
|
@ -19,10 +19,16 @@
|
||||
package org.apache.hadoop.mapreduce.v2.hs.webapp;
|
||||
|
||||
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.APP_ID;
|
||||
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.ATTEMPT_STATE;
|
||||
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
|
||||
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
|
||||
@ -31,13 +37,13 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.util.Apps;
|
||||
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.inject.Injector;
|
||||
|
||||
public class TestHSWebApp {
|
||||
private static final Log LOG = LogFactory.getLog(TestHSWebApp.class);
|
||||
|
||||
static class TestAppContext implements AppContext {
|
||||
final ApplicationAttemptId appAttemptID;
|
||||
@ -111,16 +117,43 @@ public long getStartTime() {
|
||||
}
|
||||
|
||||
@Test public void testJobView() {
|
||||
LOG.info("HsJobPage");
|
||||
WebAppTests.testPage(HsJobPage.class, AppContext.class, new TestAppContext());
|
||||
}
|
||||
|
||||
@Test public void testTasksView() {
|
||||
LOG.info("HsTasksPage");
|
||||
WebAppTests.testPage(HsTasksPage.class, AppContext.class,
|
||||
new TestAppContext());
|
||||
}
|
||||
|
||||
@Test public void testTaskView() {
|
||||
LOG.info("HsTaskPage");
|
||||
WebAppTests.testPage(HsTaskPage.class, AppContext.class,
|
||||
new TestAppContext());
|
||||
}
|
||||
|
||||
@Test public void testAttemptsWithJobView() {
|
||||
LOG.info("HsAttemptsPage with data");
|
||||
TestAppContext ctx = new TestAppContext();
|
||||
JobId id = ctx.getAllJobs().keySet().iterator().next();
|
||||
Map<String, String> params = new HashMap<String,String>();
|
||||
params.put(JOB_ID, id.toString());
|
||||
params.put(TASK_TYPE, "m");
|
||||
params.put(ATTEMPT_STATE, "SUCCESSFUL");
|
||||
WebAppTests.testPage(HsAttemptsPage.class, AppContext.class,
|
||||
ctx, params);
|
||||
}
|
||||
|
||||
@Test public void testAttemptsView() {
|
||||
LOG.info("HsAttemptsPage");
|
||||
WebAppTests.testPage(HsAttemptsPage.class, AppContext.class,
|
||||
new TestAppContext());
|
||||
}
|
||||
|
||||
@Test public void testConfView() {
|
||||
LOG.info("HsConfPage");
|
||||
WebAppTests.testPage(HsConfPage.class, AppContext.class,
|
||||
new TestAppContext());
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,8 @@
|
||||
import org.apache.hadoop.yarn.webapp.WebAppException;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Scopes;
|
||||
import com.google.inject.servlet.RequestScoped;
|
||||
@ -126,22 +128,31 @@ public static <T> Injector testController(Class<? extends Controller> ctrlr,
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> Injector testController(Class<? extends Controller> ctrlr,
|
||||
String methodName) {
|
||||
return testController(ctrlr, methodName, null, null);
|
||||
}
|
||||
|
||||
public static <T> Injector testPage(Class<? extends View> page, Class<T> api,
|
||||
T impl, Module... modules) {
|
||||
T impl, Map<String,String> params, Module... modules) {
|
||||
Injector injector = createMockInjector(api, impl, modules);
|
||||
injector.getInstance(page).render();
|
||||
View view = injector.getInstance(page);
|
||||
if(params != null) {
|
||||
for(Map.Entry<String, String> entry: params.entrySet()) {
|
||||
view.set(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
view.render();
|
||||
flushOutput(injector);
|
||||
return injector;
|
||||
}
|
||||
|
||||
public static <T> Injector testPage(Class<? extends View> page, Class<T> api,
|
||||
T impl, Module... modules) {
|
||||
return testPage(page, api, impl, null, modules);
|
||||
}
|
||||
|
||||
// convenience
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> Injector testPage(Class<? extends View> page) {
|
||||
return testPage(page, null, null);
|
||||
}
|
||||
@ -155,7 +166,6 @@ public static <T> Injector testBlock(Class<? extends SubView> block,
|
||||
}
|
||||
|
||||
// convenience
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> Injector testBlock(Class<? extends SubView> block) {
|
||||
return testBlock(block, null, null);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user