From b2b976a03b3e06017b9844be2e345afc6ab836c7 Mon Sep 17 00:00:00 2001
From: Arun Murthy
Date: Sun, 14 Aug 2011 01:52:31 +0000
Subject: [PATCH] Fixed bad commit for MAPREDUCE-901.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1157454 13f79535-47bb-0310-9956-ffa450edef68
---
 .../apache/hadoop/mapred/JobInProgress.java   |    5 -
 .../hadoop/mapred/JobInProgress.java.orig     | 3729 -----------------
 2 files changed, 3734 deletions(-)
 delete mode 100644 mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java.orig

diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java b/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
index 5a722c00a7..0f4a1352ec 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
@@ -1251,11 +1251,6 @@ else if (state == TaskStatus.State.FAILED ||
    * @return the job-level counters.
    */
   public synchronized Counters getJobCounters() {
-    try {
-      throw new IOException("");
-    } catch (IOException ioe) {
-      LOG.info("getJC", ioe);
-    }
     return jobCounters;
   }
 
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java.orig b/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java.orig
deleted file mode 100644
index 70a7ca6fd0..0000000000
--- a/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java.orig
+++ /dev/null
@@ -1,3729 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.Vector;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobCounter;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
-import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
-import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
-import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.mapreduce.split.JobSplit;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * JobInProgress maintains all the info for keeping a Job on the straight and
- * narrow.  It keeps its JobProfile and its latest JobStatus, plus a set of
- * tables for doing bookkeeping of its Tasks.
- */
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
-@InterfaceStability.Unstable
-public class JobInProgress {
-  /**
-   * Used when a kill is issued to a job which is initializing.
-   */
-  static class KillInterruptedException extends InterruptedException {
-    private static final long serialVersionUID = 1L;
-    public KillInterruptedException(String msg) {
-      super(msg);
-    }
-  }
-
-  static final Log LOG = LogFactory.getLog(JobInProgress.class);
-
-  JobProfile profile;
-  JobStatus status;
-  Path jobFile = null;
-  Path localJobFile = null;
-
-  TaskInProgress maps[] = new TaskInProgress[0];
-  TaskInProgress reduces[] = new TaskInProgress[0];
-  TaskInProgress cleanup[] = new TaskInProgress[0];
-  TaskInProgress setup[] = new TaskInProgress[0];
-  int numMapTasks = 0;
-  int numReduceTasks = 0;
-  final long memoryPerMap;
-  final long memoryPerReduce;
-  volatile int numSlotsPerMap = 1;
-  volatile int numSlotsPerReduce = 1;
-  final int maxTaskFailuresPerTracker;
-
-  // Counters to track currently running/finished/failed Map/Reduce task-attempts
-  int runningMapTasks = 0;
-  int runningReduceTasks = 0;
-  int finishedMapTasks = 0;
-  int finishedReduceTasks = 0;
-  int failedMapTasks = 0;
-  int failedReduceTasks = 0;
-
-  static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
-  int completedMapsForReduceSlowstart = 0;
-
-  // runningMapTasks include speculative tasks, so we need to capture
-  // speculative tasks separately
-  int speculativeMapTasks = 0;
-  int speculativeReduceTasks = 0;
-
-  int mapFailuresPercent = 0;
-  int reduceFailuresPercent = 0;
-  int failedMapTIPs = 0;
-  int failedReduceTIPs = 0;
-  private volatile boolean launchedCleanup = false;
-  private volatile boolean launchedSetup = false;
-  private volatile boolean jobKilled = false;
-  private volatile boolean jobFailed = false;
-  private final boolean jobSetupCleanupNeeded;
-  private final boolean taskCleanupNeeded;
-
-  JobPriority priority = JobPriority.NORMAL;
-  protected JobTracker jobtracker;
-
-  protected Credentials tokenStorage;
-
-  JobHistory jobHistory;
-
-  // NetworkTopology Node to the set of TIPs
-  Map<Node, List<TaskInProgress>> nonRunningMapCache;
-
-  // Map of NetworkTopology Node to set of running TIPs
-  Map<Node, Set<TaskInProgress>> runningMapCache;
-
-  // A list of non-local non-running maps
-  List<TaskInProgress> nonLocalMaps;
-
-  // A set of non-local running maps
-  Set<TaskInProgress> nonLocalRunningMaps;
-
-  // A list of non-running reduce TIPs
-  List<TaskInProgress> nonRunningReduces;
-
-  // A set of running reduce TIPs
-  Set<TaskInProgress> runningReduces;
-
-  // A list of cleanup tasks for the map task attempts, to be launched
-  List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
-
-  // A list of cleanup tasks for the reduce task attempts, to be launched
-  List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
-
-  int maxLevel;
-
-  /**
-   * A special value indicating that
-   * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
-   * schedule any available map tasks for this job, including speculative tasks.
-   */
-  int anyCacheLevel;
-
-  /**
-   * A special value indicating that
-   * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
-   * schedule only off-switch and speculative map tasks for this job.
-   */
-  private static final int NON_LOCAL_CACHE_LEVEL = -1;
-
-  private int taskCompletionEventTracker = 0;
-  List<TaskCompletionEvent> taskCompletionEvents;
-
-  // The maximum percentage of trackers in cluster added to the 'blacklist'.
-  private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
-
-  // The maximum percentage of fetch failures allowed for a map
-  private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
-
-  // No. of tasktrackers in the cluster
-  private volatile int clusterSize = 0;
-
-  // The no. of tasktrackers where >= conf.getMaxTaskFailuresPerTracker()
-  // tasks have failed
-  private volatile int flakyTaskTrackers = 0;
-  // Map of trackerHostName -> no. of task failures
-  private Map<String, Integer> trackerToFailuresMap =
-    new TreeMap<String, Integer>();
-
-  //Confine estimation algorithms to an "oracle" class that JIP queries.
-  ResourceEstimator resourceEstimator;
-
-  long startTime;
-  long launchTime;
-  long finishTime;
-
-  // First *task launch times
-  final Map<TaskType, Long> firstTaskLaunchTimes =
-    new EnumMap<TaskType, Long>(TaskType.class);
-
-  // Indicates how many times the job got restarted
-  private final int restartCount;
-
-  JobConf conf;
-  protected AtomicBoolean tasksInited = new AtomicBoolean(false);
-  private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
-
-  LocalFileSystem localFs;
-  FileSystem fs;
-  String user;
-  JobID jobId;
-  volatile private boolean hasSpeculativeMaps;
-  volatile private boolean hasSpeculativeReduces;
-  long inputLength = 0;
-
-  Counters jobCounters = new Counters();
-
-  // Maximum no. of fetch-failure notifications after which map task is killed
-  private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
-
-  // Don't lower speculativeCap below one TT's worth (for small clusters)
-  private static final int MIN_SPEC_CAP = 10;
-
-  private static final float MIN_SLOTS_CAP = 0.01f;
-
-  // Map of mapTaskId -> no. of fetch failures
-  private Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap =
-    new TreeMap<TaskAttemptID, Integer>();
-
-  private Object schedulingInfo;
-  private String submitHostName;
-  private String submitHostAddress;
-
-  //thresholds for speculative execution
-  float slowTaskThreshold;
-  float speculativeCap;
-  float slowNodeThreshold; //standard deviations
-
-  //Statistics are maintained for a couple of things
-  //mapTaskStats is used for maintaining statistics about
-  //the completion time of map tasks on the trackers. On a per
-  //tracker basis, the mean time for task completion is maintained
-  private DataStatistics mapTaskStats = new DataStatistics();
-  //reduceTaskStats is used for maintaining statistics about
-  //the completion time of reduce tasks on the trackers. On a per
-  //tracker basis, the mean time for task completion is maintained
-  private DataStatistics reduceTaskStats = new DataStatistics();
-  //trackerMapStats used to maintain a mapping from the tracker to
-  //the statistics about completion time of map tasks
-  private Map<String, DataStatistics> trackerMapStats =
-    new HashMap<String, DataStatistics>();
-  //trackerReduceStats used to maintain a mapping from the tracker to
-  //the statistics about completion time of reduce tasks
-  private Map<String, DataStatistics> trackerReduceStats =
-    new HashMap<String, DataStatistics>();
-  //runningMapStats used to maintain the RUNNING map tasks' statistics
-  private DataStatistics runningMapTaskStats = new DataStatistics();
-  //runningReduceStats used to maintain the RUNNING reduce tasks' statistics
-  private DataStatistics runningReduceTaskStats = new DataStatistics();
-
-  private static class FallowSlotInfo {
-    long timestamp;
-    int numSlots;
-
-    public FallowSlotInfo(long timestamp, int numSlots) {
-      this.timestamp = timestamp;
-      this.numSlots = numSlots;
-    }
-
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-      this.timestamp = timestamp;
-    }
-
-    public int getNumSlots() {
-      return numSlots;
-    }
-
-    public void setNumSlots(int numSlots) {
-      this.numSlots = numSlots;
-    }
-  }
-
-  private Map<TaskTracker, FallowSlotInfo> trackersReservedForMaps =
-    new HashMap<TaskTracker, FallowSlotInfo>();
-  private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces =
-    new HashMap<TaskTracker, FallowSlotInfo>();
-  private Path jobSubmitDir = null;
-
-  /**
-   * Create an almost empty JobInProgress, which can be used only for tests
-   */
-  protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
-    this.conf = conf;
-    this.jobId = jobid;
-    this.numMapTasks = conf.getNumMapTasks();
-    this.numReduceTasks = conf.getNumReduceTasks();
-    this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
-    this.anyCacheLevel = this.maxLevel+1;
-    this.jobtracker = tracker;
-    this.restartCount = 0;
-    this.profile = new JobProfile(conf.getUser(), jobid, "", "",
-                                  conf.getJobName(), conf.getQueueName());
-
-    this.memoryPerMap = conf.getMemoryForMapTask();
-    this.memoryPerReduce = conf.getMemoryForReduceTask();
-
-    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
-
-    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
-    this.nonLocalMaps = new LinkedList<TaskInProgress>();
-    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
-    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
-    this.nonRunningReduces = new LinkedList<TaskInProgress>();
-    this.runningReduces = new LinkedHashSet<TaskInProgress>();
-    this.resourceEstimator = new ResourceEstimator(this);
-    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP,
-        this.profile.getUser(), this.profile.getJobName(),
-        this.profile.getJobFile(), "");
-    this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
-    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
-      (numMapTasks + numReduceTasks + 10);
-
-    this.slowTaskThreshold = Math.max(0.0f,
-        conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
-    this.speculativeCap = conf.getFloat(
-        MRJobConfig.SPECULATIVECAP, 0.1f);
-    this.slowNodeThreshold = conf.getFloat(
-        MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD, 1.0f);
-    this.jobSetupCleanupNeeded = conf.getBoolean(
-        MRJobConfig.SETUP_CLEANUP_NEEDED, true);
-    this.taskCleanupNeeded = conf.getBoolean(
-        MRJobConfig.TASK_CLEANUP_NEEDED, true);
-    if (tracker != null) { // Some mock tests have null tracker
-      this.jobHistory = tracker.getJobHistory();
-    }
-    this.tokenStorage = null;
-  }
-
-  JobInProgress(JobConf conf) {
-    restartCount = 0;
-    jobSetupCleanupNeeded = false;
-    taskCleanupNeeded = true;
-
-    this.memoryPerMap = conf.getMemoryForMapTask();
-    this.memoryPerReduce = conf.getMemoryForReduceTask();
-
-    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
-  }
-
-  /**
-   * Create a JobInProgress with the given job file, plus a handle
-   * to the tracker.
-   */
-  public JobInProgress(JobTracker jobtracker,
-                       final JobConf default_conf, int rCount,
-                       JobInfo jobInfo,
-                       Credentials ts
-                      ) throws IOException, InterruptedException {
-    try {
-      this.restartCount = rCount;
-      this.jobId = JobID.downgrade(jobInfo.getJobID());
-      String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
-          + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + this.jobId;
-      this.jobtracker = jobtracker;
-      this.jobHistory = jobtracker.getJobHistory();
-      this.startTime = System.currentTimeMillis();
-
-      this.localFs = jobtracker.getLocalFileSystem();
-      this.tokenStorage = ts;
-      // use the user supplied token to add user credentials to the conf
-      jobSubmitDir = jobInfo.getJobSubmitDir();
-      user = jobInfo.getUser().toString();
-
-      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
-      if (ts != null) {
-        for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
-          ugi.addToken(token);
-        }
-      }
-
-      fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-        public FileSystem run() throws IOException {
-          return jobSubmitDir.getFileSystem(default_conf);
-        }
-      });
-      this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR + "/"
-          + this.jobId + ".xml");
-
-      jobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
-      fs.copyToLocalFile(jobFile, localJobFile);
-      conf = new JobConf(localJobFile);
-      if (conf.getUser() == null) {
-        this.conf.setUser(user);
-      }
-      if (!conf.getUser().equals(user)) {
-        String desc = "The username " + conf.getUser() + " obtained from the "
-            + "conf doesn't match the username " + user + " the user "
-            + "authenticated as";
-        AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(),
-            conf.getUser(), jobId.toString(), desc);
-        throw new IOException(desc);
-      }
-
-      String userGroups[] = ugi.getGroupNames();
-      String primaryGroup = (userGroups.length > 0) ?
-          userGroups[0] : null;
-      if (primaryGroup != null) {
-        conf.set("group.name", primaryGroup);
-      }
-
-      this.priority = conf.getJobPriority();
-      this.profile = new JobProfile(conf.getUser(), this.jobId,
-          jobFile.toString(), url, conf.getJobName(), conf.getQueueName());
-      this.status = new JobStatus(this.jobId, 0.0f, 0.0f, JobStatus.PREP,
-          profile.getUser(), profile.getJobName(), profile.getJobFile(),
-          profile.getURL().toString());
-      this.jobtracker.getInstrumentation().addPrepJob(conf, this.jobId);
-      status.setStartTime(startTime);
-      this.status.setJobPriority(this.priority);
-
-      this.numMapTasks = conf.getNumMapTasks();
-      this.numReduceTasks = conf.getNumReduceTasks();
-
-      this.memoryPerMap = conf.getMemoryForMapTask();
-      this.memoryPerReduce = conf.getMemoryForReduceTask();
-
-      this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
-          numMapTasks + numReduceTasks + 10);
-      JobContext jobContext = new JobContextImpl(conf, jobId);
-      this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
-      this.taskCleanupNeeded = jobContext.getTaskCleanupNeeded();
-
-      // Construct the jobACLs
-      status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
-
-      this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
-      this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
-
-      this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
-
-      hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-      hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
-      this.maxLevel = jobtracker.getNumTaskCacheLevels();
-      this.anyCacheLevel = this.maxLevel + 1;
-      this.nonLocalMaps = new LinkedList<TaskInProgress>();
-      this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
-      this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
-      this.nonRunningReduces = new LinkedList<TaskInProgress>();
-      this.runningReduces = new LinkedHashSet<TaskInProgress>();
-      this.resourceEstimator = new ResourceEstimator(this);
-      this.submitHostName = conf.getJobSubmitHostName();
-      this.submitHostAddress = conf.getJobSubmitHostAddress();
-
-      this.slowTaskThreshold = Math.max(0.0f, conf.getFloat(
-          MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
-      this.speculativeCap = conf.getFloat(MRJobConfig.SPECULATIVECAP, 0.1f);
-      this.slowNodeThreshold = conf.getFloat(
-          MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD, 1.0f);
-      // register job's tokens for renewal
-      DelegationTokenRenewal.registerDelegationTokensForRenewal(
-          jobInfo.getJobID(), ts, jobtracker.getConf());
-    } finally {
-      // close all FileSystems that were created above for the current user
-      // At this point, this constructor is called in the context of an RPC,
-      // and hence the "current user" is actually referring to the kerberos
-      // authenticated user (if security is ON).
-      FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
-    }
-  }
-
-  private void printCache(Map<Node, List<TaskInProgress>> cache) {
-    LOG.info("The taskcache info:");
-    for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
-      List<TaskInProgress> tips = n.getValue();
-      LOG.info("Cached TIPs on node: " + n.getKey());
-      for (TaskInProgress tip : tips) {
-        LOG.info("tip : " + tip.getTIPId());
-      }
-    }
-  }
-
-  Map<Node, List<TaskInProgress>> createCache(
-      TaskSplitMetaInfo[] splits, int maxLevel) {
-    Map<Node, List<TaskInProgress>> cache =
-      new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
-
-    for (int i = 0; i < splits.length; i++) {
-      String[] splitLocations = splits[i].getLocations();
-      if (splitLocations.length == 0) {
-        nonLocalMaps.add(maps[i]);
-        continue;
-      }
-
-      for (String host: splitLocations) {
-        Node node = jobtracker.resolveAndAddToTopology(host);
-        LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
-        for (int j = 0; j < maxLevel; j++) {
-          List<TaskInProgress> hostMaps = cache.get(node);
-          if (hostMaps == null) {
-            hostMaps = new ArrayList<TaskInProgress>();
-            cache.put(node, hostMaps);
-            hostMaps.add(maps[i]);
-          }
-          //check whether the hostMaps already contains an entry for a TIP
-          //This will be true for nodes that are racks and multiple nodes in
-          //the rack contain the input for a tip. Note that if it already
-          //exists in the hostMaps, it must be the last element there since
-          //we process one TIP at a time sequentially in the split-size order
-          if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
-            hostMaps.add(maps[i]);
-          }
-          node = node.getParent();
-        }
-      }
-    }
-    return cache;
-  }
-
-  /**
-   * Check if the job has been initialized.
-   * @return true if the job has been initialized,
-   *         false otherwise
-   */
-  public boolean inited() {
-    return tasksInited.get();
-  }
-
-  /**
-   * Get the user for the job
-   */
-  public String getUser() {
-    return user;
-  }
-
-  boolean getMapSpeculativeExecution() {
-    return hasSpeculativeMaps;
-  }
-
-  boolean getReduceSpeculativeExecution() {
-    return hasSpeculativeReduces;
-  }
-
-  long getMemoryForMapTask() {
-    return memoryPerMap;
-  }
-
-  long getMemoryForReduceTask() {
-    return memoryPerReduce;
-  }
-
-  /**
-   * Get the number of slots required to run a single map task-attempt.
-   * @return the number of slots required to run a single map task-attempt
-   */
-  int getNumSlotsPerMap() {
-    return numSlotsPerMap;
-  }
-
-  /**
-   * Set the number of slots required to run a single map task-attempt.
-   * This is typically set by schedulers which support high-ram jobs.
-   * @param numSlotsPerMap the number of slots required to run a single map
-   *                       task-attempt
-   */
-  void setNumSlotsPerMap(int numSlotsPerMap) {
-    this.numSlotsPerMap = numSlotsPerMap;
-  }
-
-  /**
-   * Get the number of slots required to run a single reduce task-attempt.
-   * @return the number of slots required to run a single reduce task-attempt
-   */
-  int getNumSlotsPerReduce() {
-    return numSlotsPerReduce;
-  }
-
-  /**
-   * Set the number of slots required to run a single reduce task-attempt.
-   * This is typically set by schedulers which support high-ram jobs.
-   * @param numSlotsPerReduce the number of slots required to run a single
-   *                          reduce task-attempt
-   */
-  void setNumSlotsPerReduce(int numSlotsPerReduce) {
-    this.numSlotsPerReduce = numSlotsPerReduce;
-  }
-
-  /**
-   * Construct the splits, etc.  This is invoked from an async
-   * thread so that split-computation doesn't block anyone. Only the
-   * {@link JobTracker} should invoke this api. Look
-   * at {@link JobTracker#initJob(JobInProgress)} for more details.
-   */
-  public synchronized void initTasks()
-      throws IOException, KillInterruptedException, UnknownHostException {
-    if (tasksInited.get() || isComplete()) {
-      return;
-    }
-    synchronized (jobInitKillStatus) {
-      if (jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
-        return;
-      }
-      jobInitKillStatus.initStarted = true;
-    }
-
-    LOG.info("Initializing " + jobId);
-
-    logSubmissionToJobHistory();
-
-    // log the job priority
-    setPriority(this.priority);
-
-    //
-    // generate security keys needed by Tasks
-    //
-    generateAndStoreTokens();
-
-    //
-    // read input splits and create a map per split
-    //
-    TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
-    numMapTasks = taskSplitMetaInfo.length;
-
-    checkTaskLimits();
-
-    // Sanity check the locations so we don't create/initialize unnecessary tasks
-    for (TaskSplitMetaInfo split : taskSplitMetaInfo) {
-      NetUtils.verifyHostnames(split.getLocations());
-    }
-
-    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
-    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
-
-    createMapTasks(jobFile.toString(), taskSplitMetaInfo);
-
-    if (numMapTasks > 0) {
-      nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
-    }
-
-    // set the launch time
-    this.launchTime = JobTracker.getClock().getTime();
-
-    createReduceTasks(jobFile.toString());
-
-    // Calculate the minimum number of maps to be complete before
-    // we should start scheduling reduces
-    completedMapsForReduceSlowstart =
-      (int)Math.ceil(
-          (conf.getFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
-                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
-           numMapTasks));
-
-    initSetupCleanupTasks(jobFile.toString());
-
-    synchronized (jobInitKillStatus) {
-      jobInitKillStatus.initDone = true;
-      if (jobInitKillStatus.killed) {
-        //setup not launched so directly terminate
-        throw new KillInterruptedException("Job " + jobId + " killed in init");
-      }
-    }
-
-    tasksInited.set(true);
-    JobInitedEvent jie = new JobInitedEvent(
-        profile.getJobID(), this.launchTime,
-        numMapTasks, numReduceTasks,
-        JobStatus.getJobRunState(JobStatus.PREP));
-
-    jobHistory.logEvent(jie, jobId);
-
-    // Log the number of map and reduce tasks
-    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
-        + " map tasks and " + numReduceTasks + " reduce tasks.");
-  }
-
-  // Returns true if the job is empty (0 maps, 0 reduces and no setup-cleanup),
-  // else returns false.
-  synchronized boolean isJobEmpty() {
-    return maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded;
-  }
-
-  synchronized boolean isSetupCleanupRequired() {
-    return jobSetupCleanupNeeded;
-  }
-
-  // Should be called once the init is done. This will complete the job
-  // because the job is empty (0 maps, 0 reduces and no setup-cleanup).
-  synchronized void completeEmptyJob() {
-    jobComplete();
-  }
-
-  synchronized void completeSetup() {
-    setupComplete();
-  }
-
-  void logSubmissionToJobHistory() throws IOException {
-    // log job info
-    String username = conf.getUser();
-    if (username == null) { username = ""; }
-    String jobname = conf.getJobName();
-    String jobQueueName = conf.getQueueName();
-
-    setUpLocalizedJobConf(conf, jobId);
-    jobHistory.setupEventWriter(jobId, conf);
-    JobSubmittedEvent jse =
-      new JobSubmittedEvent(jobId, jobname, username, this.startTime,
-          jobFile.toString(), status.getJobACLs(), jobQueueName);
-    jobHistory.logEvent(jse, jobId);
-  }
-
-  TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
-      throws IOException {
-    TaskSplitMetaInfo[] allTaskSplitMetaInfo =
-      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
-    return allTaskSplitMetaInfo;
-  }
-
-  /**
-   * If the number of tasks is greater than the configured value,
-   * throw an exception that will fail job initialization
-   */
-  void checkTaskLimits() throws IOException {
-    int maxTasks = jobtracker.getMaxTasksPerJob();
-    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
-      throw new IOException(
-          "The number of tasks for this job " +
-          (numMapTasks + numReduceTasks) +
-          " exceeds the configured limit " + maxTasks);
-    }
-  }
-
-  synchronized void createMapTasks(String jobFile,
-      TaskSplitMetaInfo[] splits) {
-    maps = new TaskInProgress[numMapTasks];
-    for (int i = 0; i < numMapTasks; ++i) {
-      inputLength += splits[i].getInputDataLength();
-      maps[i] = new TaskInProgress(jobId, jobFile,
-                                   splits[i],
-                                   jobtracker, conf, this,
-                                   i, numSlotsPerMap);
-    }
-    LOG.info("Input size for job " + jobId + " = " + inputLength
-        + ". Number of splits = " + splits.length);
-  }
-
-  synchronized void createReduceTasks(String jobFile) {
-    this.reduces = new TaskInProgress[numReduceTasks];
-    for (int i = 0; i < numReduceTasks; i++) {
-      reduces[i] = new TaskInProgress(jobId, jobFile,
-                                      numMapTasks, i,
-                                      jobtracker, conf,
-                                      this, numSlotsPerReduce);
-      nonRunningReduces.add(reduces[i]);
-    }
-  }
-
-  synchronized void initSetupCleanupTasks(String jobFile) {
-    if (!jobSetupCleanupNeeded) {
-      LOG.info("Setup/Cleanup not needed for job " + jobId);
-      // nothing to initialize
-      return;
-    }
-    // create two cleanup tips, one map and one reduce.
-    cleanup = new TaskInProgress[2];
-
-    // cleanup map tip. This map doesn't use any splits. Just assign an empty
-    // split.
-    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
-    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
-                                    jobtracker, conf, this, numMapTasks, 1);
-    cleanup[0].setJobCleanupTask();
-
-    // cleanup reduce tip.
-    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
-                                    numReduceTasks, jobtracker, conf, this, 1);
-    cleanup[1].setJobCleanupTask();
-
-    // create two setup tips, one map and one reduce.
-    setup = new TaskInProgress[2];
-
-    // setup map tip. This map doesn't use any split. Just assign an empty
-    // split.
-    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
-                                  jobtracker, conf, this, numMapTasks + 1, 1);
-    setup[0].setJobSetupTask();
-
-    // setup reduce tip.
-    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
-                                  numReduceTasks + 1, jobtracker, conf, this, 1);
-    setup[1].setJobSetupTask();
-  }
-
-  void setupComplete() {
-    status.setSetupProgress(1.0f);
-    if (this.status.getRunState() == JobStatus.PREP) {
-      changeStateTo(JobStatus.RUNNING);
-      JobStatusChangedEvent jse =
-        new JobStatusChangedEvent(profile.getJobID(),
-            JobStatus.getJobRunState(JobStatus.RUNNING));
-      jobHistory.logEvent(jse, profile.getJobID());
-    }
-  }
-
-  /////////////////////////////////////////////////////
-  // Accessors for the JobInProgress
-  /////////////////////////////////////////////////////
-  public JobProfile getProfile() {
-    return profile;
-  }
-  public JobStatus getStatus() {
-    return status;
-  }
-  public synchronized long getLaunchTime() {
-    return launchTime;
-  }
-  Map<TaskType, Long> getFirstTaskLaunchTimes() {
-    return firstTaskLaunchTimes;
-  }
-  public long getStartTime() {
-    return startTime;
-  }
-  public long getFinishTime() {
-    return finishTime;
-  }
-  public int desiredMaps() {
-    return numMapTasks;
-  }
-  public synchronized int finishedMaps() {
-    return finishedMapTasks;
-  }
-  public int desiredReduces() {
-    return numReduceTasks;
-  }
-  public synchronized int runningMaps() {
-    return runningMapTasks;
-  }
-  public synchronized int runningReduces() {
-    return runningReduceTasks;
-  }
-  public synchronized int finishedReduces() {
-    return finishedReduceTasks;
-  }
-  public synchronized int pendingMaps() {
-    return numMapTasks - runningMapTasks - failedMapTIPs -
-      finishedMapTasks + speculativeMapTasks;
-  }
-  public synchronized int pendingReduces() {
-    return numReduceTasks - runningReduceTasks - failedReduceTIPs -
-      finishedReduceTasks + speculativeReduceTasks;
-  }
-
-  public int getNumSlotsPerTask(TaskType taskType) {
-    if (taskType == TaskType.MAP) {
-      return numSlotsPerMap;
-    } else if (taskType == TaskType.REDUCE) {
-      return numSlotsPerReduce;
-    } else {
-      return 1;
-    }
-  }
-  public JobPriority getPriority() {
-    return this.priority;
-  }
-  public void setPriority(JobPriority priority) {
-    if (priority == null) {
-      priority = JobPriority.NORMAL;
-    }
-    synchronized (this) {
-      this.priority = priority;
-      status.setJobPriority(priority);
-      // log the change to the job's priority
-      JobPriorityChangeEvent prEvent =
-        new JobPriorityChangeEvent(jobId, priority);
-      jobHistory.logEvent(prEvent, jobId);
-    }
-  }
-
-  // Update the job start/launch time (upon restart) and log to history
-  synchronized void updateJobInfo(long startTime, long launchTime) {
-    // log the change to the job's start/launch time
-    this.startTime = startTime;
-    this.launchTime = launchTime;
-    JobInfoChangeEvent event =
-      new JobInfoChangeEvent(jobId, startTime, launchTime);
-    jobHistory.logEvent(event, jobId);
-  }
-
-  /**
-   * Get the number of times the job has restarted
-   */
-  int getNumRestarts() {
-    return restartCount;
-  }
-
-  long getInputLength() {
-    return inputLength;
-  }
-
-  boolean isCleanupLaunched() {
-    return launchedCleanup;
-  }
-
-  boolean isSetupLaunched() {
-    return launchedSetup;
-  }
-
-  /**
-   * Get all the tasks of the desired type in this job.
-   * @param type {@link TaskType} of the tasks required
-   * @return An array of {@link TaskInProgress} matching the given type.
-   *         Returns an empty array if no tasks are found for the given type.
-   */
-  TaskInProgress[] getTasks(TaskType type) {
-    TaskInProgress[] tasks = null;
-    switch (type) {
-      case MAP:
-        {
-          tasks = maps;
-        }
-        break;
-      case REDUCE:
-        {
-          tasks = reduces;
-        }
-        break;
-      case JOB_SETUP:
-        {
-          tasks = setup;
-        }
-        break;
-      case JOB_CLEANUP:
-        {
-          tasks = cleanup;
-        }
-        break;
-      default:
-        {
-          tasks = new TaskInProgress[0];
-        }
-        break;
-    }
-    return tasks;
-  }
-
-  /**
-   * Return the nonLocalRunningMaps
-   * @return the set of non-local running map TIPs
-   */
-  Set<TaskInProgress> getNonLocalRunningMaps() {
-    return nonLocalRunningMaps;
-  }
-
-  /**
-   * Return the runningMapCache
-   * @return the map from Node to the set of running map TIPs
-   */
-  Map<Node, Set<TaskInProgress>> getRunningMapCache() {
-    return runningMapCache;
-  }
-
-  /**
-   * Return runningReduces
-   * @return the set of running reduce TIPs
-   */
-  Set<TaskInProgress> getRunningReduces() {
-    return runningReduces;
-  }
-
-  /**
-   * Get the job configuration
-   * @return the job's configuration
-   */
-  JobConf getJobConf() {
-    return conf;
-  }
-
-  /**
-   * Return a vector of completed TaskInProgress objects
-   */
-  public synchronized Vector<TaskInProgress> reportTasksInProgress(
-      boolean shouldBeMap, boolean shouldBeComplete) {
-    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
-    TaskInProgress tips[] = null;
-    if (shouldBeMap) {
-      tips = maps;
-    } else {
-      tips = reduces;
-    }
-    for (int i = 0; i < tips.length; i++) {
-      if (tips[i].isComplete() == shouldBeComplete) {
-        results.add(tips[i]);
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Return a vector of cleanup TaskInProgress objects
-   */
-  public synchronized Vector<TaskInProgress> reportCleanupTIPs(
-      boolean shouldBeComplete) {
-    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
-    for (int i = 0; i < cleanup.length; i++) {
-      if (cleanup[i].isComplete() == shouldBeComplete) {
-        results.add(cleanup[i]);
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Return a vector of setup TaskInProgress objects
-   */
-  public synchronized Vector<TaskInProgress> reportSetupTIPs(
-      boolean shouldBeComplete) {
-    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
-    for (int i = 0; i < setup.length; i++) {
-      if (setup[i].isComplete() == shouldBeComplete) {
-        results.add(setup[i]);
-      }
-    }
-    return results;
-  }
-
-  ////////////////////////////////////////////////////
-  // Status update methods
-  ////////////////////////////////////////////////////
-
-  /**
-   * Assuming {@link JobTracker} is locked on entry.
-   */
-  public synchronized void updateTaskStatus(TaskInProgress tip,
-                                            TaskStatus status) {
-    double oldProgress = tip.getProgress(); // save old progress
-    boolean wasRunning = tip.isRunning();
-    boolean wasComplete = tip.isComplete();
-    boolean wasPending = tip.isOnlyCommitPending();
-    TaskAttemptID taskid = status.getTaskID();
-    boolean wasAttemptRunning = tip.isAttemptRunning(taskid);
-
-    // If the TIP is already completed and the task reports as SUCCEEDED then
-    // mark the task as KILLED.
-    // In case of task with no promotion the task tracker will mark the task
-    // as SUCCEEDED.
-    // User has requested to kill the task, but TT reported SUCCEEDED,
-    // mark the task KILLED.
-    if ((wasComplete || tip.wasKilled(taskid)) &&
-        (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
-      status.setRunState(TaskStatus.State.KILLED);
-    }
-
-    // If the job is complete or task-cleanup is switched off
-    // and a task has just reported its state as FAILED_UNCLEAN/KILLED_UNCLEAN,
-    // make the task's state FAILED/KILLED without launching cleanup attempt.
-    // Note that if task is already a cleanup attempt,
-    // we don't change the state to make sure the task gets a killTaskAction
-    if ((this.isComplete() || jobFailed || jobKilled || !taskCleanupNeeded) &&
-        !tip.isCleanupAttempt(taskid)) {
-      if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
-        status.setRunState(TaskStatus.State.FAILED);
-      } else if (status.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
-        status.setRunState(TaskStatus.State.KILLED);
-      }
-    }
-
-    boolean change = tip.updateStatus(status);
-    if (change) {
-      TaskStatus.State state = status.getRunState();
-      // get the TaskTrackerStatus where the task ran
-      TaskTracker taskTracker =
-        this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
-      TaskTrackerStatus ttStatus =
-        (taskTracker == null) ? null : taskTracker.getStatus();
-      String taskTrackerHttpLocation = null;
-
-      if (null != ttStatus) {
-        String host;
-        if (NetUtils.getStaticResolution(ttStatus.getHost()) != null) {
-          host = NetUtils.getStaticResolution(ttStatus.getHost());
-        } else {
-          host = ttStatus.getHost();
-        }
-        taskTrackerHttpLocation = "http://" + host + ":"
-          + ttStatus.getHttpPort();
-      }
-
-      TaskCompletionEvent taskEvent = null;
-      if (state == TaskStatus.State.SUCCEEDED) {
-        taskEvent = new TaskCompletionEvent(
-            taskCompletionEventTracker,
-            taskid,
-            tip.idWithinJob(),
-            status.getIsMap() &&
-            !tip.isJobCleanupTask() &&
-            !tip.isJobSetupTask(),
-            TaskCompletionEvent.Status.SUCCEEDED,
-            taskTrackerHttpLocation
-            );
-        taskEvent.setTaskRunTime((int)(status.getFinishTime() -
-            status.getStartTime()));
-        tip.setSuccessEventNumber(taskCompletionEventTracker);
-      } else if (state == TaskStatus.State.COMMIT_PENDING) {
-        // If it is the first attempt reporting COMMIT_PENDING
-        // ask the task to commit.
-        if (!wasComplete && !wasPending) {
-          tip.doCommit(taskid);
-        }
-        return;
-      } else if (state == TaskStatus.State.FAILED_UNCLEAN ||
-                 state == TaskStatus.State.KILLED_UNCLEAN) {
-        tip.incompleteSubTask(taskid, this.status);
-        // add this task, to be rescheduled as cleanup attempt
-        if (tip.isMapTask()) {
-          mapCleanupTasks.add(taskid);
-        } else {
-          reduceCleanupTasks.add(taskid);
-        }
-        // Remove the task entry from jobtracker
-        jobtracker.removeTaskEntry(taskid);
-      }
-      // For a failed task update the JT datastructures.
-      else if (state == TaskStatus.State.FAILED ||
-               state == TaskStatus.State.KILLED) {
-        // Get the event number for the (possibly) previously successful
-        // task. If there exists one, then set that status to OBSOLETE
-        int eventNumber;
-        if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
-          TaskCompletionEvent t =
-            this.taskCompletionEvents.get(eventNumber);
-          if (t.getTaskAttemptId().equals(taskid))
-            t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
-        }
-
-        // Tell the job to fail the relevant task
-        failedTask(tip, taskid, status, taskTracker,
-                   wasRunning, wasComplete, wasAttemptRunning);
-
-        // Did the task failure lead to tip failure?
-        TaskCompletionEvent.Status taskCompletionStatus =
-          (state == TaskStatus.State.FAILED) ?
-              TaskCompletionEvent.Status.FAILED :
-              TaskCompletionEvent.Status.KILLED;
-        if (tip.isFailed()) {
-          taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
-        }
-        taskEvent = new TaskCompletionEvent(taskCompletionEventTracker,
-                                            taskid,
-                                            tip.idWithinJob(),
-                                            status.getIsMap() &&
-                                            !tip.isJobCleanupTask() &&
-                                            !tip.isJobSetupTask(),
-                                            taskCompletionStatus,
-                                            taskTrackerHttpLocation
-                                            );
-      }
-
-      // Add the 'complete' task i.e. successful/failed
-      // It _is_ safe to add the TaskCompletionEvent.Status.SUCCEEDED
-      // *before* calling TIP.completedTask since:
-      // a. One and only one task of a TIP is declared as a SUCCESS, the
-      //    others (speculative tasks) are marked KILLED
-      // b. TIP.completedTask *does not* throw _any_ exception at all.
-      if (taskEvent != null) {
-        this.taskCompletionEvents.add(taskEvent);
-        taskCompletionEventTracker++;
-        JobTrackerStatistics.TaskTrackerStat ttStat = jobtracker.
-          getStatistics().getTaskTrackerStat(tip.machineWhereTaskRan(taskid));
-        if (ttStat != null) { // ttStat can be null in case of lost tracker
-          ttStat.incrTotalTasks();
-        }
-        if (state == TaskStatus.State.SUCCEEDED) {
-          completedTask(tip, status);
-          if (ttStat != null) {
-            ttStat.incrSucceededTasks();
-          }
-        }
-      }
-    }
-
-    //
-    // Update JobInProgress status
-    //
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Taking progress for " + tip.getTIPId() + " from " +
-          oldProgress + " to " + tip.getProgress());
-    }
-
-    if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
-      double progressDelta = tip.getProgress() - oldProgress;
-      if (tip.isMapTask()) {
-        this.status.setMapProgress((float) (this.status.mapProgress() +
-            progressDelta / maps.length));
-      } else {
-        this.status.setReduceProgress((float) (this.status.reduceProgress() +
-            (progressDelta / reduces.length)));
-      }
-    }
-  }
-
-  /**
-   * Returns the job-level counters.
-   *
-   * @return the job-level counters.
-   */
-  public synchronized Counters getJobCounters() {
-    return jobCounters;
-  }
-
-  /**
-   * Returns map phase counters by summing over all map tasks in progress.
-   */
-  public synchronized Counters getMapCounters() {
-    return incrementTaskCounters(new Counters(), maps);
-  }
-
-  /**
-   * Returns reduce phase counters by summing over all reduce tasks in progress.
-   */
-  public synchronized Counters getReduceCounters() {
-    return incrementTaskCounters(new Counters(), reduces);
-  }
-
-  /**
-   * Returns the total job counters, by adding together the job,
-   * the map and the reduce counters.
-   */
-  public Counters getCounters() {
-    Counters result = new Counters();
-    synchronized (this) {
-      result.incrAllCounters(getJobCounters());
-    }
-
-    // the counters of TIPs are not updated in place;
-    // hence read-only access is ok without any locks
-    incrementTaskCounters(result, maps);
-    return incrementTaskCounters(result, reduces);
-  }
-
-  /**
-   * Increments the counters with the counters from each task.
-   * @param counters the counters to increment
-   * @param tips the tasks to add in to counters
-   * @return counters the same object passed in as counters
-   */
-  private Counters incrementTaskCounters(Counters counters,
-                                         TaskInProgress[] tips) {
-    for (TaskInProgress tip : tips) {
-      counters.incrAllCounters(tip.getCounters());
-    }
-    return counters;
-  }
-
-  /////////////////////////////////////////////////////
-  // Create/manage tasks
-  /////////////////////////////////////////////////////
-  /**
-   * Return a MapTask, if appropriate, to run on the given tasktracker
-   */
-  public synchronized Task obtainNewMapTask(TaskTrackerStatus tts,
-                                            int clusterSize,
-                                            int numUniqueHosts,
-                                            int maxCacheLevel
-                                           ) throws IOException {
-    if (status.getRunState() != JobStatus.RUNNING) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      return null;
-    }
-
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts,
-                                maxCacheLevel);
-    if (target == -1) {
-      return null;
-    }
-
-    Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-    }
-
-    return result;
-  }
-
-  /**
-   * Return a MapTask, if appropriate, to run on the given tasktracker
-   */
-  public synchronized Task obtainNewMapTask(TaskTrackerStatus tts,
-                                            int clusterSize,
-                                            int numUniqueHosts
-                                           ) throws IOException {
-    return obtainNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel);
-  }
-
-  /*
-   * Return a task cleanup attempt, if any, to run on the given tracker
-   */
-  public Task obtainTaskCleanupTask(TaskTrackerStatus tts,
-                                    boolean isMapSlot)
-      throws IOException {
-    if (!tasksInited.get()) {
-      return null;
-    }
-    synchronized (this) {
-      if (this.status.getRunState() != JobStatus.RUNNING ||
-          jobFailed || jobKilled) {
-        return null;
-      }
-      String taskTracker = tts.getTrackerName();
-      if (!shouldRunOnTaskTracker(taskTracker)) {
-        return null;
-      }
-      TaskAttemptID taskid = null;
-      TaskInProgress tip = null;
-      if (isMapSlot) {
-        if (!mapCleanupTasks.isEmpty()) {
-          taskid = mapCleanupTasks.remove(0);
-          tip = maps[taskid.getTaskID().getId()];
-        }
-      } else {
-        if (!reduceCleanupTasks.isEmpty()) {
-          taskid = reduceCleanupTasks.remove(0);
-          tip = reduces[taskid.getTaskID().getId()];
-        }
-      }
-      if (tip != null) {
-        return tip.addRunningTask(taskid, taskTracker, true);
-      }
-      return null;
-    }
-  }
-
-  public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
-                                                 int clusterSize,
-                                                 int numUniqueHosts)
-      throws IOException {
-    if (!tasksInited.get()) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      return null;
-    }
-
-    return obtainNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel);
-  }
-
-  public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
-                                                    int clusterSize,
-                                                    int numUniqueHosts)
-      throws IOException {
-    if (!tasksInited.get()) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      return null;
-    }
-
-    return obtainNewMapTask(tts, clusterSize, numUniqueHosts,
-                            NON_LOCAL_CACHE_LEVEL);
-  }
-
-  /**
-   * Return a CleanupTask, if appropriate, to run on the given tasktracker
-   */
-  public Task obtainJobCleanupTask(TaskTrackerStatus tts,
-                                   int clusterSize,
-                                   int numUniqueHosts,
-                                   boolean isMapSlot
-                                  ) throws IOException {
-    if (!tasksInited.get() || !jobSetupCleanupNeeded) {
-      return null;
-    }
-
-    synchronized (this) {
-      if (!canLaunchJobCleanupTask()) {
-        return null;
-      }
-
-      String taskTracker = tts.getTrackerName();
-      // Update the last-known clusterSize
-      this.clusterSize = clusterSize;
-      if (!shouldRunOnTaskTracker(taskTracker)) {
-        return null;
-      }
-
-      List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
-      if (isMapSlot) {
-        cleanupTaskList.add(cleanup[0]);
-      } else {
-        cleanupTaskList.add(cleanup[1]);
-      }
-      TaskInProgress tip = findTaskFromList(cleanupTaskList,
-          tts, numUniqueHosts, false);
-      if (tip == null) {
-        return null;
-      }
-
-      // Now launch the cleanupTask
-      Task result = tip.getTaskToRun(tts.getTrackerName());
-      if (result != null) {
-        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
-        if (jobFailed) {
-          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
-              .State.FAILED);
-        } else if (jobKilled) {
-          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
-              .State.KILLED);
-        } else {
-          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce
-              .JobStatus.State.SUCCEEDED);
-        }
-      }
-      return result;
-    }
-  }
-
-  /**
-   * Check whether cleanup task can be launched for the job.
-   *
-   * Cleanup task can be launched if it is not already launched,
-   * or job is Killed,
-   * or all maps and reduces are complete
-   * @return true/false
-   */
-  private synchronized boolean canLaunchJobCleanupTask() {
-    // check if the job is running
-    if (status.getRunState() != JobStatus.RUNNING &&
-        status.getRunState() != JobStatus.PREP) {
-      return false;
-    }
-    // check if cleanup task has been launched already or if setup isn't
-    // launched already. The latter check is useful when number of maps is
-    // zero.
-    if (launchedCleanup || !isSetupFinished()) {
-      return false;
-    }
-    // check if job has failed or been killed
-    if (jobKilled || jobFailed) {
-      return true;
-    }
-    // Check if all maps and reducers have finished.
-    boolean launchCleanupTask =
-      ((finishedMapTasks + failedMapTIPs) == (numMapTasks));
-    if (launchCleanupTask) {
-      launchCleanupTask =
-        ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
-    }
-    return launchCleanupTask;
-  }
-
-  /**
-   * Return a SetupTask, if appropriate, to run on the given tasktracker
-   */
-  public Task obtainJobSetupTask(TaskTrackerStatus tts,
-                                 int clusterSize,
-                                 int numUniqueHosts,
-                                 boolean isMapSlot
-                                ) throws IOException {
-    if (!tasksInited.get() || !jobSetupCleanupNeeded) {
-      return null;
-    }
-
-    synchronized (this) {
-      if (!canLaunchSetupTask()) {
-        return null;
-      }
-      String taskTracker = tts.getTrackerName();
-      // Update the last-known clusterSize
-      this.clusterSize = clusterSize;
-      if (!shouldRunOnTaskTracker(taskTracker)) {
-        return null;
-      }
-
-      List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
-      if (isMapSlot) {
-        setupTaskList.add(setup[0]);
-      } else {
-        setupTaskList.add(setup[1]);
-      }
-      TaskInProgress tip = findTaskFromList(setupTaskList,
-          tts, numUniqueHosts, false);
-      if (tip == null) {
-        return null;
-      }
-
-      // Now launch the setupTask
-      Task result = tip.getTaskToRun(tts.getTrackerName());
-      if (result != null) {
-        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
-      }
-      return result;
-    }
-  }
-
-  public synchronized boolean scheduleReduces() {
-    return finishedMapTasks >= completedMapsForReduceSlowstart;
-  }
-
-  /**
-   * Check whether setup task can be launched for the job.
-   *
-   * Setup task can be launched after the tasks are inited
-   * and Job is in PREP state
-   * and if it is not already launched
-   * or job is not Killed/Failed
-   * @return true/false
-   */
-  private synchronized boolean canLaunchSetupTask() {
-    return (tasksInited.get() && status.getRunState() == JobStatus.PREP &&
-            !launchedSetup && !jobKilled && !jobFailed);
-  }
-
-  /**
-   * Return a ReduceTask, if appropriate, to run on the given tasktracker.
-   * We don't have cache-sensitivity for reduce tasks, as they
-   * work on temporary MapRed files.
-   */
-  public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
-                                               int clusterSize,
-                                               int numUniqueHosts
-                                              ) throws IOException {
-    if (status.getRunState() != JobStatus.RUNNING) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      return null;
-    }
-
-    // Ensure we have sufficient map outputs ready to shuffle before
-    // scheduling reduces
-    if (!scheduleReduces()) {
-      return null;
-    }
-
-    int target = findNewReduceTask(tts, clusterSize, numUniqueHosts);
-    if (target == -1) {
-      return null;
-    }
-
-    Task result = reduces[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
-    }
-
-    return result;
-  }
-
-  // returns the (cache)level at which the nodes match
-  private int getMatchingLevelForNodes(Node n1, Node n2) {
-    int count = 0;
-    do {
-      if (n1.equals(n2)) {
-        return count;
-      }
-      ++count;
-      n1 = n1.getParent();
-      n2 = n2.getParent();
-    } while (n1 != null);
-    return this.maxLevel;
-  }
-
-  /**
-   * Populate the data structures as a task is scheduled.
-   *
-   * Assuming {@link JobTracker} is locked on entry.
-   *
-   * @param tip The tip for which the task is added
-   * @param id The attempt-id for the task
-   * @param tts task-tracker status
-   * @param isScheduled Whether this task is scheduled from the JT or has
-   *        joined back upon restart
-   */
-  synchronized void addRunningTaskToTIP(TaskInProgress tip, TaskAttemptID id,
-                                        TaskTrackerStatus tts,
-                                        boolean isScheduled) {
-    // Make an entry in the tip if the attempt is not scheduled i.e externally
-    // added
-    if (!isScheduled) {
-      tip.addRunningTask(id, tts.getTrackerName());
-    }
-    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
-
-    // keeping the earlier ordering intact
-    TaskType name;
-    String splits = "";
-    Enum counter = null;
-    if (tip.isJobSetupTask()) {
-      launchedSetup = true;
-      name = TaskType.JOB_SETUP;
-    } else if (tip.isJobCleanupTask()) {
-      launchedCleanup = true;
-      name = TaskType.JOB_CLEANUP;
-    } else if (tip.isMapTask()) {
-      ++runningMapTasks;
-      name = TaskType.MAP;
-      counter = JobCounter.TOTAL_LAUNCHED_MAPS;
-      splits = tip.getSplitNodes();
-      if (tip.isSpeculating()) {
-        speculativeMapTasks++;
-        metrics.speculateMap(id);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Chosen speculative task, current speculativeMap task count: "
-              + speculativeMapTasks);
-        }
-      }
-      metrics.launchMap(id);
-    } else {
-      ++runningReduceTasks;
-      name = TaskType.REDUCE;
-      counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
-      if (tip.isSpeculating()) {
-        speculativeReduceTasks++;
-        metrics.speculateReduce(id);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Chosen speculative task, current speculativeReduce task count: "
-              + speculativeReduceTasks);
-        }
-      }
-      metrics.launchReduce(id);
-    }
-    // Note that the logs are for the scheduled tasks only. Tasks that join on
-    // restart already have their logs in place.
-    if (tip.isFirstAttempt(id)) {
-      TaskStartedEvent tse = new TaskStartedEvent(tip.getTIPId(),
-          tip.getExecStartTime(),
-          name, splits);
-      jobHistory.logEvent(tse, tip.getJob().jobId);
-      setFirstTaskLaunchTime(tip);
-    }
-    if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
-      jobCounters.incrCounter(counter, 1);
-    }
-
-    //TODO The only problem with these counters would be on restart.
-    // The jobtracker updates the counter only when the task that is scheduled
-    // is from a non-running tip and is local (data, rack ...). But upon restart,
-    // as the reports come from the task tracker, there is no good way to infer
-    // when exactly to increment the locality counters. The only solution is to
-    // increment the counters for all the tasks irrespective of
-    //   - whether the tip is running or not
-    //   - whether it's a speculative task or not
-    //
-    // So to simplify, increment the data locality counter whenever there is
-    // data locality.
-    if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
-      // increment the data locality counter for maps
-      int level = getLocalityLevel(tip, tts);
-      switch (level) {
-      case 0:
-        LOG.info("Choosing data-local task " + tip.getTIPId());
-        jobCounters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
-        metrics.launchDataLocalMap(id);
-        break;
-      case 1:
-        LOG.info("Choosing rack-local task " + tip.getTIPId());
-        jobCounters.incrCounter(JobCounter.RACK_LOCAL_MAPS, 1);
-        metrics.launchRackLocalMap(id);
-        break;
-      default:
-        // check if there is any locality
-        if (level != this.maxLevel) {
-          LOG.info("Choosing cached task at level " + level + " " + tip.getTIPId());
-          jobCounters.incrCounter(JobCounter.OTHER_LOCAL_MAPS, 1);
-        }
-        break;
-      }
-    }
-  }
-
-  void setFirstTaskLaunchTime(TaskInProgress tip) {
-    TaskType key = getTaskType(tip);
-
-    synchronized (firstTaskLaunchTimes) {
-      // Could be optimized to do only one lookup with a little more code
-      if (!firstTaskLaunchTimes.containsKey(key)) {
-        firstTaskLaunchTimes.put(key, tip.getExecStartTime());
-      }
-    }
-  }
-
-  public static String convertTrackerNameToHostName(String trackerName) {
-    // Ugly!
-    // Convert the trackerName to its host name
-    int indexOfColon = trackerName.indexOf(":");
-    String trackerHostName = (indexOfColon == -1) ?
-        trackerName :
-        trackerName.substring(0, indexOfColon);
-    return trackerHostName.substring("tracker_".length());
-  }
-
-  /**
-   * Note that a task has failed on a given tracker and add the tracker
-   * to the blacklist iff too many trackers in the cluster, i.e.
-   * (clusterSize * CLUSTER_BLACKLIST_PERCENT), haven't turned 'flaky' already.
-   *
-   * @param taskTracker task-tracker on which a task failed
-   */
-  synchronized void addTrackerTaskFailure(String trackerName,
-                                          TaskTracker taskTracker) {
-    if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) {
-      String trackerHostName = convertTrackerNameToHostName(trackerName);
-
-      Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);
-      if (trackerFailures == null) {
-        trackerFailures = 0;
-      }
-      trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
-
-      // Check if this tasktracker has turned 'flaky'
-      if (trackerFailures.intValue() == maxTaskFailuresPerTracker) {
-        ++flakyTaskTrackers;
-
-        // Cancel reservations if appropriate
-        if (taskTracker != null) {
-          if (trackersReservedForMaps.containsKey(taskTracker)) {
-            taskTracker.unreserveSlots(TaskType.MAP, this);
-          }
-          if (trackersReservedForReduces.containsKey(taskTracker)) {
-            taskTracker.unreserveSlots(TaskType.REDUCE, this);
-          }
-        }
-        LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
-      }
-    }
-  }
-
-  public synchronized void reserveTaskTracker(TaskTracker taskTracker,
-                                              TaskType type, int numSlots) {
-    Map<TaskTracker, FallowSlotInfo> map =
-      (type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces;
-
-    long now = System.currentTimeMillis();
-
-    FallowSlotInfo info = map.get(taskTracker);
-    int reservedSlots = 0;
-    if (info == null) {
-      info = new FallowSlotInfo(now, numSlots);
-      reservedSlots = numSlots;
-    } else {
-      // Increment metering info if the reservation is changing
-      if (info.getNumSlots() != numSlots) {
-        Enum counter =
-          (type == TaskType.MAP) ?
-              JobCounter.FALLOW_SLOTS_MILLIS_MAPS :
-              JobCounter.FALLOW_SLOTS_MILLIS_REDUCES;
-        long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
-        jobCounters.incrCounter(counter, fallowSlotMillis);
-
-        // Update
-        reservedSlots = numSlots - info.getNumSlots();
-        info.setTimestamp(now);
-        info.setNumSlots(numSlots);
-      }
-    }
-    map.put(taskTracker, info);
-    if (type == TaskType.MAP) {
-      jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
-    } else {
-      jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
-    }
-    jobtracker.incrementReservations(type, reservedSlots);
-  }
-
-  public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
-                                                TaskType type) {
-    Map<TaskTracker, FallowSlotInfo> map =
-      (type == TaskType.MAP) ? trackersReservedForMaps :
-          trackersReservedForReduces;
-
-    FallowSlotInfo info = map.get(taskTracker);
-    if (info == null) {
-      LOG.warn("Cannot find information about fallow slots for " +
-          taskTracker.getTrackerName());
-      return;
-    }
-
-    long now = System.currentTimeMillis();
-
-    Enum counter =
-      (type == TaskType.MAP) ?
- JobCounter.FALLOW_SLOTS_MILLIS_MAPS : - JobCounter.FALLOW_SLOTS_MILLIS_REDUCES; - long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots(); - jobCounters.incrCounter(counter, fallowSlotMillis); - - map.remove(taskTracker); - if (type == TaskType.MAP) { - jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots()); - } - else { - jobtracker.getInstrumentation().decReservedReduceSlots( - info.getNumSlots()); - } - jobtracker.decrementReservations(type, info.getNumSlots()); - } - - public int getNumReservedTaskTrackersForMaps() { - return trackersReservedForMaps.size(); - } - - public int getNumReservedTaskTrackersForReduces() { - return trackersReservedForReduces.size(); - } - - private int getTrackerTaskFailures(String trackerName) { - String trackerHostName = convertTrackerNameToHostName(trackerName); - Integer failedTasks = trackerToFailuresMap.get(trackerHostName); - return (failedTasks != null) ? failedTasks.intValue() : 0; - } - - /** - * Get the black listed trackers for the job - * - * @return List of blacklisted tracker names - */ - List getBlackListedTrackers() { - List blackListedTrackers = new ArrayList(); - for (Map.Entry e : trackerToFailuresMap.entrySet()) { - if (e.getValue().intValue() >= maxTaskFailuresPerTracker) { - blackListedTrackers.add(e.getKey()); - } - } - return blackListedTrackers; - } - - /** - * Get the no. of 'flaky' tasktrackers for a given job. - * - * @return the no. of 'flaky' tasktrackers for a given job. - */ - int getNoOfBlackListedTrackers() { - return flakyTaskTrackers; - } - - /** - * Get the information on tasktrackers and no. of errors which occurred - * on them for a given job. - * - * @return the map of tasktrackers and no. of errors which occurred - * on them for a given job. - */ - synchronized Map getTaskTrackerErrors() { - // Clone the 'trackerToFailuresMap' and return the copy - Map trackerErrors = - new TreeMap(trackerToFailuresMap); - return trackerErrors; - } - - /** - * Remove a map TIP from the lists for running maps. - * Called when a map fails/completes (note if a map is killed, - * it won't be present in the list since it was completed earlier) - * @param tip the tip that needs to be retired - */ - private synchronized void retireMap(TaskInProgress tip) { - if (runningMapCache == null) { - LOG.warn("Running cache for maps missing!! " - + "Job details are missing."); - return; - } - - String[] splitLocations = tip.getSplitLocations(); - - // Remove the TIP from the list for running non-local maps - if (splitLocations == null || splitLocations.length == 0) { - nonLocalRunningMaps.remove(tip); - return; - } - - // Remove from the running map caches - for(String host: splitLocations) { - Node node = jobtracker.getNode(host); - - for (int j = 0; j < maxLevel; ++j) { - Set hostMaps = runningMapCache.get(node); - if (hostMaps != null) { - hostMaps.remove(tip); - if (hostMaps.size() == 0) { - runningMapCache.remove(node); - } - } - node = node.getParent(); - } - } - } - - /** - * Remove a reduce TIP from the list for running-reduces - * Called when a reduce fails/completes - * @param tip the tip that needs to be retired - */ - private synchronized void retireReduce(TaskInProgress tip) { - if (runningReduces == null) { - LOG.warn("Running list for reducers missing!! " - + "Job details are missing."); - return; - } - runningReduces.remove(tip); - } - - /** - * Adds a map tip to the list of running maps. 
-   * @param tip the tip that needs to be scheduled as running
-   */
-  protected synchronized void scheduleMap(TaskInProgress tip) {
-
-    runningMapTaskStats.add(0.0f);
-    if (runningMapCache == null) {
-      LOG.warn("Running cache for maps is missing!! "
-               + "Job details are missing.");
-      return;
-    }
-    String[] splitLocations = tip.getSplitLocations();
-
-    // Add the TIP to the list of non-local running TIPs
-    if (splitLocations == null || splitLocations.length == 0) {
-      nonLocalRunningMaps.add(tip);
-      return;
-    }
-
-    for (String host : splitLocations) {
-      Node node = jobtracker.getNode(host);
-
-      for (int j = 0; j < maxLevel; ++j) {
-        Set<TaskInProgress> hostMaps = runningMapCache.get(node);
-        if (hostMaps == null) {
-          // create a cache if needed
-          hostMaps = new LinkedHashSet<TaskInProgress>();
-          runningMapCache.put(node, hostMaps);
-        }
-        hostMaps.add(tip);
-        node = node.getParent();
-      }
-    }
-  }
-
-  /**
-   * Adds a reduce tip to the list of running reduces
-   * @param tip the tip that needs to be scheduled as running
-   */
-  protected synchronized void scheduleReduce(TaskInProgress tip) {
-    runningReduceTaskStats.add(0.0f);
-    if (runningReduces == null) {
-      LOG.warn("Running cache for reducers missing!! "
-               + "Job details are missing.");
-      return;
-    }
-    runningReduces.add(tip);
-  }
-
-  /**
-   * Adds the failed TIP in the front of the list for non-running maps
-   * @param tip the tip that needs to be failed
-   */
-  private synchronized void failMap(TaskInProgress tip) {
-    if (nonRunningMapCache == null) {
-      LOG.warn("Non-running cache for maps missing!! "
-               + "Job details are missing.");
-      return;
-    }
-
-    // 1. It's added everywhere since other nodes (having this split local)
-    //    might have removed this tip from their local cache
-    // 2. Give high priority to failed tip - fail early
-
-    String[] splitLocations = tip.getSplitLocations();
-
-    // Add the TIP in the front of the list for non-local non-running maps
-    if (splitLocations.length == 0) {
-      nonLocalMaps.add(0, tip);
-      return;
-    }
-
-    for (String host : splitLocations) {
-      Node node = jobtracker.getNode(host);
-
-      for (int j = 0; j < maxLevel; ++j) {
-        List<TaskInProgress> hostMaps = nonRunningMapCache.get(node);
-        if (hostMaps == null) {
-          hostMaps = new LinkedList<TaskInProgress>();
-          nonRunningMapCache.put(node, hostMaps);
-        }
-        hostMaps.add(0, tip);
-        node = node.getParent();
-      }
-    }
-  }
-
-  /**
-   * Adds a failed TIP in the front of the list for non-running reduces
-   * @param tip the tip that needs to be failed
-   */
-  private synchronized void failReduce(TaskInProgress tip) {
-    if (nonRunningReduces == null) {
-      LOG.warn("Failed cache for reducers missing!! "
-               + "Job details are missing.");
-      return;
-    }
-    nonRunningReduces.add(0, tip);
-  }
-
-  /**
-   * Find a non-running task in the passed list of TIPs
-   * @param tips a collection of TIPs
-   * @param ttStatus the status of tracker that has requested a task to run
-   * @param numUniqueHosts number of unique hosts that run task trackers
-   * @param removeFailedTip whether to remove the failed tips
-   */
-  private synchronized TaskInProgress findTaskFromList(
-      Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus,
-      int numUniqueHosts,
-      boolean removeFailedTip) {
-    Iterator<TaskInProgress> iter = tips.iterator();
-    while (iter.hasNext()) {
-      TaskInProgress tip = iter.next();
-
-      // Select a tip if
-      //   1. runnable  : still needs to be run and is not completed
-      //   2. ~running  : no other node is running it
-      //   3.
earlier attempt failed : has not failed on this host - // and has failed on all the other hosts - // A TIP is removed from the list if - // (1) this tip is scheduled - // (2) if the passed list is a level 0 (host) cache - // (3) when the TIP is non-schedulable (running, killed, complete) - if (tip.isRunnable() && !tip.isRunning()) { - // check if the tip has failed on this host - if (!tip.hasFailedOnMachine(ttStatus.getHost()) || - tip.getNumberOfFailedMachines() >= numUniqueHosts) { - // check if the tip has failed on all the nodes - iter.remove(); - return tip; - } else if (removeFailedTip) { - // the case where we want to remove a failed tip from the host cache - // point#3 in the TIP removal logic above - iter.remove(); - } - } else { - // see point#3 in the comment above for TIP removal logic - iter.remove(); - } - } - return null; - } - - public boolean hasSpeculativeMaps() { - return hasSpeculativeMaps; - } - - public boolean hasSpeculativeReduces() { - return hasSpeculativeReduces; - } - - /** - * Retrieve a task for speculation. - * If a task slot becomes available and there are less than SpeculativeCap - * speculative tasks running: - * 1)Ignore the request if the TT's progressRate is < SlowNodeThreshold - * 2)Choose candidate tasks - those tasks whose progress rate is below - * slowTaskThreshold * mean(progress-rates) - * 3)Speculate task that's expected to complete last - * @param list pool of tasks to choose from - * @param taskTrackerName the name of the TaskTracker asking for a task - * @param taskTrackerHost the hostname of the TaskTracker asking for a task - * @param taskType the type of task (MAP/REDUCE) that we are considering - * @return the TIP to speculatively re-execute - */ - protected synchronized TaskInProgress findSpeculativeTask( - Collection list, String taskTrackerName, - String taskTrackerHost, TaskType taskType) { - if (list.isEmpty()) { - return null; - } - long now = JobTracker.getClock().getTime(); - - // Don't return anything if either the TaskTracker is slow or we have - // already launched enough speculative tasks in the cluster. - if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list, taskType)) { - return null; - } - - TaskInProgress slowestTIP = null; - Comparator LateComparator = - new EstimatedTimeLeftComparator(now); - - Iterator iter = list.iterator(); - while (iter.hasNext()) { - TaskInProgress tip = iter.next(); - - // If this tip has already run on this machine once or it doesn't need any - // more speculative attempts, skip it. - if (tip.hasRunOnMachine(taskTrackerHost, taskTrackerName) || - !tip.canBeSpeculated(now)) { - continue; - } - - if (slowestTIP == null) { - slowestTIP = tip; - } else { - slowestTIP = - LateComparator.compare(tip, slowestTIP) < 0 ? tip : slowestTIP; - } - } - - if (slowestTIP != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Chose task " + slowestTIP.getTIPId() + ". Statistics: Task's : " + - slowestTIP.getCurrentProgressRate(now) + " Job's : " + - (slowestTIP.isMapTask() ? runningMapTaskStats : runningReduceTaskStats)); - } - } - - return slowestTIP; - } - - /** - * Find new map task - * @param tts The task tracker that is asking for a task - * @param clusterSize The number of task trackers in the cluster - * @param numUniqueHosts The number of hosts that run task trackers - * @param maxCacheLevel The maximum topology level until which to schedule - * maps. - * A value of {@link #anyCacheLevel} implies any - * available task (node-local, rack-local, off-switch and - * speculative tasks). 
-   * A value of {@link #NON_LOCAL_CACHE_LEVEL} implies only
-   * off-switch/speculative tasks should be scheduled.
-   * @return the index in tasks of the selected task (or -1 for no task)
-   */
-  private synchronized int findNewMapTask(final TaskTrackerStatus tts,
-                                          final int clusterSize,
-                                          final int numUniqueHosts,
-                                          final int maxCacheLevel) {
-    String taskTrackerName = tts.getTrackerName();
-    String taskTrackerHost = tts.getHost();
-    if (numMapTasks == 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No maps to schedule for " + profile.getJobID());
-      }
-      return -1;
-    }
-
-    TaskInProgress tip = null;
-
-    //
-    // Update the last-known clusterSize
-    //
-    this.clusterSize = clusterSize;
-
-    if (!shouldRunOnTaskTracker(taskTrackerName)) {
-      return -1;
-    }
-
-    // Check to ensure this TaskTracker has enough resources to
-    // run tasks from this job
-    long outSize = resourceEstimator.getEstimatedMapOutputSize();
-    long availSpace = tts.getResourceStatus().getAvailableSpace();
-    if (availSpace < outSize) {
-      LOG.warn("No room for map task. Node " + tts.getHost() +
-               " has " + availSpace +
-               " bytes free; but we expect map to take " + outSize);
-
-      return -1; // see if a different TIP might work better.
-    }
-
-    // For scheduling a map task, we have two caches and a list (optional)
-    //  I)   one for non-running tasks
-    //  II)  one for running tasks (this is for handling speculation)
-    //  III) a list of TIPs that have empty locations (e.g., dummy splits);
-    //       the list is empty if all TIPs have associated locations
-
-    // First a lookup is done on the non-running cache and, on a miss, a
-    // lookup is done on the running cache. The order for lookup within the
-    // cache:
-    //   1. from local node to root [bottom up]
-    //   2. breadth-wise for all the parent nodes at max level
-
-    // We fall back to a linear scan of the list (III above) if we have
-    // misses in the above caches
-
-    Node node = jobtracker.getNode(tts.getHost());
-
-    //
-    // I) Non-running TIP :
-    //
-
-    // 1. check from local node to the root [bottom-up cache lookup]
-    //    i.e. if the cache is available and the host has been resolved
-    //    (node != null)
-    if (node != null) {
-      Node key = node;
-      int level = 0;
-      // maxCacheLevel might be greater than this.maxLevel if findNewMapTask
-      // is called to schedule any task (node-local, rack-local, off-switch
-      // or speculative), or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
-      // findNewMapTask is to only schedule off-switch/speculative tasks.
-      int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
-      for (level = 0; level < maxLevelToSchedule; ++level) {
-        List<TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
-        if (cacheForLevel != null) {
-          tip = findTaskFromList(cacheForLevel, tts,
-              numUniqueHosts, level == 0);
-          if (tip != null) {
-            // Add to running cache
-            scheduleMap(tip);
-
-            // remove the cache if it's empty
-            if (cacheForLevel.size() == 0) {
-              nonRunningMapCache.remove(key);
-            }
-
-            return tip.getIdWithinJob();
-          }
-        }
-        key = key.getParent();
-      }
-
-      // Check if we need to only schedule a local task (node-local/rack-local)
-      if (level == maxCacheLevel) {
-        return -1;
-      }
-    }
-
-    // 2.
Search breadth-wise across parents at max level for non-running - // TIP if - // - cache exists and there is a cache miss - // - node information for the tracker is missing (tracker's topology - // info not obtained yet) - - // collection of node at max level in the cache structure - Collection nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel(); - - // get the node parent at max level - Node nodeParentAtMaxLevel = - (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1); - - for (Node parent : nodesAtMaxLevel) { - - // skip the parent that has already been scanned - if (parent == nodeParentAtMaxLevel) { - continue; - } - - List cache = nonRunningMapCache.get(parent); - if (cache != null) { - tip = findTaskFromList(cache, tts, numUniqueHosts, false); - if (tip != null) { - // Add to the running cache - scheduleMap(tip); - - // remove the cache if empty - if (cache.size() == 0) { - nonRunningMapCache.remove(parent); - } - LOG.info("Choosing a non-local task " + tip.getTIPId()); - return tip.getIdWithinJob(); - } - } - } - - // 3. Search non-local tips for a new task - tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false); - if (tip != null) { - // Add to the running list - scheduleMap(tip); - - LOG.info("Choosing a non-local task " + tip.getTIPId()); - return tip.getIdWithinJob(); - } - - // - // II) Running TIP : - // - - if (hasSpeculativeMaps) { - tip = getSpeculativeMap(taskTrackerName, taskTrackerHost); - if (tip != null) { - return tip.getIdWithinJob(); - } - } - return -1; - } - - private synchronized TaskInProgress getSpeculativeMap(String taskTrackerName, - String taskTrackerHost) { - - //////// Populate allTips with all TaskInProgress - Set allTips = new HashSet(); - - // collection of node at max level in the cache structure - Collection nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel(); - // Add all tasks from max-level nodes breadth-wise - for (Node parent : nodesAtMaxLevel) { - Set cache = runningMapCache.get(parent); - if (cache != null) { - allTips.addAll(cache); - } - } - // Add all non-local TIPs - allTips.addAll(nonLocalRunningMaps); - - ///////// Select a TIP to run on - TaskInProgress tip = findSpeculativeTask(allTips, taskTrackerName, - taskTrackerHost, TaskType.MAP); - - if (tip != null) { - LOG.info("Choosing map task " + tip.getTIPId() + - " for speculative execution"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("No speculative map task found for tracker " + taskTrackerName); - } - } - return tip; - } - - /** - * Find new reduce task - * @param tts The task tracker that is asking for a task - * @param clusterSize The number of task trackers in the cluster - * @param numUniqueHosts The number of hosts that run task trackers - * @return the index in tasks of the selected task (or -1 for no task) - */ - private synchronized int findNewReduceTask(TaskTrackerStatus tts, - int clusterSize, - int numUniqueHosts) { - String taskTrackerName = tts.getTrackerName(); - String taskTrackerHost = tts.getHost(); - if (numReduceTasks == 0) { - if(LOG.isDebugEnabled()) { - LOG.debug("No reduces to schedule for " + profile.getJobID()); - } - return -1; - } - TaskInProgress tip = null; - - // Update the last-known clusterSize - this.clusterSize = clusterSize; - - if (!shouldRunOnTaskTracker(taskTrackerName)) { - return -1; - } - - long outSize = resourceEstimator.getEstimatedReduceInputSize(); - long availSpace = tts.getResourceStatus().getAvailableSpace(); - if(availSpace < outSize) { - LOG.warn("No room for reduce task. 
Node " + taskTrackerName + " has " + - availSpace + - " bytes free; but we expect reduce input to take " + outSize); - - return -1; //see if a different TIP might work better. - } - - // 1. check for a never-executed reduce tip - // reducers don't have a cache and so pass -1 to explicitly call that out - tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false); - if (tip != null) { - scheduleReduce(tip); - return tip.getIdWithinJob(); - } - - // 2. check for a reduce tip to be speculated - if (hasSpeculativeReduces) { - tip = getSpeculativeReduce(taskTrackerName, taskTrackerHost); - if (tip != null) { - return tip.getIdWithinJob(); - } - } - - return -1; - } - - private synchronized TaskInProgress getSpeculativeReduce( - String taskTrackerName, String taskTrackerHost) { - TaskInProgress tip = findSpeculativeTask( - runningReduces, taskTrackerName, taskTrackerHost, TaskType.REDUCE); - if (tip != null) { - LOG.info("Choosing reduce task " + tip.getTIPId() + - " for speculative execution"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("No speculative map task found for tracker " - + taskTrackerHost); - } - } - return tip; - } - - /** - * Check to see if the maximum number of speculative tasks are - * already being executed currently. - * @param tasks the set of tasks to test - * @param type the type of task (MAP/REDUCE) that we are considering - * @return has the cap been reached? - */ - private boolean atSpeculativeCap(Collection tasks, - TaskType type) { - float numTasks = tasks.size(); - if (numTasks == 0){ - return true; // avoid divide by zero - } - int speculativeTaskCount = type == TaskType.MAP ? speculativeMapTasks - : speculativeReduceTasks; - //return true if totalSpecTask < max(10, 0.01 * total-slots, - // 0.1 * total-running-tasks) - - if (speculativeTaskCount < MIN_SPEC_CAP) { - return false; // at least one slow tracker's worth of slots(default=10) - } - ClusterStatus c = jobtracker.getClusterStatus(false); - int numSlots = (type == TaskType.MAP ? c.getMaxMapTasks() : c.getMaxReduceTasks()); - if ((float)speculativeTaskCount < numSlots * MIN_SLOTS_CAP) { - return false; - } - boolean atCap = (((float)(speculativeTaskCount)/numTasks) >= speculativeCap); - if (LOG.isDebugEnabled()) { - LOG.debug("SpeculativeCap is "+speculativeCap+", specTasks/numTasks is " + - ((float)(speculativeTaskCount)/numTasks)+ - ", so atSpecCap() is returning "+atCap); - } - return atCap; - } - - /** - * A class for comparing the estimated time to completion of two tasks - */ - private static class EstimatedTimeLeftComparator - implements Comparator { - private long time; - public EstimatedTimeLeftComparator(long now) { - this.time = now; - } - /** - * Estimated time to completion is measured as: - * % of task left to complete (1 - progress) / progress rate of the task. - * - * This assumes that tasks are linear in their progress, which is - * often wrong, especially since progress for reducers is currently - * calculated by evenly weighting their three stages (shuffle, sort, map) - * which rarely account for 1/3 each. This should be fixed in the future - * by calculating progressRate more intelligently or splitting these - * multi-phase tasks into individual tasks. 
- * - * The ordering this comparator defines is: task1 < task2 if task1 is - * estimated to finish farther in the future => compare(t1,t2) returns -1 - */ - public int compare(TaskInProgress tip1, TaskInProgress tip2) { - //we have to use the Math.max in the denominator to avoid divide by zero - //error because prog and progRate can both be zero (if one is zero, - //the other one will be 0 too). - //We use inverse of time_reminaing=[(1- prog) / progRate] - //so that (1-prog) is in denom. because tasks can have arbitrarily - //low progRates in practice (e.g. a task that is half done after 1000 - //seconds will have progRate of 0.0000005) so we would rather - //use Math.maxnon (1-prog) by putting it in the denominator - //which will cause tasks with prog=1 look 99.99% done instead of 100% - //which is okay - double t1 = tip1.getCurrentProgressRate(time) / Math.max(0.0001, - 1.0 - tip1.getProgress()); - double t2 = tip2.getCurrentProgressRate(time) / Math.max(0.0001, - 1.0 - tip2.getProgress()); - if (t1 < t2) return -1; - else if (t2 < t1) return 1; - else return 0; - } - } - - /** - * Compares the ave progressRate of tasks that have finished on this - * taskTracker to the ave of all succesfull tasks thus far to see if this - * TT one is too slow for speculating. - * slowNodeThreshold is used to determine the number of standard deviations - * @param taskTracker the name of the TaskTracker we are checking - * @return is this TaskTracker slow - */ - protected boolean isSlowTracker(String taskTracker) { - if (trackerMapStats.get(taskTracker) != null && - trackerMapStats.get(taskTracker).mean() - - mapTaskStats.mean() > mapTaskStats.std()*slowNodeThreshold) { - if (LOG.isDebugEnabled()) { - LOG.debug("Tracker " + taskTracker + - " declared slow. trackerMapStats.get(taskTracker).mean() :" + trackerMapStats.get(taskTracker).mean() + - " mapTaskStats :" + mapTaskStats); - } - return true; - } - if (trackerReduceStats.get(taskTracker) != null && - trackerReduceStats.get(taskTracker).mean() - - reduceTaskStats.mean() > reduceTaskStats.std()*slowNodeThreshold) { - if (LOG.isDebugEnabled()) { - LOG.debug("Tracker " + taskTracker + - " declared slow. 
trackerReduceStats.get(taskTracker).mean() :" + trackerReduceStats.get(taskTracker).mean() + - " reduceTaskStats :" + reduceTaskStats); - } - return true; - } - return false; - } - - static class DataStatistics{ - private int count = 0; - private double sum = 0; - private double sumSquares = 0; - - public DataStatistics() { - } - - public DataStatistics(double initNum) { - this.count = 1; - this.sum = initNum; - this.sumSquares = initNum * initNum; - } - - public void add(double newNum) { - this.count++; - this.sum += newNum; - this.sumSquares += newNum * newNum; - } - - public void updateStatistics(double old, double update) { - sub(old); - add(update); - } - private void sub(double oldNum) { - this.count--; - this.sum = Math.max(this.sum -= oldNum, 0.0d); - this.sumSquares = Math.max(this.sumSquares -= oldNum * oldNum, 0.0d); - } - - public double mean() { - return sum/count; - } - - public double var() { - // E(X^2) - E(X)^2 - return Math.max((sumSquares/count) - mean() * mean(), 0.0d); - } - - public double std() { - return Math.sqrt(this.var()); - } - - public String toString() { - return "DataStatistics: count is " + count + ", sum is " + sum + - ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std(); - } - - } - - private boolean shouldRunOnTaskTracker(String taskTracker) { - // - // Check if too many tasks of this job have failed on this - // tasktracker prior to assigning it a new one. - // - int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker); - if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && - taskTrackerFailedTasks >= maxTaskFailuresPerTracker) { - if (LOG.isDebugEnabled()) { - String flakyTracker = convertTrackerNameToHostName(taskTracker); - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker - + "' for assigning a new task"); - } - } - return false; - } - return true; - } - - - /** - * Metering: Occupied Slots * (Finish - Start) - * @param tip {@link TaskInProgress} to be metered which just completed, - * cannot be null - * @param status {@link TaskStatus} of the completed task, cannot be - * null - */ - private void meterTaskAttempt(TaskInProgress tip, TaskStatus status) { - JobCounter slotCounter = - (tip.isMapTask()) ? JobCounter.SLOTS_MILLIS_MAPS : - JobCounter.SLOTS_MILLIS_REDUCES; - jobCounters.incrCounter(slotCounter, - tip.getNumSlotsRequired() * - (status.getFinishTime() - status.getStartTime())); - } - - /** - * A taskid assigned to this JobInProgress has reported in successfully. - */ - public synchronized boolean completedTask(TaskInProgress tip, - TaskStatus status) - { - TaskAttemptID taskid = status.getTaskID(); - final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation(); - - // Metering - meterTaskAttempt(tip, status); - - // Sanity check: is the TIP already complete? - // This would not happen, - // because no two tasks are SUCCEEDED at the same time. 
- if (tip.isComplete()) { - // Mark this task as KILLED - tip.alreadyCompletedTask(taskid); - - // Let the JobTracker cleanup this taskid if the job isn't running - if (this.status.getRunState() != JobStatus.RUNNING) { - jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid); - } - return false; - } - boolean wasSpeculating = tip.isSpeculating(); //store this fact - LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + - " successfully."); - // Mark the TIP as complete - tip.completed(taskid); - resourceEstimator.updateWithCompletedTask(status, tip); - - // Update jobhistory - TaskTrackerStatus ttStatus = - this.jobtracker.getTaskTrackerStatus(status.getTaskTracker()); - String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString(); - TaskType taskType = getTaskType(tip); - - TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent( - status.getTaskID(), taskType, status.getStartTime(), - status.getTaskTracker(), ttStatus.getHttpPort()); - - jobHistory.logEvent(tse, status.getTaskID().getJobID()); - TaskAttemptID statusAttemptID = status.getTaskID(); - - if (status.getIsMap()){ - MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent( - statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(), - status.getMapFinishTime(), - status.getFinishTime(), trackerHostname, - status.getStateString(), - new org.apache.hadoop.mapreduce.Counters(status.getCounters()), - tip.getSplits(statusAttemptID).burst() - ); - - jobHistory.logEvent(mfe, status.getTaskID().getJobID()); - - }else{ - ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent( - statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(), - status.getShuffleFinishTime(), - status.getSortFinishTime(), status.getFinishTime(), - trackerHostname, status.getStateString(), - new org.apache.hadoop.mapreduce.Counters(status.getCounters()), - tip.getSplits(statusAttemptID).burst() - ); - - jobHistory.logEvent(rfe, status.getTaskID().getJobID()); - - } - - TaskFinishedEvent tfe = new TaskFinishedEvent(tip.getTIPId(), - tip.getExecFinishTime(), taskType, - TaskStatus.State.SUCCEEDED.toString(), - new org.apache.hadoop.mapreduce.Counters(status.getCounters())); - - jobHistory.logEvent(tfe, tip.getJob().getJobID()); - - - if (tip.isJobSetupTask()) { - // setup task has finished. kill the extra setup tip - killSetupTip(!tip.isMapTask()); - setupComplete(); - } else if (tip.isJobCleanupTask()) { - // cleanup task has finished. Kill the extra cleanup tip - if (tip.isMapTask()) { - // kill the reduce tip - cleanup[1].kill(); - } else { - cleanup[0].kill(); - } - // - // The Job is done - // if the job is failed, then mark the job failed. - if (jobFailed) { - terminateJob(JobStatus.FAILED); - } - // if the job is killed, then mark the job killed. 
- if (jobKilled) { - terminateJob(JobStatus.KILLED); - } - else { - jobComplete(); - } - // The job has been killed/failed/successful - // JobTracker should cleanup this task - jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid); - } else if (tip.isMapTask()) { - runningMapTasks -= 1; - finishedMapTasks += 1; - metrics.completeMap(taskid); - if (!tip.isJobSetupTask() && hasSpeculativeMaps) { - updateTaskTrackerStats(tip,ttStatus,trackerMapStats,mapTaskStats); - } - // remove the completed map from the resp running caches - retireMap(tip); - if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) { - this.status.setMapProgress(1.0f); - } - } else { - runningReduceTasks -= 1; - finishedReduceTasks += 1; - metrics.completeReduce(taskid); - if (!tip.isJobSetupTask() && hasSpeculativeReduces) { - updateTaskTrackerStats(tip,ttStatus,trackerReduceStats,reduceTaskStats); - } - // remove the completed reduces from the running reducers set - retireReduce(tip); - if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) { - this.status.setReduceProgress(1.0f); - } - } - decrementSpeculativeCount(wasSpeculating, tip); - // is job complete? - if (!jobSetupCleanupNeeded && canLaunchJobCleanupTask()) { - jobComplete(); - } - return true; - } - - private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus, - Map trackerStats, DataStatistics overallStats) { - float tipDuration = tip.getExecFinishTime()-tip.getDispatchTime(tip.getSuccessfulTaskid()); - DataStatistics ttStats = - trackerStats.get(ttStatus.getTrackerName()); - double oldMean = 0.0d; - //We maintain the mean of TaskTrackers' means. That way, we get a single - //data-point for every tracker (used in the evaluation in isSlowTracker) - if (ttStats != null) { - oldMean = ttStats.mean(); - ttStats.add(tipDuration); - overallStats.updateStatistics(oldMean, ttStats.mean()); - } else { - trackerStats.put(ttStatus.getTrackerName(), - (ttStats = new DataStatistics(tipDuration))); - overallStats.add(tipDuration); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Added mean of " +ttStats.mean() + " to trackerStats of type "+ - (tip.isMapTask() ? "Map" : "Reduce") + - " on "+ttStatus.getTrackerName()+". 
DataStatistics is now: " +
-          trackerStats.get(ttStatus.getTrackerName()));
-    }
-  }
-
-  public void updateStatistics(double oldProg, double newProg, boolean isMap) {
-    if (isMap) {
-      runningMapTaskStats.updateStatistics(oldProg, newProg);
-    } else {
-      runningReduceTaskStats.updateStatistics(oldProg, newProg);
-    }
-  }
-
-  public DataStatistics getRunningTaskStatistics(boolean isMap) {
-    if (isMap) {
-      return runningMapTaskStats;
-    } else {
-      return runningReduceTaskStats;
-    }
-  }
-
-  public float getSlowTaskThreshold() {
-    return slowTaskThreshold;
-  }
-
-  /**
-   * Job state changes must happen through this call.
-   */
-  private void changeStateTo(int newState) {
-    int oldState = this.status.getRunState();
-    if (oldState == newState) {
-      return; // old and new states are the same
-    }
-    this.status.setRunState(newState);
-
-    // update the metrics
-    if (oldState == JobStatus.PREP) {
-      this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
-    } else if (oldState == JobStatus.RUNNING) {
-      this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
-    }
-
-    if (newState == JobStatus.PREP) {
-      this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
-    } else if (newState == JobStatus.RUNNING) {
-      this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
-    }
-
-  }
-
-  /**
-   * The job is done since all of its component tasks are either
-   * successful or have failed.
-   */
-  private void jobComplete() {
-    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
-    //
-    // All tasks are complete, then the job is done!
-    //
-    if (this.status.getRunState() == JobStatus.RUNNING ||
-        this.status.getRunState() == JobStatus.PREP) {
-      changeStateTo(JobStatus.SUCCEEDED);
-      this.status.setCleanupProgress(1.0f);
-      if (maps.length == 0) {
-        this.status.setMapProgress(1.0f);
-      }
-      if (reduces.length == 0) {
-        this.status.setReduceProgress(1.0f);
-      }
-      this.finishTime = JobTracker.getClock().getTime();
-      this.status.setFinishTime(this.finishTime);
-      LOG.info("Job " + this.status.getJobID() +
-               " has completed successfully.");
-
-      // Log the job summary (this should be done prior to logging to
-      // job-history to ensure job-counters are in-sync)
-      JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
-
-      // Log job-history
-      JobFinishedEvent jfe =
-        new JobFinishedEvent(this.status.getJobID(),
-          this.finishTime,
-          this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks,
-          failedReduceTasks,
-          new org.apache.hadoop.mapreduce.Counters(getMapCounters()),
-          new org.apache.hadoop.mapreduce.Counters(getReduceCounters()),
-          new org.apache.hadoop.mapreduce.Counters(getCounters()));
-
-      jobHistory.logEvent(jfe, this.status.getJobID());
-      jobHistory.closeWriter(this.status.getJobID());
-
-      // Note that finalize would close the job-history handles, which
-      // garbage collection might otherwise try to finalize
-      garbageCollect();
-
-      metrics.completeJob(this.conf, this.status.getJobID());
-    }
-  }
-
-  private synchronized void terminateJob(int jobTerminationState) {
-    if ((status.getRunState() == JobStatus.RUNNING) ||
-        (status.getRunState() == JobStatus.PREP)) {
-
-      this.finishTime = JobTracker.getClock().getTime();
-      this.status.setMapProgress(1.0f);
-      this.status.setReduceProgress(1.0f);
-      this.status.setCleanupProgress(1.0f);
-      this.status.setFinishTime(this.finishTime);
-
-      if (jobTerminationState == JobStatus.FAILED) {
-        changeStateTo(JobStatus.FAILED);
-      } else {
-        changeStateTo(JobStatus.KILLED);
-      }
-      // Log the job summary
-      JobSummary.logJobSummary(this,
jobtracker.getClusterStatus(false)); - - JobUnsuccessfulCompletionEvent failedEvent = - new JobUnsuccessfulCompletionEvent(this.status.getJobID(), - finishTime, - this.finishedMapTasks, - this.finishedReduceTasks, - JobStatus.getJobRunState(jobTerminationState)); - - jobHistory.logEvent(failedEvent, this.status.getJobID()); - jobHistory.closeWriter(this.status.getJobID()); - - garbageCollect(); - - jobtracker.getInstrumentation().terminateJob( - this.conf, this.status.getJobID()); - if (jobTerminationState == JobStatus.FAILED) { - jobtracker.getInstrumentation().failedJob( - this.conf, this.status.getJobID()); - } else { - jobtracker.getInstrumentation().killedJob( - this.conf, this.status.getJobID()); - } - } - } - - /** - * Terminate the job and all its component tasks. - * Calling this will lead to marking the job as failed/killed. Cleanup - * tip will be launched. If the job has not inited, it will directly call - * terminateJob as there is no need to launch cleanup tip. - * This method is reentrant. - * @param jobTerminationState job termination state - */ - private synchronized void terminate(int jobTerminationState) { - if(!tasksInited.get()) { - //init could not be done, we just terminate directly. - terminateJob(jobTerminationState); - return; - } - - if ((status.getRunState() == JobStatus.RUNNING) || - (status.getRunState() == JobStatus.PREP)) { - LOG.info("Killing job '" + this.status.getJobID() + "'"); - if (jobTerminationState == JobStatus.FAILED) { - if(jobFailed) {//reentrant - return; - } - jobFailed = true; - } else if (jobTerminationState == JobStatus.KILLED) { - if(jobKilled) {//reentrant - return; - } - jobKilled = true; - } - // clear all unclean tasks - clearUncleanTasks(); - // - // kill all TIPs. - // - for (int i = 0; i < setup.length; i++) { - setup[i].kill(); - } - for (int i = 0; i < maps.length; i++) { - maps[i].kill(); - } - for (int i = 0; i < reduces.length; i++) { - reduces[i].kill(); - } - - if (!jobSetupCleanupNeeded) { - terminateJob(jobTerminationState); - } - } - } - - /** - * Cancel all reservations since the job is done - */ - private void cancelReservedSlots() { - // Make a copy of the set of TaskTrackers to prevent a - // ConcurrentModificationException ... - Set tm = - new HashSet(trackersReservedForMaps.keySet()); - for (TaskTracker tt : tm) { - tt.unreserveSlots(TaskType.MAP, this); - } - - Set tr = - new HashSet(trackersReservedForReduces.keySet()); - for (TaskTracker tt : tr) { - tt.unreserveSlots(TaskType.REDUCE, this); - } - } - - private void clearUncleanTasks() { - TaskAttemptID taskid = null; - TaskInProgress tip = null; - while (!mapCleanupTasks.isEmpty()) { - taskid = mapCleanupTasks.remove(0); - tip = maps[taskid.getTaskID().getId()]; - updateTaskStatus(tip, tip.getTaskStatus(taskid)); - } - while (!reduceCleanupTasks.isEmpty()) { - taskid = reduceCleanupTasks.remove(0); - tip = reduces[taskid.getTaskID().getId()]; - updateTaskStatus(tip, tip.getTaskStatus(taskid)); - } - } - - /** - * Kill the job and all its component tasks. This method should be called from - * jobtracker and should return fast as it locks the jobtracker. 
-   */
-  public void kill() {
-    boolean killNow = false;
-    synchronized (jobInitKillStatus) {
-      jobInitKillStatus.killed = true;
-      // if not in the middle of init, terminate it now
-      if (!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
-        // avoiding nested locking by setting a flag
-        killNow = true;
-      }
-    }
-    if (killNow) {
-      terminate(JobStatus.KILLED);
-    }
-  }
-
-  /**
-   * Fails the job and all its component tasks. This should be called only
-   * from {@link JobInProgress} or {@link JobTracker}. Look at
-   * {@link JobTracker#failJob(JobInProgress)} for more details.
-   * Note that the job doesn't expect itself to be failed before it's inited.
-   * Only once init is done (successfully or otherwise) can the job be
-   * failed.
-   */
-  synchronized void fail() {
-    terminate(JobStatus.FAILED);
-  }
-
-  private void decrementSpeculativeCount(boolean wasSpeculating,
-      TaskInProgress tip) {
-    if (wasSpeculating) {
-      if (tip.isMapTask()) {
-        speculativeMapTasks--;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Decremented count for " +
-              tip.getTIPId() + "/" + tip.getJob().getJobID() +
-              ". Current speculativeMap task count: " +
-              speculativeMapTasks);
-        }
-      } else {
-        speculativeReduceTasks--;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Decremented count for " +
-              tip.getTIPId() + "/" + tip.getJob().getJobID() +
-              ". Current speculativeReduce task count: " +
-              speculativeReduceTasks);
-        }
-      }
-    }
-  }
-
-  /**
-   * A task assigned to this JobInProgress has reported in as failed.
-   * Most of the time, we'll just reschedule execution. However, after
-   * many repeated failures we may instead decide to allow the entire
-   * job to fail or succeed if the user doesn't care about a few tasks failing.
-   *
-   * Even if a task has reported as completed in the past, it might later
-   * be reported as failed. That's because the TaskTracker that hosts a map
-   * task might die before the entire job can complete. If that happens,
-   * we need to schedule re-execution so that downstream reduce tasks can
-   * obtain the map task's output.
-   */
-  private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
-                          TaskStatus status,
-                          TaskTracker taskTracker, boolean wasRunning,
-                          boolean wasComplete, boolean wasAttemptRunning) {
-    // check if the TIP has already failed
-    boolean wasFailed = tip.isFailed();
-    boolean wasSpeculating = tip.isSpeculating();
-
-    // Mark the taskid as FAILED or KILLED
-    tip.incompleteSubTask(taskid, this.status);
-    decrementSpeculativeCount(wasSpeculating, tip);
-
-    boolean isRunning = tip.isRunning();
-    boolean isComplete = tip.isComplete();
-
-    if (wasAttemptRunning) {
-      // We decrement these counters without checking isRunning, because we
-      // incremented them when the new map or reduce task attempt was
-      // obtained, without really checking whether the tip was running.
-      // Whenever we obtain a new task attempt, runningMapTasks is
-      // incremented; hence we decrement the same way here.
-      if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
-        if (tip.isMapTask()) {
-          runningMapTasks -= 1;
-        } else {
-          runningReduceTasks -= 1;
-        }
-      }
-
-      // Metering
-      meterTaskAttempt(tip, status);
-    }
-
-    // update running count on task failure.
- if (wasRunning && !isRunning) { - if (tip.isJobCleanupTask()) { - launchedCleanup = false; - } else if (tip.isJobSetupTask()) { - launchedSetup = false; - } else if (tip.isMapTask()) { - // remove from the running queue and put it in the non-running cache - // if the tip is not complete i.e if the tip still needs to be run - if (!isComplete) { - retireMap(tip); - failMap(tip); - } - } else { - // remove from the running queue and put in the failed queue if the tip - // is not complete - if (!isComplete) { - retireReduce(tip); - failReduce(tip); - } - } - } - - // The case when the map was complete but the task tracker went down. - // However, we don't need to do any metering here... - if (wasComplete && !isComplete) { - if (tip.isMapTask()) { - // Put the task back in the cache. This will help locality for cases - // where we have a different TaskTracker from the same rack/switch - // asking for a task. - // We bother about only those TIPs that were successful - // earlier (wasComplete and !isComplete) - // (since they might have been removed from the cache of other - // racks/switches, if the input split blocks were present there too) - failMap(tip); - finishedMapTasks -= 1; - } - } - - // update job history - // get taskStatus from tip - TaskStatus taskStatus = tip.getTaskStatus(taskid); - String taskTrackerName = taskStatus.getTaskTracker(); - String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName); - int taskTrackerPort = -1; - TaskTrackerStatus taskTrackerStatus = - (taskTracker == null) ? null : taskTracker.getStatus(); - if (taskTrackerStatus != null) { - taskTrackerPort = taskTrackerStatus.getHttpPort(); - } - long startTime = taskStatus.getStartTime(); - long finishTime = taskStatus.getFinishTime(); - List taskDiagnosticInfo = tip.getDiagnosticInfo(taskid); - String diagInfo = taskDiagnosticInfo == null ? "" : - StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0])); - TaskType taskType = getTaskType(tip); - TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent( - taskid, taskType, startTime, taskTrackerName, taskTrackerPort); - - jobHistory.logEvent(tse, taskid.getJobID()); - - ProgressSplitsBlock splits = tip.getSplits(taskStatus.getTaskID()); - - TaskAttemptUnsuccessfulCompletionEvent tue = - new TaskAttemptUnsuccessfulCompletionEvent - (taskid, - taskType, taskStatus.getRunState().toString(), - finishTime, - taskTrackerHostName, diagInfo, - splits.burst()); - jobHistory.logEvent(tue, taskid.getJobID()); - - // After this, try to assign tasks with the one after this, so that - // the failed task goes to the end of the list. - if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) { - if (tip.isMapTask()) { - failedMapTasks++; - } else { - failedReduceTasks++; - } - } - - // - // Note down that a task has failed on this tasktracker - // - if (status.getRunState() == TaskStatus.State.FAILED) { - addTrackerTaskFailure(taskTrackerName, taskTracker); - } - - // - // Let the JobTracker know that this task has failed - // - jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid); - - // - // Check if we need to kill the job because of too many failures or - // if the job is complete since all component tasks have completed - - // We do it once per TIP and that too for the task that fails the TIP - if (!wasFailed && tip.isFailed()) { - // - // Allow upto 'mapFailuresPercent' of map tasks to fail or - // 'reduceFailuresPercent' of reduce tasks to fail - // - boolean killJob = tip.isJobCleanupTask() || tip.isJobSetupTask() ? 
true : - tip.isMapTask() ? - ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) : - ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks)); - - if (killJob) { - LOG.info("Aborting job " + profile.getJobID()); - TaskFailedEvent tfe = - new TaskFailedEvent(tip.getTIPId(), finishTime, taskType, diagInfo, - TaskStatus.State.FAILED.toString(), - null); - - jobHistory.logEvent(tfe, tip.getJob().getJobID()); - - if (tip.isJobCleanupTask()) { - // kill the other tip - if (tip.isMapTask()) { - cleanup[1].kill(); - } else { - cleanup[0].kill(); - } - terminateJob(JobStatus.FAILED); - } else { - if (tip.isJobSetupTask()) { - // kill the other tip - killSetupTip(!tip.isMapTask()); - } - fail(); - } - } - - // - // Update the counters - // - if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) { - if (tip.isMapTask()) { - jobCounters.incrCounter(JobCounter.NUM_FAILED_MAPS, 1); - } else { - jobCounters.incrCounter(JobCounter.NUM_FAILED_REDUCES, 1); - } - } - } - } - - void killSetupTip(boolean isMap) { - if (isMap) { - setup[0].kill(); - } else { - setup[1].kill(); - } - } - - boolean isSetupFinished() { - // if there is no setup to be launched, consider setup is finished. - if ((tasksInited.get() && setup.length == 0) || - setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete() - || setup[1].isFailed()) { - return true; - } - return false; - } - - /** - * Fail a task with a given reason, but without a status object. - * - * Assuming {@link JobTracker} is locked on entry. - * - * @param tip The task's tip - * @param taskid The task id - * @param reason The reason that the task failed - * @param trackerName The task tracker the task failed on - */ - public synchronized void failedTask(TaskInProgress tip, TaskAttemptID taskid, - String reason, TaskStatus.Phase phase, TaskStatus.State state, - String trackerName) { - TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), - taskid, - 0.0f, - tip.isMapTask() ? - numSlotsPerMap : - numSlotsPerReduce, - state, - reason, - reason, - trackerName, phase, - new Counters()); - // update the actual start-time of the attempt - TaskStatus oldStatus = tip.getTaskStatus(taskid); - long startTime = oldStatus == null - ? JobTracker.getClock().getTime() - : oldStatus.getStartTime(); - status.setStartTime(startTime); - status.setFinishTime(JobTracker.getClock().getTime()); - boolean wasComplete = tip.isComplete(); - updateTaskStatus(tip, status); - boolean isComplete = tip.isComplete(); - if (wasComplete && !isComplete) { // mark a successful tip as failed - TaskType taskType = getTaskType(tip); - TaskFailedEvent tfe = - new TaskFailedEvent(tip.getTIPId(), tip.getExecFinishTime(), taskType, - reason, TaskStatus.State.FAILED.toString(), - taskid); - - jobHistory.logEvent(tfe, tip.getJob().getJobID()); - - } - } - - - /** - * The job is dead. We're now GC'ing it, getting rid of the job - * from all tables. Be sure to remove all of this job's tasks - * from the various tables. 
- */ - void garbageCollect() { - synchronized(this) { - // Cancel task tracker reservation - cancelReservedSlots(); - - - // Let the JobTracker know that a job is complete - jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps()); - jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces()); - jobtracker.storeCompletedJob(this); - jobtracker.finalizeJob(this); - - try { - // Definitely remove the local-disk copy of the job file - if (localJobFile != null) { - localFs.delete(localJobFile, true); - localJobFile = null; - } - - Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID()); - new CleanupQueue().addToQueue(new PathDeletionContext( - jobtracker.getFileSystem(), tempDir.toUri().getPath())); - } catch (IOException e) { - LOG.warn("Error cleaning up "+profile.getJobID()+": "+e); - } - - // free up the memory used by the data structures - this.nonRunningMapCache = null; - this.runningMapCache = null; - this.nonRunningReduces = null; - this.runningReduces = null; - - } - // remove jobs delegation tokens - if(conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true)) { - DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId); - } // else don't remove it.May be used by spawned tasks - } - - /** - * Return the TaskInProgress that matches the tipid. - */ - public synchronized TaskInProgress getTaskInProgress(TaskID tipid) { - if (tipid.getTaskType() == TaskType.MAP) { - // cleanup map tip - if (cleanup.length > 0 && tipid.equals(cleanup[0].getTIPId())) { - return cleanup[0]; - } - // setup map tip - if (setup.length > 0 && tipid.equals(setup[0].getTIPId())) { - return setup[0]; - } - for (int i = 0; i < maps.length; i++) { - if (tipid.equals(maps[i].getTIPId())){ - return maps[i]; - } - } - } else { - // cleanup reduce tip - if (cleanup.length > 0 && tipid.equals(cleanup[1].getTIPId())) { - return cleanup[1]; - } - // setup reduce tip - if (setup.length > 0 && tipid.equals(setup[1].getTIPId())) { - return setup[1]; - } - for (int i = 0; i < reduces.length; i++) { - if (tipid.equals(reduces[i].getTIPId())){ - return reduces[i]; - } - } - } - return null; - } - - /** - * Find the details of someplace where a map has finished - * @param mapId the id of the map - * @return the task status of the completed task - */ - public synchronized TaskStatus findFinishedMap(int mapId) { - TaskInProgress tip = maps[mapId]; - if (tip.isComplete()) { - TaskStatus[] statuses = tip.getTaskStatuses(); - for(int i=0; i < statuses.length; i++) { - if (statuses[i].getRunState() == TaskStatus.State.SUCCEEDED) { - return statuses[i]; - } - } - } - return null; - } - - synchronized int getNumTaskCompletionEvents() { - return taskCompletionEvents.size(); - } - - synchronized public TaskCompletionEvent[] getTaskCompletionEvents( - int fromEventId, int maxEvents) { - TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY; - if (taskCompletionEvents.size() > fromEventId) { - int actualMax = Math.min(maxEvents, - (taskCompletionEvents.size() - fromEventId)); - events = taskCompletionEvents.subList(fromEventId, actualMax + fromEventId).toArray(events); - } - return events; - } - - synchronized void fetchFailureNotification(TaskInProgress tip, - TaskAttemptID mapTaskId, - String mapTrackerName, - TaskAttemptID reduceTaskId, - String reduceTrackerName) { - Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId); - fetchFailures = (fetchFailures == null) ? 
1 : (fetchFailures+1); - mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures); - LOG.info("Failed fetch notification #" + fetchFailures + " for map task: " - + mapTaskId + " running on tracker: " + mapTrackerName - + " and reduce task: " + reduceTaskId + " running on tracker: " - + reduceTrackerName); - - float failureRate = (float)fetchFailures / runningReduceTasks; - // declare faulty if fetch-failures >= max-allowed-failures - boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT) - ? true - : false; - if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS - && isMapFaulty) { - LOG.info("Too many fetch-failures for output of task: " + mapTaskId - + " ... killing it"); - - failedTask(tip, mapTaskId, "Too many fetch-failures", - (tip.isMapTask() ? TaskStatus.Phase.MAP : - TaskStatus.Phase.REDUCE), - TaskStatus.State.FAILED, mapTrackerName); - - mapTaskIdToFetchFailuresMap.remove(mapTaskId); - } - } - - /** - * @return The JobID of this JobInProgress. - */ - public JobID getJobID() { - return jobId; - } - - public synchronized Object getSchedulingInfo() { - return this.schedulingInfo; - } - - public synchronized void setSchedulingInfo(Object schedulingInfo) { - this.schedulingInfo = schedulingInfo; - this.status.setSchedulingInfo(schedulingInfo.toString()); - } - - /** - * To keep track of kill and initTasks status of this job. initTasks() take - * a lock on JobInProgress object. kill should avoid waiting on - * JobInProgress lock since it may take a while to do initTasks(). - */ - private static class JobInitKillStatus { - //flag to be set if kill is called - boolean killed; - - boolean initStarted; - boolean initDone; - } - - boolean isComplete() { - return status.isJobComplete(); - } - - /** - * Get the task type for logging it to {@link JobHistory}. - */ - private TaskType getTaskType(TaskInProgress tip) { - if (tip.isJobCleanupTask()) { - return TaskType.JOB_CLEANUP; - } else if (tip.isJobSetupTask()) { - return TaskType.JOB_SETUP; - } else if (tip.isMapTask()) { - return TaskType.MAP; - } else { - return TaskType.REDUCE; - } - } - - /** - * Get the level of locality that a given task would have if launched on - * a particular TaskTracker. Returns 0 if the task has data on that machine, - * 1 if it has data on the same rack, etc (depending on number of levels in - * the network hierarchy). 
- */ - int getLocalityLevel(TaskInProgress tip, TaskTrackerStatus tts) { - Node tracker = jobtracker.getNode(tts.getHost()); - int level = this.maxLevel; - // find the right level across split locations - for (String local : maps[tip.getIdWithinJob()].getSplitLocations()) { - Node datanode = jobtracker.getNode(local); - int newLevel = this.maxLevel; - if (tracker != null && datanode != null) { - newLevel = getMatchingLevelForNodes(tracker, datanode); - } - if (newLevel < level) { - level = newLevel; - // an optimization - if (level == 0) { - break; - } - } - } - return level; - } - - /** - * Test method to set the cluster sizes - */ - void setClusterSize(int clusterSize) { - this.clusterSize = clusterSize; - } - - static class JobSummary { - static final Log LOG = LogFactory.getLog(JobSummary.class); - - // Escape sequences - static final char EQUALS = '='; - static final char[] charsToEscape = - {StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR}; - - static class SummaryBuilder { - final StringBuilder buffer = new StringBuilder(); - - // A little optimization for a very common case - SummaryBuilder add(String key, long value) { - return _add(key, Long.toString(value)); - } - - SummaryBuilder add(String key, T value) { - return _add(key, StringUtils.escapeString(String.valueOf(value), - StringUtils.ESCAPE_CHAR, charsToEscape)); - } - - SummaryBuilder add(SummaryBuilder summary) { - if (buffer.length() > 0) buffer.append(StringUtils.COMMA); - buffer.append(summary.buffer); - return this; - } - - SummaryBuilder _add(String key, String value) { - if (buffer.length() > 0) buffer.append(StringUtils.COMMA); - buffer.append(key).append(EQUALS).append(value); - return this; - } - - @Override public String toString() { - return buffer.toString(); - } - } - - static SummaryBuilder getTaskLaunchTimesSummary(JobInProgress job) { - SummaryBuilder summary = new SummaryBuilder(); - Map timeMap = job.getFirstTaskLaunchTimes(); - - synchronized(timeMap) { - for (Map.Entry e : timeMap.entrySet()) { - summary.add("first"+ StringUtils.camelize(e.getKey().name()) + - "TaskLaunchTime", e.getValue().longValue()); - } - } - return summary; - } - - /** - * Log a summary of the job's runtime. - * - * @param job {@link JobInProgress} whose summary is to be logged, cannot - * be null. 
- * @param cluster {@link ClusterStatus} of the cluster on which the job was - * run, cannot be null - */ - public static void logJobSummary(JobInProgress job, ClusterStatus cluster) { - JobStatus status = job.getStatus(); - JobProfile profile = job.getProfile(); - Counters jobCounters = job.getJobCounters(); - long mapSlotSeconds = - (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_MAPS) + - jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000; - long reduceSlotSeconds = - (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_REDUCES) + - jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000; - - SummaryBuilder summary = new SummaryBuilder() - .add("jobId", job.getJobID()) - .add("submitTime", job.getStartTime()) - .add("launchTime", job.getLaunchTime()) - .add(getTaskLaunchTimesSummary(job)) - .add("finishTime", job.getFinishTime()) - .add("numMaps", job.getTasks(TaskType.MAP).length) - .add("numSlotsPerMap", job.getNumSlotsPerMap()) - .add("numReduces", job.getTasks(TaskType.REDUCE).length) - .add("numSlotsPerReduce", job.getNumSlotsPerReduce()) - .add("user", profile.getUser()) - .add("queue", profile.getQueueName()) - .add("status", JobStatus.getJobRunState(status.getRunState())) - .add("mapSlotSeconds", mapSlotSeconds) - .add("reduceSlotsSeconds", reduceSlotSeconds) - .add("clusterMapCapacity", cluster.getMaxMapTasks()) - .add("clusterReduceCapacity", cluster.getMaxReduceTasks()); - - LOG.info(summary); - } - } - - /** - * Creates the localized copy of job conf - * @param jobConf - * @param id - */ - void setUpLocalizedJobConf(JobConf jobConf, - org.apache.hadoop.mapreduce.JobID id) { - String localJobFilePath = jobtracker.getLocalJobFilePath(id); - File localJobFile = new File(localJobFilePath); - FileOutputStream jobOut = null; - try { - jobOut = new FileOutputStream(localJobFile); - jobConf.writeXml(jobOut); - if (LOG.isDebugEnabled()) { - LOG.debug("Job conf for " + id + " stored at " - + localJobFile.getAbsolutePath()); - } - } catch (IOException ioe) { - LOG.error("Failed to store job conf on the local filesystem ", ioe); - } finally { - if (jobOut != null) { - try { - jobOut.close(); - } catch (IOException ie) { - LOG.info("Failed to close the job configuration file " - + StringUtils.stringifyException(ie)); - } - } - } - } - - /** - * Deletes localized copy of job conf - */ - void cleanupLocalizedJobConf(org.apache.hadoop.mapreduce.JobID id) { - String localJobFilePath = jobtracker.getLocalJobFilePath(id); - File f = new File (localJobFilePath); - LOG.info("Deleting localized job conf at " + f); - if (!f.delete()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to delete file " + f); - } - } - } - - /** - * generate job token and save it into the file - * @throws IOException - */ - private void generateAndStoreTokens() throws IOException{ - Path jobDir = jobtracker.getSystemDirectoryForJob(jobId); - Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE); - - if (tokenStorage == null) { - tokenStorage = new Credentials(); - } - - //create JobToken file and write token to it - JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId - .toString())); - Token token = new Token(identifier, - jobtracker.getJobTokenSecretManager()); - token.setService(identifier.getJobId()); - - TokenCache.setJobToken(token, tokenStorage); - - // write TokenStorage out - tokenStorage.writeTokenStorageFile(keysFile, jobtracker.getConf()); - LOG.info("jobToken generated and stored with users keys in " - + keysFile.toUri().getPath()); - } - - public 
String getJobSubmitHostAddress() { - return submitHostAddress; - } - - public String getJobSubmitHostName() { - return submitHostName; - } -}
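
The fallow-slot metering in reserveTaskTracker/unreserveTaskTracker above charges a job (now - timestamp) * numSlots milliseconds for slots that sit reserved but idle, re-basing the timestamp whenever the reservation size changes. A minimal standalone sketch of just that computation follows; FallowSlot is an invented stand-in for FallowSlotInfo, and the sample values are illustrative only.

  public class FallowSlotSketch {
    static class FallowSlot {
      long timestamp;  // when the reservation was (re)recorded
      int numSlots;    // slots held by the reservation
      FallowSlot(long timestamp, int numSlots) {
        this.timestamp = timestamp;
        this.numSlots = numSlots;
      }
    }

    // Slot-milliseconds charged for holding the reservation until 'now',
    // mirroring the (now - info.getTimestamp()) * info.getNumSlots() term.
    static long fallowSlotMillis(FallowSlot info, long now) {
      return (now - info.timestamp) * info.numSlots;
    }

    public static void main(String[] args) {
      long t0 = System.currentTimeMillis();
      FallowSlot info = new FallowSlot(t0, 2);
      // two slots reserved for five seconds => 10000 fallow slot-milliseconds
      System.out.println(fallowSlotMillis(info, t0 + 5000));
    }
  }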
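
atSpeculativeCap above encodes the speculation budget as three thresholds: an absolute floor (MIN_SPEC_CAP, 10 by default per the inline comments), a fraction of total slots (MIN_SLOTS_CAP, 1%), and a fraction of currently running tasks (speculativeCap, 0.1 in the same comment). A hedged restatement of that predicate with the JobTracker plumbing stripped out; the method signature and the example values are assumptions for illustration.

  public class SpeculativeCapSketch {
    static final int MIN_SPEC_CAP = 10;       // one slow tracker's worth of slots
    static final float MIN_SLOTS_CAP = 0.01f; // 1% of total slots

    // true when no further speculative attempts should be launched
    static boolean atSpeculativeCap(int speculativeTaskCount,
                                    int runningTasks,
                                    int totalSlots,
                                    float speculativeCap) {
      if (runningTasks == 0) {
        return true; // nothing running; also avoids divide by zero below
      }
      if (speculativeTaskCount < MIN_SPEC_CAP) {
        return false; // always allow a minimum amount of speculation
      }
      if (speculativeTaskCount < totalSlots * MIN_SLOTS_CAP) {
        return false; // still under 1% of cluster capacity
      }
      return (float) speculativeTaskCount / runningTasks >= speculativeCap;
    }

    public static void main(String[] args) {
      System.out.println(atSpeculativeCap(5, 100, 1000, 0.1f));  // false: under floor
      System.out.println(atSpeculativeCap(15, 100, 1000, 0.1f)); // true: 15% >= 10%
    }
  }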
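
The EstimatedTimeLeftComparator above picks the task expected to finish farthest in the future by comparing progressRate / max(0.0001, 1 - progress), the inverse of the estimated time remaining. A self-contained sketch of the same arithmetic; TaskSnapshot and the sample numbers are invented stand-ins for TaskInProgress, not part of the patch.

  import java.util.Comparator;

  public class EstimatedTimeLeftSketch {
    static class TaskSnapshot {
      final double progress;      // fraction complete, 0.0 .. 1.0
      final double progressRate;  // progress per unit time
      TaskSnapshot(double progress, double progressRate) {
        this.progress = progress;
        this.progressRate = progressRate;
      }
    }

    // Inverse of time_remaining = (1 - progress) / progressRate; the clamp
    // keeps a finished task (progress == 1) from dividing by zero.
    static double inverseTimeLeft(TaskSnapshot t) {
      return t.progressRate / Math.max(0.0001, 1.0 - t.progress);
    }

    // Smaller inverse => more time left => sorts first, matching the
    // ordering the deleted comparator defines.
    static final Comparator<TaskSnapshot> LATEST_FINISH_FIRST =
        Comparator.comparingDouble(EstimatedTimeLeftSketch::inverseTimeLeft);

    public static void main(String[] args) {
      TaskSnapshot slow = new TaskSnapshot(0.2, 0.001); // ~800 units left
      TaskSnapshot fast = new TaskSnapshot(0.9, 0.01);  // ~10 units left
      System.out.println(LATEST_FINISH_FIRST.compare(slow, fast) < 0); // true
    }
  }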
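
Finally, the DataStatistics class above keeps only count, sum, and sumSquares, which is all isSlowTracker needs to recover a mean and standard deviation and ask whether a tracker's average task duration sits more than slowNodeThreshold standard deviations above the cluster-wide mean. A minimal sketch of that arithmetic; the class name, sample durations, and threshold value here are illustrative assumptions.

  public class DataStatisticsSketch {
    private int count = 0;
    private double sum = 0.0;
    private double sumSquares = 0.0;

    public void add(double x) {
      count++;
      sum += x;
      sumSquares += x * x;
    }

    public double mean() {
      return sum / count;
    }

    public double std() {
      // var = E(X^2) - E(X)^2, clamped at zero to absorb rounding error
      double var = Math.max(sumSquares / count - mean() * mean(), 0.0);
      return Math.sqrt(var);
    }

    public static void main(String[] args) {
      DataStatisticsSketch all = new DataStatisticsSketch();
      DataStatisticsSketch oneTracker = new DataStatisticsSketch();
      for (double d : new double[] {40, 42, 45, 41}) all.add(d);
      for (double d : new double[] {90, 95}) { all.add(d); oneTracker.add(d); }
      double slowNodeThreshold = 1.0; // number of standard deviations
      // the isSlowTracker test from the deleted file, restated:
      boolean slow = oneTracker.mean() - all.mean()
          > all.std() * slowNodeThreshold;
      System.out.println("tracker is slow: " + slow); // true for this data
    }
  }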