MAPREDUCE-5196. Add bookkeeping for managing checkpoints of task state.

Contributed by Carlo Curino


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1553939 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Christopher Douglas 2013-12-28 21:58:33 +00:00
parent cc4c74be09
commit 47cca0cb6d
20 changed files with 1098 additions and 118 deletions

View File

@ -77,6 +77,9 @@ Trunk (Unreleased)
MAPREDUCE-5189. Add policies and wiring to respond to preemption requests
from YARN. (Carlo Curino via cdouglas)
MAPREDUCE-5196. Add bookkeeping for managing checkpoints of task state.
(Carlo Curino via cdouglas)
BUG FIXES
MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.

View File

@ -36,7 +36,9 @@
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@ -45,8 +47,8 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
@ -228,6 +230,22 @@ public void commitPending(TaskAttemptID taskAttemptID, TaskStatus taskStatsu)
TaskAttemptEventType.TA_COMMIT_PENDING));
}
@Override
public void preempted(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
throws IOException, InterruptedException {
LOG.info("Preempted state update from " + taskAttemptID.toString());
// An attempt is telling us that it got preempted.
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
preemptionPolicy.reportSuccessfulPreemption(attemptID);
taskHeartbeatHandler.progressing(attemptID);
context.getEventHandler().handle(
new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_PREEMPTED));
}
@Override
public void done(TaskAttemptID taskAttemptID) throws IOException {
LOG.info("Done acknowledgement from " + taskAttemptID.toString());
@ -250,6 +268,10 @@ public void fatalError(TaskAttemptID taskAttemptID, String msg)
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
// handling checkpoints
preemptionPolicy.handleFailedContainer(attemptID);
context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
}
@ -264,6 +286,10 @@ public void fsError(TaskAttemptID taskAttemptID, String message)
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
// handling checkpoints
preemptionPolicy.handleFailedContainer(attemptID);
context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
}
@ -293,12 +319,6 @@ public MapTaskCompletionEventsUpdate getMapCompletionEvents(
return new MapTaskCompletionEventsUpdate(events, shouldReset);
}
@Override
public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
LOG.info("Ping from " + taskAttemptID.toString());
return true;
}
@Override
public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
throws IOException {
@ -321,11 +341,33 @@ public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticI
}
@Override
public boolean statusUpdate(TaskAttemptID taskAttemptID,
public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
TaskStatus taskStatus) throws IOException, InterruptedException {
LOG.info("Status update from " + taskAttemptID.toString());
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
TypeConverter.toYarn(taskAttemptID);
AMFeedback feedback = new AMFeedback();
feedback.setTaskFound(true);
// Propagating preemption to the task if TASK_PREEMPTION is enabled
if (getConfig().getBoolean(MRJobConfig.TASK_PREEMPTION, false)
&& preemptionPolicy.isPreempted(yarnAttemptID)) {
feedback.setPreemption(true);
LOG.info("Setting preemption bit for task: "+ yarnAttemptID
+ " of type " + yarnAttemptID.getTaskId().getTaskType());
}
if (taskStatus == null) {
//We are using statusUpdate only as a simple ping
LOG.info("Ping from " + taskAttemptID.toString());
taskHeartbeatHandler.progressing(yarnAttemptID);
return feedback;
}
// if we are here there is an actual status update to be processed
LOG.info("Status update from " + taskAttemptID.toString());
taskHeartbeatHandler.progressing(yarnAttemptID);
TaskAttemptStatus taskAttemptStatus =
new TaskAttemptStatus();
@ -386,7 +428,7 @@ public boolean statusUpdate(TaskAttemptID taskAttemptID,
context.getEventHandler().handle(
new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
taskAttemptStatus));
return true;
return feedback;
}
@Override
@ -494,4 +536,18 @@ public ProtocolSignature getProtocolSignature(String protocol,
return ProtocolSignature.getProtocolSignature(this,
protocol, clientVersion, clientMethodsHash);
}
// task checkpoint bookeeping
@Override
public TaskCheckpointID getCheckpointID(TaskID taskId) {
TaskId tid = TypeConverter.toYarn(taskId);
return preemptionPolicy.getCheckpointID(tid);
}
@Override
public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
TaskId tid = TypeConverter.toYarn(taskId);
preemptionPolicy.setCheckpointID(tid, cid);
}
}

View File

@ -47,6 +47,7 @@ public enum TaskAttemptEventType {
TA_FAILMSG,
TA_UPDATE,
TA_TIMED_OUT,
TA_PREEMPTED,
//Producer:TaskCleaner
TA_CLEANUP_DONE,

View File

@ -304,6 +304,9 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION)
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition())
// Transitions from COMMIT_PENDING state
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
@ -437,6 +440,7 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_PREEMPTED,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@ -1874,6 +1878,27 @@ public void transition(TaskAttemptImpl taskAttempt,
}
}
private static class PreemptedTransition implements
SingleArcTransition<TaskAttemptImpl,TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
taskAttempt.setFinishTime();
taskAttempt.taskAttemptListener.unregister(
taskAttempt.attemptId, taskAttempt.jvmID);
taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
taskAttempt.attemptId,
taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));
}
}
private static class CleanupContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")

View File

@ -347,7 +347,7 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
}
} else if (
event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
LOG.info("Processing the event " + event.toString());

View File

@ -19,10 +19,9 @@
import java.util.List;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.yarn.api.records.Container;
@ -81,7 +80,7 @@ public abstract class Context {
* successfully preempted (for bookeeping, counters, etc..)
* @param attemptID Task attempt that preempted
*/
public void reportSuccessfulPreemption(TaskAttemptID attemptID);
public void reportSuccessfulPreemption(TaskAttemptId attemptID);
/**
* Callback informing the policy of containers exiting with a failure. This
@ -98,20 +97,20 @@ public abstract class Context {
public void handleCompletedContainer(TaskAttemptId attemptID);
/**
* Method to retrieve the latest checkpoint for a given {@link TaskID}
* Method to retrieve the latest checkpoint for a given {@link TaskId}
* @param taskId TaskID
* @return CheckpointID associated with this task or null
*/
public TaskCheckpointID getCheckpointID(TaskID taskId);
public TaskCheckpointID getCheckpointID(TaskId taskId);
/**
* Method to store the latest {@link
* org.apache.hadoop.mapreduce.checkpoint.CheckpointID} for a given {@link
* TaskID}. Assigning a null is akin to remove all previous checkpoints for
* TaskId}. Assigning a null is akin to remove all previous checkpoints for
* this task.
* @param taskId TaskID
* @param cid Checkpoint to assign or <tt>null</tt> to remove it.
*/
public void setCheckpointID(TaskID taskId, TaskCheckpointID cid);
public void setCheckpointID(TaskId taskId, TaskCheckpointID cid);
}

View File

@ -0,0 +1,290 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.rm.preemption;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.event.EventHandler;
/**
* This policy works in combination with an implementation of task
* checkpointing. It computes the tasks to be preempted in response to the RM
* request for preemption. For strict requests, it maps containers to
* corresponding tasks; for fungible requests, it attempts to pick the best
* containers to preempt (reducers in reverse allocation order). The
* TaskAttemptListener will interrogate this policy when handling a task
* heartbeat to check whether the task should be preempted or not. When handling
* fungible requests, the policy discount the RM ask by the amount of currently
* in-flight preemptions (i.e., tasks that are checkpointing).
*
* This class it is also used to maintain the list of checkpoints for existing
* tasks. Centralizing this functionality here, allows us to have visibility on
* preemption and checkpoints in a single location, thus coordinating preemption
* and checkpoint management decisions in a single policy.
*/
public class CheckpointAMPreemptionPolicy implements AMPreemptionPolicy {
// task attempts flagged for preemption
private final Set<TaskAttemptId> toBePreempted;
private final Set<TaskAttemptId> countedPreemptions;
private final Map<TaskId,TaskCheckpointID> checkpoints;
private final Map<TaskAttemptId,Resource> pendingFlexiblePreemptions;
@SuppressWarnings("rawtypes")
private EventHandler eventHandler;
static final Log LOG = LogFactory
.getLog(CheckpointAMPreemptionPolicy.class);
public CheckpointAMPreemptionPolicy() {
this(Collections.synchronizedSet(new HashSet<TaskAttemptId>()),
Collections.synchronizedSet(new HashSet<TaskAttemptId>()),
Collections.synchronizedMap(new HashMap<TaskId,TaskCheckpointID>()),
Collections.synchronizedMap(new HashMap<TaskAttemptId,Resource>()));
}
CheckpointAMPreemptionPolicy(Set<TaskAttemptId> toBePreempted,
Set<TaskAttemptId> countedPreemptions,
Map<TaskId,TaskCheckpointID> checkpoints,
Map<TaskAttemptId,Resource> pendingFlexiblePreemptions) {
this.toBePreempted = toBePreempted;
this.countedPreemptions = countedPreemptions;
this.checkpoints = checkpoints;
this.pendingFlexiblePreemptions = pendingFlexiblePreemptions;
}
@Override
public void init(AppContext context) {
this.eventHandler = context.getEventHandler();
}
@Override
public void preempt(Context ctxt, PreemptionMessage preemptionRequests) {
if (preemptionRequests != null) {
// handling non-negotiable preemption
StrictPreemptionContract cStrict = preemptionRequests.getStrictContract();
if (cStrict != null
&& cStrict.getContainers() != null
&& cStrict.getContainers().size() > 0) {
LOG.info("strict preemption :" +
preemptionRequests.getStrictContract().getContainers().size() +
" containers to kill");
// handle strict preemptions. These containers are non-negotiable
for (PreemptionContainer c :
preemptionRequests.getStrictContract().getContainers()) {
ContainerId reqCont = c.getId();
TaskAttemptId reqTask = ctxt.getTaskAttempt(reqCont);
if (reqTask != null) {
// ignore requests for preempting containers running maps
if (org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE
.equals(reqTask.getTaskId().getTaskType())) {
toBePreempted.add(reqTask);
LOG.info("preempting " + reqCont + " running task:" + reqTask);
} else {
LOG.info("NOT preempting " + reqCont + " running task:" + reqTask);
}
}
}
}
// handling negotiable preemption
PreemptionContract cNegot = preemptionRequests.getContract();
if (cNegot != null
&& cNegot.getResourceRequest() != null
&& cNegot.getResourceRequest().size() > 0
&& cNegot.getContainers() != null
&& cNegot.getContainers().size() > 0) {
LOG.info("negotiable preemption :" +
preemptionRequests.getContract().getResourceRequest().size() +
" resourceReq, " +
preemptionRequests.getContract().getContainers().size() +
" containers");
// handle fungible preemption. Here we only look at the total amount of
// resources to be preempted and pick enough of our containers to
// satisfy that. We only support checkpointing for reducers for now.
List<PreemptionResourceRequest> reqResources =
preemptionRequests.getContract().getResourceRequest();
// compute the total amount of pending preemptions (to be discounted
// from current request)
int pendingPreemptionRam = 0;
int pendingPreemptionCores = 0;
for (Resource r : pendingFlexiblePreemptions.values()) {
pendingPreemptionRam += r.getMemory();
pendingPreemptionCores += r.getVirtualCores();
}
// discount preemption request based on currently pending preemption
for (PreemptionResourceRequest rr : reqResources) {
ResourceRequest reqRsrc = rr.getResourceRequest();
if (!ResourceRequest.ANY.equals(reqRsrc.getResourceName())) {
// For now, only respond to aggregate requests and ignore locality
continue;
}
LOG.info("ResourceRequest:" + reqRsrc);
int reqCont = reqRsrc.getNumContainers();
int reqMem = reqRsrc.getCapability().getMemory();
int totalMemoryToRelease = reqCont * reqMem;
int reqCores = reqRsrc.getCapability().getVirtualCores();
int totalCoresToRelease = reqCont * reqCores;
// remove
if (pendingPreemptionRam > 0) {
// if goes negative we simply exit
totalMemoryToRelease -= pendingPreemptionRam;
// decrement pending resources if zero or negatve we will
// ignore it while processing next PreemptionResourceRequest
pendingPreemptionRam -= totalMemoryToRelease;
}
if (pendingPreemptionCores > 0) {
totalCoresToRelease -= pendingPreemptionCores;
pendingPreemptionCores -= totalCoresToRelease;
}
// reverse order of allocation (for now)
List<Container> listOfCont = ctxt.getContainers(TaskType.REDUCE);
Collections.sort(listOfCont, new Comparator<Container>() {
@Override
public int compare(final Container o1, final Container o2) {
return o2.getId().getId() - o1.getId().getId();
}
});
// preempt reducers first
for (Container cont : listOfCont) {
if (totalMemoryToRelease <= 0 && totalCoresToRelease<=0) {
break;
}
TaskAttemptId reduceId = ctxt.getTaskAttempt(cont.getId());
int cMem = cont.getResource().getMemory();
int cCores = cont.getResource().getVirtualCores();
if (!toBePreempted.contains(reduceId)) {
totalMemoryToRelease -= cMem;
totalCoresToRelease -= cCores;
toBePreempted.add(reduceId);
pendingFlexiblePreemptions.put(reduceId, cont.getResource());
}
LOG.info("ResourceRequest:" + reqRsrc + " satisfied preempting "
+ reduceId);
}
// if map was preemptable we would do add them to toBePreempted here
}
}
}
}
@Override
public void handleFailedContainer(TaskAttemptId attemptID) {
toBePreempted.remove(attemptID);
checkpoints.remove(attemptID.getTaskId());
}
@Override
public void handleCompletedContainer(TaskAttemptId attemptID){
LOG.info(" task completed:" + attemptID);
toBePreempted.remove(attemptID);
pendingFlexiblePreemptions.remove(attemptID);
}
@Override
public boolean isPreempted(TaskAttemptId yarnAttemptID) {
if (toBePreempted.contains(yarnAttemptID)) {
updatePreemptionCounters(yarnAttemptID);
return true;
}
return false;
}
@Override
public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
// ignore
}
@Override
public TaskCheckpointID getCheckpointID(TaskId taskId) {
return checkpoints.get(taskId);
}
@Override
public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
checkpoints.put(taskId, cid);
if (cid != null) {
updateCheckpointCounters(taskId, cid);
}
}
@SuppressWarnings({ "unchecked" })
private void updateCheckpointCounters(TaskId taskId, TaskCheckpointID cid) {
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
jce.addCounterUpdate(JobCounter.CHECKPOINTS, 1);
eventHandler.handle(jce);
jce = new JobCounterUpdateEvent(taskId.getJobId());
jce.addCounterUpdate(JobCounter.CHECKPOINT_BYTES, cid.getCheckpointBytes());
eventHandler.handle(jce);
jce = new JobCounterUpdateEvent(taskId.getJobId());
jce.addCounterUpdate(JobCounter.CHECKPOINT_TIME, cid.getCheckpointTime());
eventHandler.handle(jce);
}
@SuppressWarnings({ "unchecked" })
private void updatePreemptionCounters(TaskAttemptId yarnAttemptID) {
if (!countedPreemptions.contains(yarnAttemptID)) {
countedPreemptions.add(yarnAttemptID);
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(yarnAttemptID
.getTaskId().getJobId());
jce.addCounterUpdate(JobCounter.TASKS_REQ_PREEMPT, 1);
eventHandler.handle(jce);
}
}
}

View File

@ -19,11 +19,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@ -89,17 +88,17 @@ public boolean isPreempted(TaskAttemptId yarnAttemptID) {
}
@Override
public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
// ignore
}
@Override
public TaskCheckpointID getCheckpointID(TaskID taskId) {
public TaskCheckpointID getCheckpointID(TaskId taskId) {
return null;
}
@Override
public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
// ignore
}

View File

@ -17,10 +17,9 @@
*/
package org.apache.hadoop.mapreduce.v2.app.rm.preemption;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@ -50,17 +49,17 @@ public boolean isPreempted(TaskAttemptId yarnAttemptID) {
}
@Override
public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
// ignore
}
@Override
public TaskCheckpointID getCheckpointID(TaskID taskId) {
public TaskCheckpointID getCheckpointID(TaskId taskId) {
return null;
}
@Override
public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
// ignore
}

View File

@ -17,26 +17,23 @@
*/
package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import junit.framework.Assert;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@ -46,21 +43,31 @@
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class TestTaskAttemptListenerImpl {
public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
public static class MockTaskAttemptListenerImpl
extends TaskAttemptListenerImpl {
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler,
TaskHeartbeatHandler hbHandler) {
super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
TaskHeartbeatHandler hbHandler,
AMPreemptionPolicy policy) {
super(context, jobTokenSecretManager, rmHeartbeatHandler, policy);
this.taskHeartbeatHandler = hbHandler;
}
@ -87,9 +94,16 @@ public void testGetTask() throws IOException {
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
Dispatcher dispatcher = mock(Dispatcher.class);
EventHandler ea = mock(EventHandler.class);
when(dispatcher.getEventHandler()).thenReturn(ea);
when(appCtx.getEventHandler()).thenReturn(ea);
CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
policy.init(appCtx);
MockTaskAttemptListenerImpl listener =
new MockTaskAttemptListenerImpl(appCtx, secret,
rmHeartbeatHandler, hbHandler);
rmHeartbeatHandler, hbHandler, policy);
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
@ -144,7 +158,7 @@ public void testGetTask() throws IOException {
assertNotNull(jvmid);
try {
JVMId.forName("jvm_001_002_m_004_006");
Assert.fail();
fail();
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
"TaskId string : jvm_001_002_m_004_006 is not properly formed");
@ -190,8 +204,14 @@ public void testGetMapCompletionEvents() throws IOException {
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptListenerImpl listener =
new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
Dispatcher dispatcher = mock(Dispatcher.class);
EventHandler ea = mock(EventHandler.class);
when(dispatcher.getEventHandler()).thenReturn(ea);
when(appCtx.getEventHandler()).thenReturn(ea);
CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
policy.init(appCtx);
TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@ -219,7 +239,8 @@ private static TaskAttemptCompletionEvent createTce(int eventId,
isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
: org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(eventId);
@ -244,8 +265,14 @@ public void testCommitWindow() throws IOException {
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
TaskAttemptListenerImpl listener =
new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
Dispatcher dispatcher = mock(Dispatcher.class);
EventHandler ea = mock(EventHandler.class);
when(dispatcher.getEventHandler()).thenReturn(ea);
when(appCtx.getEventHandler()).thenReturn(ea);
CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
policy.init(appCtx);
TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@ -270,4 +297,88 @@ protected void registerHeartbeatHandler(Configuration conf) {
listener.stop();
}
@Test
public void testCheckpointIDTracking()
throws IOException, InterruptedException{
SystemClock clock = new SystemClock();
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
Job mockJob = mock(Job.class);
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
Dispatcher dispatcher = mock(Dispatcher.class);
EventHandler ea = mock(EventHandler.class);
when(dispatcher.getEventHandler()).thenReturn(ea);
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
AppContext appCtx = mock(AppContext.class);
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
when(appCtx.getClock()).thenReturn(clock);
when(appCtx.getEventHandler()).thenReturn(ea);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
when(appCtx.getEventHandler()).thenReturn(ea);
CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
policy.init(appCtx);
TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
}
};
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.TASK_PREEMPTION, true);
//conf.setBoolean("preemption.reduce", true);
listener.init(conf);
listener.start();
TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
List<Path> partialOut = new ArrayList<Path>();
partialOut.add(new Path("/prev1"));
partialOut.add(new Path("/prev2"));
Counters counters = mock(Counters.class);
final long CBYTES = 64L * 1024 * 1024;
final long CTIME = 4344L;
final Path CLOC = new Path("/test/1");
Counter cbytes = mock(Counter.class);
when(cbytes.getValue()).thenReturn(CBYTES);
Counter ctime = mock(Counter.class);
when(ctime.getValue()).thenReturn(CTIME);
when(counters.findCounter(eq(EnumCounter.CHECKPOINT_BYTES)))
.thenReturn(cbytes);
when(counters.findCounter(eq(EnumCounter.CHECKPOINT_MS)))
.thenReturn(ctime);
// propagating a taskstatus that contains a checkpoint id
TaskCheckpointID incid = new TaskCheckpointID(new FSCheckpointID(
CLOC), partialOut, counters);
listener.setCheckpointID(
org.apache.hadoop.mapred.TaskID.downgrade(tid.getTaskID()), incid);
// and try to get it back
CheckpointID outcid = listener.getCheckpointID(tid.getTaskID());
TaskCheckpointID tcid = (TaskCheckpointID) outcid;
assertEquals(CBYTES, tcid.getCheckpointBytes());
assertEquals(CTIME, tcid.getCheckpointTime());
assertTrue(partialOut.containsAll(tcid.getPartialCommittedOutput()));
assertTrue(tcid.getPartialCommittedOutput().containsAll(partialOut));
//assert it worked
assert outcid == incid;
listener.stop();
}
}

View File

@ -0,0 +1,329 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.util.resource.Resources;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.junit.Before;
import org.junit.Test;
public class TestCheckpointPreemptionPolicy {
TaskAttemptListenerImpl pel= null;
RMContainerAllocator r;
JobId jid;
RunningAppContext mActxt;
Set<ContainerId> preemptedContainers = new HashSet<ContainerId>();
Map<ContainerId,TaskAttemptId> assignedContainers =
new HashMap<ContainerId, TaskAttemptId>();
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
HashMap<ContainerId,Resource> contToResourceMap =
new HashMap<ContainerId, Resource>();
private int minAlloc = 1024;
@Before
@SuppressWarnings("rawtypes") // mocked generics
public void setup() {
ApplicationId appId = ApplicationId.newInstance(200, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
jid = MRBuilderUtils.newJobId(appId, 1);
mActxt = mock(RunningAppContext.class);
EventHandler ea = mock(EventHandler.class);
when(mActxt.getEventHandler()).thenReturn(ea);
for (int i = 0; i < 40; ++i) {
ContainerId cId = ContainerId.newInstance(appAttemptId, i);
if (0 == i % 7) {
preemptedContainers.add(cId);
}
TaskId tId = 0 == i % 2
? MRBuilderUtils.newTaskId(jid, i / 2, TaskType.MAP)
: MRBuilderUtils.newTaskId(jid, i / 2 + 1, TaskType.REDUCE);
assignedContainers.put(cId, MRBuilderUtils.newTaskAttemptId(tId, 0));
contToResourceMap.put(cId, Resource.newInstance(2 * minAlloc, 2));
}
for (Map.Entry<ContainerId,TaskAttemptId> ent :
assignedContainers.entrySet()) {
System.out.println("cont:" + ent.getKey().getId() +
" type:" + ent.getValue().getTaskId().getTaskType() +
" res:" + contToResourceMap.get(ent.getKey()).getMemory() + "MB" );
}
}
@Test
public void testStrictPreemptionContract() {
final Map<ContainerId,TaskAttemptId> containers = assignedContainers;
AMPreemptionPolicy.Context mPctxt = new AMPreemptionPolicy.Context() {
@Override
public TaskAttemptId getTaskAttempt(ContainerId cId) {
return containers.get(cId);
}
@Override
public List<Container> getContainers(TaskType t) {
List<Container> p = new ArrayList<Container>();
for (Map.Entry<ContainerId,TaskAttemptId> ent :
assignedContainers.entrySet()) {
if (ent.getValue().getTaskId().getTaskType().equals(t)) {
p.add(Container.newInstance(ent.getKey(), null, null,
contToResourceMap.get(ent.getKey()),
Priority.newInstance(0), null));
}
}
return p;
}
};
PreemptionMessage pM = generatePreemptionMessage(preemptedContainers,
contToResourceMap, Resource.newInstance(1024, 1), true);
CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
policy.init(mActxt);
policy.preempt(mPctxt, pM);
for (ContainerId c : preemptedContainers) {
TaskAttemptId t = assignedContainers.get(c);
if (TaskType.MAP.equals(t.getTaskId().getTaskType())) {
assert policy.isPreempted(t) == false;
} else {
assert policy.isPreempted(t);
}
}
}
@Test
public void testPreemptionContract() {
final Map<ContainerId,TaskAttemptId> containers = assignedContainers;
AMPreemptionPolicy.Context mPctxt = new AMPreemptionPolicy.Context() {
@Override
public TaskAttemptId getTaskAttempt(ContainerId cId) {
return containers.get(cId);
}
@Override
public List<Container> getContainers(TaskType t) {
List<Container> p = new ArrayList<Container>();
for (Map.Entry<ContainerId,TaskAttemptId> ent :
assignedContainers.entrySet()){
if(ent.getValue().getTaskId().getTaskType().equals(t)){
p.add(Container.newInstance(ent.getKey(), null, null,
contToResourceMap.get(ent.getKey()),
Priority.newInstance(0), null));
}
}
return p;
}
};
PreemptionMessage pM = generatePreemptionMessage(preemptedContainers,
contToResourceMap, Resource.newInstance(minAlloc, 1), false);
CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
policy.init(mActxt);
int supposedMemPreemption = pM.getContract().getResourceRequest()
.get(0).getResourceRequest().getCapability().getMemory()
* pM.getContract().getResourceRequest().get(0).getResourceRequest()
.getNumContainers();
// first round of preemption
policy.preempt(mPctxt, pM);
List<TaskAttemptId> preempting =
validatePreemption(pM, policy, supposedMemPreemption);
// redundant message
policy.preempt(mPctxt, pM);
List<TaskAttemptId> preempting2 =
validatePreemption(pM, policy, supposedMemPreemption);
// check that nothing got added
assert preempting2.equals(preempting);
// simulate 2 task completions/successful preemption
policy.handleCompletedContainer(preempting.get(0));
policy.handleCompletedContainer(preempting.get(1));
// remove from assignedContainers
Iterator<Map.Entry<ContainerId,TaskAttemptId>> it =
assignedContainers.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<ContainerId,TaskAttemptId> ent = it.next();
if (ent.getValue().equals(preempting.get(0)) ||
ent.getValue().equals(preempting.get(1)))
it.remove();
}
// one more message asking for preemption
policy.preempt(mPctxt, pM);
// triggers preemption of 2 more containers (i.e., the preemption set changes)
List<TaskAttemptId> preempting3 =
validatePreemption(pM, policy, supposedMemPreemption);
assert preempting3.equals(preempting2) == false;
}
private List<TaskAttemptId> validatePreemption(PreemptionMessage pM,
CheckpointAMPreemptionPolicy policy, int supposedMemPreemption) {
Resource effectivelyPreempted = Resource.newInstance(0, 0);
List<TaskAttemptId> preempting = new ArrayList<TaskAttemptId>();
for (Map.Entry<ContainerId, TaskAttemptId> ent :
assignedContainers.entrySet()) {
if (policy.isPreempted(ent.getValue())) {
Resources.addTo(effectivelyPreempted,contToResourceMap.get(ent.getKey()));
// preempt only reducers
if (policy.isPreempted(ent.getValue())){
assertEquals(TaskType.REDUCE, ent.getValue().getTaskId().getTaskType());
preempting.add(ent.getValue());
}
}
}
// preempt enough
assert (effectivelyPreempted.getMemory() >= supposedMemPreemption)
: " preempted: " + effectivelyPreempted.getMemory();
// preempt not too much enough
assert effectivelyPreempted.getMemory() <= supposedMemPreemption + minAlloc;
return preempting;
}
private PreemptionMessage generatePreemptionMessage(
Set<ContainerId> containerToPreempt,
HashMap<ContainerId, Resource> resPerCont,
Resource minimumAllocation, boolean strict) {
Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
new HashSet<ContainerId>(containerToPreempt));
containerToPreempt.clear();
Resource tot = Resource.newInstance(0, 0);
for(ContainerId c : currentContPreemption){
Resources.addTo(tot,
resPerCont.get(c));
}
int numCont = (int) Math.ceil(tot.getMemory() /
(double) minimumAllocation.getMemory());
ResourceRequest rr = ResourceRequest.newInstance(
Priority.newInstance(0), ResourceRequest.ANY,
minimumAllocation, numCont);
if (strict) {
return generatePreemptionMessage(new Allocation(null, null,
currentContPreemption, null, null));
}
return generatePreemptionMessage(new Allocation(null, null,
null, currentContPreemption,
Collections.singletonList(rr)));
}
private PreemptionMessage generatePreemptionMessage(Allocation allocation) {
PreemptionMessage pMsg = null;
// assemble strict preemption request
if (allocation.getStrictContainerPreemptions() != null) {
pMsg = recordFactory.newRecordInstance(PreemptionMessage.class);
StrictPreemptionContract pStrict =
recordFactory.newRecordInstance(StrictPreemptionContract.class);
Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
PreemptionContainer pc =
recordFactory.newRecordInstance(PreemptionContainer.class);
pc.setId(cId);
pCont.add(pc);
}
pStrict.setContainers(pCont);
pMsg.setStrictContract(pStrict);
}
// assemble negotiable preemption request
if (allocation.getResourcePreemptions() != null &&
allocation.getResourcePreemptions().size() > 0 &&
allocation.getContainerPreemptions() != null &&
allocation.getContainerPreemptions().size() > 0) {
if (pMsg == null) {
pMsg = recordFactory.newRecordInstance(PreemptionMessage.class);
}
PreemptionContract contract =
recordFactory.newRecordInstance(PreemptionContract.class);
Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
for (ContainerId cId : allocation.getContainerPreemptions()) {
PreemptionContainer pc =
recordFactory.newRecordInstance(PreemptionContainer.class);
pc.setId(cId);
pCont.add(pc);
}
List<PreemptionResourceRequest> pRes =
new ArrayList<PreemptionResourceRequest>();
for (ResourceRequest crr : allocation.getResourcePreemptions()) {
PreemptionResourceRequest prr =
recordFactory.newRecordInstance(PreemptionResourceRequest.class);
prr.setResourceRequest(crr);
pRes.add(prr);
}
contract.setContainers(pCont);
contract.setResourceRequest(pRes);
pMsg.setContract(contract);
}
return pMsg;
}
}

View File

@ -53,6 +53,7 @@
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@ -575,10 +576,17 @@ public void run() {
// TaskUmbilicalProtocol methods
@Override
public JvmTask getTask(JvmContext context) { return null; }
public synchronized boolean statusUpdate(TaskAttemptID taskId,
@Override
public synchronized AMFeedback statusUpdate(TaskAttemptID taskId,
TaskStatus taskStatus) throws IOException, InterruptedException {
AMFeedback feedback = new AMFeedback();
feedback.setTaskFound(true);
if (null == taskStatus) {
return feedback;
}
// Serialize as we would if distributed in order to make deep copy
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
@ -618,7 +626,7 @@ public synchronized boolean statusUpdate(TaskAttemptID taskId,
}
// ignore phase
return true;
return feedback;
}
/** Return the current values of the counters for this job,
@ -654,24 +662,24 @@ public void commitPending(TaskAttemptID taskid,
statusUpdate(taskid, taskStatus);
}
@Override
public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
// Ignore for now
}
@Override
public void reportNextRecordRange(TaskAttemptID taskid,
SortedRanges.Range range) throws IOException {
LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
}
public boolean ping(TaskAttemptID taskid) throws IOException {
return true;
}
@Override
public boolean canCommit(TaskAttemptID taskid)
throws IOException {
return true;
}
@Override
public void done(TaskAttemptID taskId) throws IOException {
int taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
@ -681,11 +689,13 @@ public void done(TaskAttemptID taskId) throws IOException {
}
}
@Override
public synchronized void fsError(TaskAttemptID taskId, String message)
throws IOException {
LOG.fatal("FSError: "+ message + "from task: " + taskId);
}
@Override
public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
}
@ -695,12 +705,30 @@ public synchronized void fatalError(TaskAttemptID taskId, String msg)
LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
}
@Override
public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
return new MapTaskCompletionEventsUpdate(
org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false);
}
@Override
public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
// ignore
}
@Override
public TaskCheckpointID getCheckpointID(TaskID taskId) {
// ignore
return null;
}
@Override
public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
// ignore
}
}
public LocalJobRunner(Configuration conf) throws IOException {

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* This class is a simple struct to include both the taskFound information and
* a possible preemption request coming from the AM.
*/
public class AMFeedback implements Writable {
boolean taskFound;
boolean preemption;
public void setTaskFound(boolean t){
taskFound=t;
}
public boolean getTaskFound(){
return taskFound;
}
public void setPreemption(boolean preemption) {
this.preemption=preemption;
}
public boolean getPreemption() {
return preemption;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(taskFound);
out.writeBoolean(preemption);
}
@Override
public void readFields(DataInput in) throws IOException {
taskFound = in.readBoolean();
preemption = in.readBoolean();
}
}

View File

@ -187,6 +187,7 @@ static synchronized String getOutputName(int partition) {
protected SecretKey tokenSecret;
protected SecretKey shuffleSecret;
protected GcTimeUpdater gcUpdater;
final AtomicBoolean mustPreempt = new AtomicBoolean(false);
////////////////////////////////////////////
// Constructors
@ -711,6 +712,7 @@ public void run() {
}
try {
boolean taskFound = true; // whether TT knows about this task
AMFeedback amFeedback = null;
// sleep for a bit
synchronized(lock) {
if (taskDone.get()) {
@ -728,12 +730,14 @@ public void run() {
taskStatus.statusUpdate(taskProgress.get(),
taskProgress.toString(),
counters);
taskFound = umbilical.statusUpdate(taskId, taskStatus);
amFeedback = umbilical.statusUpdate(taskId, taskStatus);
taskFound = amFeedback.getTaskFound();
taskStatus.clearStatus();
}
else {
// send ping
taskFound = umbilical.ping(taskId);
amFeedback = umbilical.statusUpdate(taskId, null);
taskFound = amFeedback.getTaskFound();
}
// if Task Tracker is not aware of our task ID (probably because it died and
@ -744,6 +748,17 @@ public void run() {
System.exit(66);
}
// Set a flag that says we should preempt this is read by
// ReduceTasks in places of the execution where it is
// safe/easy to preempt
boolean lastPreempt = mustPreempt.get();
mustPreempt.set(mustPreempt.get() || amFeedback.getPreemption());
if (lastPreempt ^ mustPreempt.get()) {
LOG.info("PREEMPTION TASK: setting mustPreempt to " +
mustPreempt.get() + " given " + amFeedback.getPreemption() +
" for "+ taskId + " task status: " +taskStatus.getPhase());
}
sendProgress = resetProgressFlag();
remainingRetries = MAX_RETRIES;
}
@ -992,10 +1007,17 @@ private void updateHeapUsageCounter() {
public void done(TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, InterruptedException {
LOG.info("Task:" + taskId + " is done."
+ " And is in the process of committing");
updateCounters();
if (taskStatus.getRunState() == TaskStatus.State.PREEMPTED ) {
// If we are preempted, do no output promotion; signal done and exit
committer.commitTask(taskContext);
umbilical.preempted(taskId, taskStatus);
taskDone.set(true);
reporter.stopCommunicationThread();
return;
}
LOG.info("Task:" + taskId + " is done."
+ " And is in the process of committing");
boolean commitRequired = isCommitRequired();
if (commitRequired) {
int retries = MAX_RETRIES;
@ -1054,7 +1076,7 @@ public void statusUpdate(TaskUmbilicalProtocol umbilical)
int retries = MAX_RETRIES;
while (true) {
try {
if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
if (!umbilical.statusUpdate(getTaskID(), taskStatus).getTaskFound()) {
LOG.warn("Parent died. Exiting "+taskId);
System.exit(66);
}
@ -1098,8 +1120,8 @@ private long calculateOutputSize() throws IOException {
if (isMapTask() && conf.getNumReduceTasks() > 0) {
try {
Path mapOutput = mapOutputFile.getOutputFile();
FileSystem localFS = FileSystem.getLocal(conf);
return localFS.getFileStatus(mapOutput).getLen();
FileSystem fs = mapOutput.getFileSystem(conf);
return fs.getFileStatus(mapOutput).getLen();
} catch (IOException e) {
LOG.warn ("Could not find output size " , e);
}

View File

@ -51,7 +51,7 @@ public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN, PREEMPTED}
private final TaskAttemptID taskid;
private float progress;

View File

@ -24,6 +24,9 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.mapred.JvmTask;
import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
import org.apache.hadoop.security.token.TokenInfo;
@ -64,9 +67,10 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol {
* Version 17 Modified TaskID to be aware of the new TaskTypes
* Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
* Version 19 Added fatalError for child to communicate fatal errors to TT
* Version 20 Added methods to manage checkpoints
* */
public static final long versionID = 19L;
public static final long versionID = 20L;
/**
* Called when a child task process starts, to get its task.
@ -78,7 +82,8 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol {
JvmTask getTask(JvmContext context) throws IOException;
/**
* Report child's progress to parent.
* Report child's progress to parent. Also invoked to report still alive (used
* to be in ping). It reports an AMFeedback used to propagate preemption requests.
*
* @param taskId task-id of the child
* @param taskStatus status of the child
@ -86,7 +91,7 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol {
* @throws InterruptedException
* @return True if the task is known
*/
boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException;
/** Report error messages back to parent. Calls should be sparing, since all
@ -105,11 +110,6 @@ boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range)
throws IOException;
/** Periodically called by child to check if parent is still alive.
* @return True if the task is known
*/
boolean ping(TaskAttemptID taskid) throws IOException;
/** Report that the task is successfully completed. Failure is assumed if
* the task process exits without calling this.
* @param taskid task's id
@ -161,4 +161,33 @@ MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
TaskAttemptID id)
throws IOException;
/**
* Report to the AM that the task has been succesfully preempted.
*
* @param taskId task's id
* @param taskStatus status of the child
* @throws IOException
*/
void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException;
/**
* Return the latest CheckpointID for the given TaskID. This provides
* the task with a way to locate the checkpointed data and restart from
* that point in the computation.
*
* @param taskID task's id
* @return the most recent checkpoint (if any) for this task
* @throws IOException
*/
TaskCheckpointID getCheckpointID(TaskID taskID);
/**
* Send a CheckpointID for a given TaskID to be stored in the AM,
* to later restart a task from this checkpoint.
* @param tid
* @param cid
*/
void setCheckpointID(TaskID tid, TaskCheckpointID cid);
}

View File

@ -34,37 +34,31 @@
* cost of checkpoints and other counters. This is sent by the task to the AM
* to be stored and provided to the next execution of the same task.
*/
public class TaskCheckpointID implements CheckpointID{
public class TaskCheckpointID implements CheckpointID {
FSCheckpointID rawId;
private List<Path> partialOutput;
private Counters counters;
final FSCheckpointID rawId;
private final List<Path> partialOutput;
private final Counters counters;
public TaskCheckpointID() {
this.rawId = new FSCheckpointID();
this.partialOutput = new ArrayList<Path>();
this(new FSCheckpointID(), new ArrayList<Path>(), new Counters());
}
public TaskCheckpointID(FSCheckpointID rawId, List<Path> partialOutput,
Counters counters) {
this.rawId = rawId;
this.counters = counters;
if(partialOutput == null)
this.partialOutput = new ArrayList<Path>();
else
this.partialOutput = partialOutput;
this.partialOutput = null == partialOutput
? new ArrayList<Path>()
: partialOutput;
}
@Override
public void write(DataOutput out) throws IOException {
counters.write(out);
if (partialOutput == null) {
WritableUtils.writeVLong(out, 0L);
} else {
WritableUtils.writeVLong(out, partialOutput.size());
for(Path p:partialOutput){
Text.writeString(out, p.toString());
}
WritableUtils.writeVLong(out, partialOutput.size());
for (Path p : partialOutput) {
Text.writeString(out, p.toString());
}
rawId.write(out);
}
@ -74,21 +68,22 @@ public void readFields(DataInput in) throws IOException {
partialOutput.clear();
counters.readFields(in);
long numPout = WritableUtils.readVLong(in);
for(int i=0;i<numPout;i++)
for (int i = 0; i < numPout; i++) {
partialOutput.add(new Path(Text.readString(in)));
}
rawId.readFields(in);
}
@Override
public boolean equals(Object other){
public boolean equals(Object other) {
if (other instanceof TaskCheckpointID){
return this.rawId.equals(((TaskCheckpointID)other).rawId) &&
this.counters.equals(((TaskCheckpointID) other).counters) &&
this.partialOutput.containsAll(((TaskCheckpointID) other).partialOutput) &&
((TaskCheckpointID) other).partialOutput.containsAll(this.partialOutput);
} else {
return false;
TaskCheckpointID o = (TaskCheckpointID) other;
return rawId.equals(o.rawId) &&
counters.equals(o.counters) &&
partialOutput.containsAll(o.partialOutput) &&
o.partialOutput.containsAll(partialOutput);
}
return false;
}
@Override
@ -110,7 +105,7 @@ public long getCheckpointTime() {
return counters.findCounter(EnumCounter.CHECKPOINT_MS).getValue();
}
public String toString(){
public String toString() {
return rawId.toString() + " counters:" + counters;
}

View File

@ -20,7 +20,6 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import junit.framework.TestCase;
@ -29,20 +28,17 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.util.ReflectionUtils;
/**
@ -110,11 +106,16 @@ public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
statusUpdate(taskId, taskStatus);
}
public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
statusUpdate(taskId, taskStatus);
}
public boolean canCommit(TaskAttemptID taskid) throws IOException {
return true;
}
public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
public AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
StringBuffer buf = new StringBuffer("Task ");
buf.append(taskId);
@ -128,7 +129,9 @@ public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
LOG.info(buf.toString());
// ignore phase
// ignore counters
return true;
AMFeedback a = new AMFeedback();
a.setTaskFound(true);
return a;
}
public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
@ -145,6 +148,17 @@ public void reportNextRecordRange(TaskAttemptID taskid,
SortedRanges.Range range) throws IOException {
LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
}
@Override
public TaskCheckpointID getCheckpointID(TaskID taskId) {
// do nothing
return null;
}
@Override
public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
// do nothing
}
}
private FileSystem fs = null;

View File

@ -27,6 +27,10 @@
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
public class TestTaskCommit extends HadoopTestCase {
Path rootDir =
@ -131,11 +135,6 @@ public JvmTask getTask(JvmContext context) throws IOException {
return null;
}
@Override
public boolean ping(TaskAttemptID taskid) throws IOException {
return true;
}
@Override
public void reportDiagnosticInfo(TaskAttemptID taskid, String trace)
throws IOException {
@ -152,9 +151,11 @@ public void shuffleError(TaskAttemptID taskId, String message)
}
@Override
public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
public AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
return true;
AMFeedback a = new AMFeedback();
a.setTaskFound(true);
return a;
}
@Override
@ -168,6 +169,22 @@ public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return null;
}
@Override
public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
fail("Task should not go to commit-pending");
}
@Override
public TaskCheckpointID getCheckpointID(TaskID taskId) {
return null;
}
@Override
public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
// ignore
}
}
/**

View File

@ -115,7 +115,7 @@ public Object run() throws Exception {
proxy = (TaskUmbilicalProtocol) RPC.getProxy(
TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID,
addr, conf);
proxy.ping(null);
proxy.statusUpdate(null, null);
} finally {
server.stop();
if (proxy != null) {