MAPREDUCE-3006. Fixed MapReduce AM to exit only after properly writing out history file. (vinodkv)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1172206 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7985d3904c
commit
61900651b1
@ -1339,6 +1339,9 @@ Release 0.23.0 - Unreleased
|
||||
MAPREDUCE-2987. Fixed display of logged user on RM Web-UI. (Thomas Graves
|
||||
via acmurthy)
|
||||
|
||||
MAPREDUCE-3006. Fixed MapReduce AM to exit only after properly writing out
|
||||
history file. (vinodkv)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -74,7 +74,7 @@ public class JobHistoryEventHandler extends AbstractService
|
||||
|
||||
private BlockingQueue<JobHistoryEvent> eventQueue =
|
||||
new LinkedBlockingQueue<JobHistoryEvent>();
|
||||
private Thread eventHandlingThread;
|
||||
protected Thread eventHandlingThread;
|
||||
private volatile boolean stopped;
|
||||
private final Object lock = new Object();
|
||||
|
||||
|
@ -56,12 +56,14 @@
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
||||
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
|
||||
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
|
||||
@ -83,6 +85,7 @@
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.service.Service;
|
||||
|
||||
@ -126,6 +129,7 @@ public class MRAppMaster extends CompositeService {
|
||||
private TaskAttemptListener taskAttemptListener;
|
||||
private JobTokenSecretManager jobTokenSecretManager =
|
||||
new JobTokenSecretManager();
|
||||
private JobEventDispatcher jobEventDispatcher;
|
||||
|
||||
private Job job;
|
||||
|
||||
@ -148,7 +152,7 @@ public MRAppMaster(ApplicationId applicationId, Clock clock, int startCount) {
|
||||
|
||||
@Override
|
||||
public void init(final Configuration conf) {
|
||||
context = new RunningAppContext();
|
||||
context = new RunningAppContext(conf);
|
||||
|
||||
// Job name is the same as the app name util we support DAG of jobs
|
||||
// for an app later
|
||||
@ -182,18 +186,17 @@ public void init(final Configuration conf) {
|
||||
//service to log job history events
|
||||
EventHandler<JobHistoryEvent> historyService =
|
||||
createJobHistoryHandler(context);
|
||||
addIfService(historyService);
|
||||
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
|
||||
historyService);
|
||||
|
||||
JobEventDispatcher synchronousJobEventDispatcher = new JobEventDispatcher();
|
||||
this.jobEventDispatcher = new JobEventDispatcher();
|
||||
|
||||
//register the event dispatchers
|
||||
dispatcher.register(JobEventType.class, synchronousJobEventDispatcher);
|
||||
dispatcher.register(JobEventType.class, jobEventDispatcher);
|
||||
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
|
||||
dispatcher.register(TaskAttemptEventType.class,
|
||||
new TaskAttemptEventDispatcher());
|
||||
dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
|
||||
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
|
||||
historyService);
|
||||
|
||||
if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|
||||
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
|
||||
@ -203,10 +206,34 @@ public void init(final Configuration conf) {
|
||||
}
|
||||
|
||||
dispatcher.register(Speculator.EventType.class,
|
||||
new SpeculatorEventDispatcher());
|
||||
new SpeculatorEventDispatcher(conf));
|
||||
|
||||
// service to allocate containers from RM (if non-uber) or to fake it (uber)
|
||||
containerAllocator = createContainerAllocator(clientService, context);
|
||||
addIfService(containerAllocator);
|
||||
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
|
||||
|
||||
// corresponding service to launch allocated containers via NodeManager
|
||||
containerLauncher = createContainerLauncher(context);
|
||||
addIfService(containerLauncher);
|
||||
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
|
||||
|
||||
// Add the JobHistoryEventHandler last so that it is properly stopped first.
|
||||
// This will guarantee that all history-events are flushed before AM goes
|
||||
// ahead with shutdown.
|
||||
// Note: Even though JobHistoryEventHandler is started last, if any
|
||||
// component creates a JobHistoryEvent in the meanwhile, it will be just be
|
||||
// queued inside the JobHistoryEventHandler
|
||||
addIfService(historyService);
|
||||
|
||||
super.init(conf);
|
||||
} // end of init()
|
||||
|
||||
/** Create and initialize (but don't start) a single job. */
|
||||
protected Job createJob(Configuration conf) {
|
||||
|
||||
// ////////// Obtain the tokens needed by the job. //////////
|
||||
Credentials fsTokens = new Credentials();
|
||||
|
||||
UserGroupInformation currentUser = null;
|
||||
|
||||
try {
|
||||
@ -234,66 +261,12 @@ public void init(final Configuration conf) {
|
||||
} catch (IOException e) {
|
||||
throw new YarnException(e);
|
||||
}
|
||||
|
||||
super.init(conf);
|
||||
|
||||
//---- start of what used to be startJobs() code:
|
||||
|
||||
Configuration config = getConfig();
|
||||
|
||||
job = createJob(config, fsTokens, currentUser.getUserName());
|
||||
|
||||
/** create a job event for job intialization */
|
||||
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
|
||||
/** send init to the job (this does NOT trigger job execution) */
|
||||
synchronousJobEventDispatcher.handle(initJobEvent);
|
||||
|
||||
// send init to speculator. This won't yest start as dispatcher isn't
|
||||
// started yet.
|
||||
dispatcher.getEventHandler().handle(
|
||||
new SpeculatorEvent(job.getID(), clock.getTime()));
|
||||
|
||||
// JobImpl's InitTransition is done (call above is synchronous), so the
|
||||
// "uber-decision" (MR-1220) has been made. Query job and switch to
|
||||
// ubermode if appropriate (by registering different container-allocator
|
||||
// and container-launcher services/event-handlers).
|
||||
|
||||
if (job.isUber()) {
|
||||
LOG.info("MRAppMaster uberizing job " + job.getID()
|
||||
+ " in local container (\"uber-AM\").");
|
||||
} else {
|
||||
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
|
||||
+ "job " + job.getID() + ".");
|
||||
}
|
||||
|
||||
// service to allocate containers from RM (if non-uber) or to fake it (uber)
|
||||
containerAllocator =
|
||||
createContainerAllocator(clientService, context, job.isUber());
|
||||
addIfService(containerAllocator);
|
||||
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
|
||||
if (containerAllocator instanceof Service) {
|
||||
((Service) containerAllocator).init(config);
|
||||
}
|
||||
|
||||
// corresponding service to launch allocated containers via NodeManager
|
||||
containerLauncher = createContainerLauncher(context, job.isUber());
|
||||
addIfService(containerLauncher);
|
||||
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
|
||||
if (containerLauncher instanceof Service) {
|
||||
((Service) containerLauncher).init(config);
|
||||
}
|
||||
|
||||
} // end of init()
|
||||
|
||||
/** Create and initialize (but don't start) a single job.
|
||||
* @param fsTokens */
|
||||
protected Job createJob(Configuration conf, Credentials fsTokens,
|
||||
String user) {
|
||||
// ////////// End of obtaining the tokens needed by the job. //////////
|
||||
|
||||
// create single job
|
||||
Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
|
||||
taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
|
||||
completedTasksFromPreviousRun, metrics, user);
|
||||
completedTasksFromPreviousRun, metrics, currentUser.getUserName());
|
||||
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
|
||||
|
||||
dispatcher.register(JobFinishEvent.Type.class,
|
||||
@ -388,19 +361,13 @@ protected TaskCleaner createTaskCleaner(AppContext context) {
|
||||
}
|
||||
|
||||
protected ContainerAllocator createContainerAllocator(
|
||||
ClientService clientService, AppContext context, boolean isLocal) {
|
||||
//return new StaticContainerAllocator(context);
|
||||
return isLocal
|
||||
? new LocalContainerAllocator(clientService, context)
|
||||
: new RMContainerAllocator(clientService, context);
|
||||
final ClientService clientService, final AppContext context) {
|
||||
return new ContainerAllocatorRouter(clientService, context);
|
||||
}
|
||||
|
||||
protected ContainerLauncher createContainerLauncher(AppContext context,
|
||||
boolean isLocal) {
|
||||
return isLocal
|
||||
? new LocalContainerLauncher(context,
|
||||
(TaskUmbilicalProtocol) taskAttemptListener)
|
||||
: new ContainerLauncherImpl(context);
|
||||
protected ContainerLauncher
|
||||
createContainerLauncher(final AppContext context) {
|
||||
return new ContainerLauncherRouter(context);
|
||||
}
|
||||
|
||||
//TODO:should have an interface for MRClientService
|
||||
@ -440,9 +407,96 @@ public TaskAttemptListener getTaskAttemptListener() {
|
||||
return taskAttemptListener;
|
||||
}
|
||||
|
||||
class RunningAppContext implements AppContext {
|
||||
/**
|
||||
* By the time life-cycle of this router starts, job-init would have already
|
||||
* happened.
|
||||
*/
|
||||
private final class ContainerAllocatorRouter extends AbstractService
|
||||
implements ContainerAllocator {
|
||||
private final ClientService clientService;
|
||||
private final AppContext context;
|
||||
private ContainerAllocator containerAllocator;
|
||||
|
||||
private Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
|
||||
ContainerAllocatorRouter(ClientService clientService,
|
||||
AppContext context) {
|
||||
super(ContainerAllocatorRouter.class.getName());
|
||||
this.clientService = clientService;
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
if (job.isUber()) {
|
||||
this.containerAllocator = new LocalContainerAllocator(
|
||||
this.clientService, this.context);
|
||||
} else {
|
||||
this.containerAllocator = new RMContainerAllocator(
|
||||
this.clientService, this.context);
|
||||
}
|
||||
((Service)this.containerAllocator).init(getConfig());
|
||||
((Service)this.containerAllocator).start();
|
||||
super.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
((Service)this.containerAllocator).stop();
|
||||
super.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ContainerAllocatorEvent event) {
|
||||
this.containerAllocator.handle(event);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* By the time life-cycle of this router starts, job-init would have already
|
||||
* happened.
|
||||
*/
|
||||
private final class ContainerLauncherRouter extends AbstractService
|
||||
implements ContainerLauncher {
|
||||
private final AppContext context;
|
||||
private ContainerLauncher containerLauncher;
|
||||
|
||||
ContainerLauncherRouter(AppContext context) {
|
||||
super(ContainerLauncherRouter.class.getName());
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
if (job.isUber()) {
|
||||
this.containerLauncher = new LocalContainerLauncher(context,
|
||||
(TaskUmbilicalProtocol) taskAttemptListener);
|
||||
} else {
|
||||
this.containerLauncher = new ContainerLauncherImpl(context);
|
||||
}
|
||||
((Service)this.containerLauncher).init(getConfig());
|
||||
((Service)this.containerLauncher).start();
|
||||
super.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ContainerLauncherEvent event) {
|
||||
this.containerLauncher.handle(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
((Service)this.containerLauncher).stop();
|
||||
super.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private class RunningAppContext implements AppContext {
|
||||
|
||||
private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
|
||||
private final Configuration conf;
|
||||
|
||||
public RunningAppContext(Configuration config) {
|
||||
this.conf = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationAttemptId getApplicationAttemptId() {
|
||||
@ -481,7 +535,7 @@ public EventHandler getEventHandler() {
|
||||
|
||||
@Override
|
||||
public CharSequence getUser() {
|
||||
return getConfig().get(MRJobConfig.USER_NAME);
|
||||
return this.conf.get(MRJobConfig.USER_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -492,13 +546,45 @@ public Clock getClock() {
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
///////////////////// Create the job itself.
|
||||
job = createJob(getConfig());
|
||||
// End of creating the job.
|
||||
|
||||
// metrics system init is really init & start.
|
||||
// It's more test friendly to put it here.
|
||||
DefaultMetricsSystem.initialize("MRAppMaster");
|
||||
|
||||
startJobs();
|
||||
/** create a job event for job intialization */
|
||||
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
|
||||
/** send init to the job (this does NOT trigger job execution) */
|
||||
// This is a synchronous call, not an event through dispatcher. We want
|
||||
// job-init to be done completely here.
|
||||
jobEventDispatcher.handle(initJobEvent);
|
||||
|
||||
// send init to speculator. This won't yest start as dispatcher isn't
|
||||
// started yet.
|
||||
dispatcher.getEventHandler().handle(
|
||||
new SpeculatorEvent(job.getID(), clock.getTime()));
|
||||
|
||||
// JobImpl's InitTransition is done (call above is synchronous), so the
|
||||
// "uber-decision" (MR-1220) has been made. Query job and switch to
|
||||
// ubermode if appropriate (by registering different container-allocator
|
||||
// and container-launcher services/event-handlers).
|
||||
|
||||
if (job.isUber()) {
|
||||
LOG.info("MRAppMaster uberizing job " + job.getID()
|
||||
+ " in local container (\"uber-AM\").");
|
||||
} else {
|
||||
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
|
||||
+ "job " + job.getID() + ".");
|
||||
}
|
||||
|
||||
//start all the components
|
||||
super.start();
|
||||
|
||||
// All components have started, start the job.
|
||||
startJobs();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -546,10 +632,14 @@ public void handle(TaskAttemptEvent event) {
|
||||
|
||||
private class SpeculatorEventDispatcher implements
|
||||
EventHandler<SpeculatorEvent> {
|
||||
private final Configuration conf;
|
||||
public SpeculatorEventDispatcher(Configuration config) {
|
||||
this.conf = config;
|
||||
}
|
||||
@Override
|
||||
public void handle(SpeculatorEvent event) {
|
||||
if (getConfig().getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|
||||
|| getConfig().getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
|
||||
if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|
||||
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
|
||||
// Speculator IS enabled, direct the event to there.
|
||||
speculator.handle(event);
|
||||
}
|
||||
|
@ -63,6 +63,7 @@
|
||||
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.Clock;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -234,11 +235,16 @@ public void verifyCompleted() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Job createJob(Configuration conf, Credentials fsTokens,
|
||||
String user) {
|
||||
Job newJob = new TestJob(getAppID(), getDispatcher().getEventHandler(),
|
||||
protected Job createJob(Configuration conf) {
|
||||
UserGroupInformation currentUser = null;
|
||||
try {
|
||||
currentUser = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException e) {
|
||||
throw new YarnException(e);
|
||||
}
|
||||
Job newJob = new TestJob(conf, getAppID(), getDispatcher().getEventHandler(),
|
||||
getTaskAttemptListener(), getContext().getClock(),
|
||||
user);
|
||||
currentUser.getUserName());
|
||||
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
|
||||
|
||||
getDispatcher().register(JobFinishEvent.Type.class,
|
||||
@ -279,8 +285,7 @@ public void handle(JobHistoryEvent event) {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainerLauncher createContainerLauncher(AppContext context,
|
||||
boolean isLocal) {
|
||||
protected ContainerLauncher createContainerLauncher(AppContext context) {
|
||||
return new MockContainerLauncher();
|
||||
}
|
||||
|
||||
@ -317,7 +322,7 @@ protected void attemptLaunched(TaskAttemptId attemptID) {
|
||||
|
||||
@Override
|
||||
protected ContainerAllocator createContainerAllocator(
|
||||
ClientService clientService, AppContext context, boolean isLocal) {
|
||||
ClientService clientService, AppContext context) {
|
||||
return new ContainerAllocator(){
|
||||
private int containerCount;
|
||||
@Override
|
||||
@ -369,12 +374,14 @@ public int getHttpPort() {
|
||||
|
||||
class TestJob extends JobImpl {
|
||||
//override the init transition
|
||||
private final TestInitTransition initTransition = new TestInitTransition(
|
||||
maps, reduces);
|
||||
StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
|
||||
= stateMachineFactory.addTransition(JobState.NEW,
|
||||
EnumSet.of(JobState.INITED, JobState.FAILED),
|
||||
JobEventType.JOB_INIT,
|
||||
// This is abusive.
|
||||
new TestInitTransition(getConfig(), maps, reduces));
|
||||
initTransition);
|
||||
|
||||
private final StateMachine<JobState, JobEventType, JobEvent>
|
||||
localStateMachine;
|
||||
@ -384,10 +391,10 @@ protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
|
||||
return localStateMachine;
|
||||
}
|
||||
|
||||
public TestJob(ApplicationId appID, EventHandler eventHandler,
|
||||
TaskAttemptListener taskAttemptListener, Clock clock,
|
||||
String user) {
|
||||
super(appID, new Configuration(), eventHandler, taskAttemptListener,
|
||||
public TestJob(Configuration conf, ApplicationId appID,
|
||||
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
|
||||
Clock clock, String user) {
|
||||
super(appID, conf, eventHandler, taskAttemptListener,
|
||||
new JobTokenSecretManager(), new Credentials(), clock, getStartCount(),
|
||||
getCompletedTaskFromPreviousRun(), metrics, user);
|
||||
|
||||
@ -399,17 +406,14 @@ public TestJob(ApplicationId appID, EventHandler eventHandler,
|
||||
|
||||
//Override InitTransition to not look for split files etc
|
||||
static class TestInitTransition extends JobImpl.InitTransition {
|
||||
private Configuration config;
|
||||
private int maps;
|
||||
private int reduces;
|
||||
TestInitTransition(Configuration config, int maps, int reduces) {
|
||||
this.config = config;
|
||||
TestInitTransition(int maps, int reduces) {
|
||||
this.maps = maps;
|
||||
this.reduces = reduces;
|
||||
}
|
||||
@Override
|
||||
protected void setup(JobImpl job) throws IOException {
|
||||
job.conf = config;
|
||||
job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
|
||||
job.remoteJobConfFile = new Path("test");
|
||||
}
|
||||
|
@ -94,7 +94,7 @@ protected void attemptLaunched(TaskAttemptId attemptID) {
|
||||
|
||||
@Override
|
||||
protected ContainerAllocator createContainerAllocator(
|
||||
ClientService clientService, AppContext context, boolean isLocal) {
|
||||
ClientService clientService, AppContext context) {
|
||||
return new ThrottledContainerAllocator();
|
||||
}
|
||||
|
||||
|
@ -169,7 +169,7 @@ public void testTimedOutTask() throws Exception {
|
||||
|
||||
@Test
|
||||
public void testTaskFailWithUnusedContainer() throws Exception {
|
||||
MRApp app = new FailingTaskWithUnusedContainer();
|
||||
MRApp app = new MRAppWithFailingTaskAndUnusedContainer();
|
||||
Configuration conf = new Configuration();
|
||||
int maxAttempts = 1;
|
||||
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
|
||||
@ -194,21 +194,21 @@ public void testTaskFailWithUnusedContainer() throws Exception {
|
||||
app.waitForState(job, JobState.FAILED);
|
||||
}
|
||||
|
||||
static class FailingTaskWithUnusedContainer extends MRApp {
|
||||
static class MRAppWithFailingTaskAndUnusedContainer extends MRApp {
|
||||
|
||||
public FailingTaskWithUnusedContainer() {
|
||||
public MRAppWithFailingTaskAndUnusedContainer() {
|
||||
super(1, 0, false, "TaskFailWithUnsedContainer", true);
|
||||
}
|
||||
|
||||
protected ContainerLauncher createContainerLauncher(AppContext context,
|
||||
boolean isLocal) {
|
||||
@Override
|
||||
protected ContainerLauncher createContainerLauncher(AppContext context) {
|
||||
return new ContainerLauncherImpl(context) {
|
||||
@Override
|
||||
public void handle(ContainerLauncherEvent event) {
|
||||
|
||||
switch (event.getType()) {
|
||||
case CONTAINER_REMOTE_LAUNCH:
|
||||
super.handle(event);
|
||||
super.handle(event); // Unused event and container.
|
||||
break;
|
||||
case CONTAINER_REMOTE_CLEANUP:
|
||||
getContext().getEventHandler().handle(
|
||||
|
@ -24,10 +24,10 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
@ -195,6 +195,7 @@ public void checkTaskStateTypeConversion() {
|
||||
public static void main(String[] args) throws Exception {
|
||||
TestMRApp t = new TestMRApp();
|
||||
t.testMapReduce();
|
||||
t.testZeroMapReduces();
|
||||
t.testCommitPending();
|
||||
t.testCompletedMapsForReduceSlowstart();
|
||||
t.testJobError();
|
||||
|
@ -92,6 +92,60 @@ public void testHistoryEvents() throws Exception {
|
||||
parsedJob.getState());
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that all the events are flushed on stopping the HistoryHandler
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testEventsFlushOnStop() throws Exception {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(MRJobConfig.USER_NAME, "test");
|
||||
MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
|
||||
.getClass().getName(), true);
|
||||
app.submit(conf);
|
||||
Job job = app.getContext().getAllJobs().values().iterator().next();
|
||||
JobId jobId = job.getID();
|
||||
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
|
||||
app.waitForState(job, JobState.SUCCEEDED);
|
||||
|
||||
// make sure all events are flushed
|
||||
app.waitForState(Service.STATE.STOPPED);
|
||||
/*
|
||||
* Use HistoryContext to read logged events and verify the number of
|
||||
* completed maps
|
||||
*/
|
||||
HistoryContext context = new JobHistory();
|
||||
((JobHistory) context).init(conf);
|
||||
Job parsedJob = context.getJob(jobId);
|
||||
Assert.assertEquals("CompletedMaps not correct", 1, parsedJob
|
||||
.getCompletedMaps());
|
||||
|
||||
Map<TaskId, Task> tasks = parsedJob.getTasks();
|
||||
Assert.assertEquals("No of tasks not correct", 1, tasks.size());
|
||||
verifyTask(tasks.values().iterator().next());
|
||||
|
||||
Map<TaskId, Task> maps = parsedJob.getTasks(TaskType.MAP);
|
||||
Assert.assertEquals("No of maps not correct", 1, maps.size());
|
||||
|
||||
Assert.assertEquals("Job state not currect", JobState.SUCCEEDED,
|
||||
parsedJob.getState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobHistoryEventHandlerIsFirstServiceToStop() {
|
||||
MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
|
||||
.getClass().getName(), true);
|
||||
Configuration conf = new Configuration();
|
||||
app.init(conf);
|
||||
Service[] services = app.getServices().toArray(new Service[0]);
|
||||
// Verifying that it is the last to be added is same as verifying that it is
|
||||
// the first to be stopped. CompositeService related tests already validate
|
||||
// this.
|
||||
Assert.assertEquals("JobHistoryEventHandler",
|
||||
services[services.length - 1].getName());
|
||||
}
|
||||
|
||||
private void verifyTask(Task task) {
|
||||
Assert.assertEquals("Task state not currect", TaskState.SUCCEEDED,
|
||||
task.getState());
|
||||
@ -116,14 +170,43 @@ public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
|
||||
@Override
|
||||
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
||||
AppContext context) {
|
||||
JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
|
||||
getStartCount());
|
||||
JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(
|
||||
context, getStartCount());
|
||||
return eventHandler;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* MRapp with special HistoryEventHandler that writes events only during stop.
|
||||
* This is to simulate events that don't get written by the eventHandling
|
||||
* thread due to say a slow DFS and verify that they are flushed during stop.
|
||||
*/
|
||||
private static class MRAppWithSpecialHistoryHandler extends MRApp {
|
||||
|
||||
public MRAppWithSpecialHistoryHandler(int maps, int reduces,
|
||||
boolean autoComplete, String testName, boolean cleanOnStart) {
|
||||
super(maps, reduces, autoComplete, testName, cleanOnStart);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
||||
AppContext context) {
|
||||
return new JobHistoryEventHandler(context, getStartCount()) {
|
||||
@Override
|
||||
public void start() {
|
||||
// Don't start any event draining thread.
|
||||
super.eventHandlingThread = new Thread();
|
||||
super.eventHandlingThread.start();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
TestJobHistoryEvents t = new TestJobHistoryEvents();
|
||||
t.testHistoryEvents();
|
||||
t.testEventsFlushOnStop();
|
||||
t.testJobHistoryEventHandlerIsFirstServiceToStop();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user