MAPREDUCE-2751. Modified NodeManager to stop leaving around local files after application finishes. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1180071 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-07 15:23:20 +00:00
parent 639121c743
commit 02a81203bd
10 changed files with 501 additions and 26 deletions

View File

@ -1540,6 +1540,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3141. Fix the broken MRAppMaster to work over YARN in security MAPREDUCE-3141. Fix the broken MRAppMaster to work over YARN in security
mode.(vinodkv) mode.(vinodkv)
MAPREDUCE-2751. Modified NodeManager to stop leaving around local files
after application finishes. (Siddharth Seth via vinodkv)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -199,13 +199,18 @@ public void deleteAsUser(String user, Path subDir, Path... baseDirs)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (baseDirs == null || baseDirs.length == 0) { if (baseDirs == null || baseDirs.length == 0) {
LOG.info("Deleting absolute path : " + subDir); LOG.info("Deleting absolute path : " + subDir);
lfs.delete(subDir, true); if (!lfs.delete(subDir, true)) {
//Maybe retry
LOG.warn("delete returned false for path: [" + subDir + "]");
}
return; return;
} }
for (Path baseDir : baseDirs) { for (Path baseDir : baseDirs) {
Path del = subDir == null ? baseDir : new Path(baseDir, subDir); Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
LOG.info("Deleting path : " + del); LOG.info("Deleting path : " + del);
lfs.delete(del, true); if (!lfs.delete(del, true)) {
LOG.warn("delete returned false for path: [" + del + "]");
}
} }
} }

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*; import static java.util.concurrent.TimeUnit.*;
@ -125,6 +124,7 @@ public void run() {
} }
} else { } else {
try { try {
LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
exec.deleteAsUser(user, subDir, baseDirs); exec.deleteAsUser(user, subDir, baseDirs);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to delete as user " + user, e); LOG.warn("Failed to delete as user " + user, e);

View File

@ -286,6 +286,8 @@ public StartContainerResponse startContainer(StartContainerRequest request)
StartContainerResponse response = StartContainerResponse response =
recordFactory.newRecordInstance(StartContainerResponse.class); recordFactory.newRecordInstance(StartContainerResponse.class);
response.addAllServiceResponse(auxiluaryServices.getMeta()); response.addAllServiceResponse(auxiluaryServices.getMeta());
// TODO launchedContainer misplaced -> doesn't necessarily mean a container
// launch. A finished Application will not launch containers.
metrics.launchedContainer(); metrics.launchedContainer();
metrics.allocateContainer(launchContext.getResource()); metrics.allocateContainer(launchContext.getResource());
return response; return response;

View File

@ -152,6 +152,7 @@ ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
/** /**
* Notify services of new application. * Notify services of new application.
*/ */
@SuppressWarnings("unchecked")
static class AppInitTransition implements static class AppInitTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> { SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override @Override
@ -180,6 +181,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
} }
} }
@SuppressWarnings("unchecked")
static class AppInitDoneTransition implements static class AppInitDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> { SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override @Override
@ -199,6 +201,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
} }
} }
@SuppressWarnings("unchecked")
static class DuplicateAppInitTransition implements static class DuplicateAppInitTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> { SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override @Override
@ -229,6 +232,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
} }
} }
@SuppressWarnings("unchecked")
void handleAppFinishWithContainersCleanedup() { void handleAppFinishWithContainersCleanedup() {
// Delete Application level resources // Delete Application level resources
this.dispatcher.getEventHandler().handle( this.dispatcher.getEventHandler().handle(
@ -238,6 +242,7 @@ void handleAppFinishWithContainersCleanedup() {
// TODO: Trigger the LogsManager // TODO: Trigger the LogsManager
} }
@SuppressWarnings("unchecked")
static class AppFinishTriggeredTransition static class AppFinishTriggeredTransition
implements implements
MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> { MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
@ -286,6 +291,7 @@ public ApplicationState transition(ApplicationImpl app,
} }
@SuppressWarnings("unchecked")
static class AppCompletelyDoneTransition implements static class AppCompletelyDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> { SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override @Override

View File

@ -27,7 +27,6 @@
import java.io.PrintStream; import java.io.PrintStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -124,19 +123,18 @@ public Integer call() {
FileContext lfs = FileContext.getLocalFSFileContext(); FileContext lfs = FileContext.getLocalFSFileContext();
LocalDirAllocator lDirAllocator = LocalDirAllocator lDirAllocator =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); // TODO new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); // TODO
Path nmPrivateContainerScriptPath = Path nmPrivateContainerScriptPath =
lDirAllocator.getLocalPathForWrite( lDirAllocator.getLocalPathForWrite(
ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ appIdStr + Path.SEPARATOR + containerIdStr + CONTAINER_SCRIPT, this.conf);
+ Path.SEPARATOR + CONTAINER_SCRIPT, this.conf);
Path nmPrivateTokensPath = Path nmPrivateTokensPath =
lDirAllocator.getLocalPathForWrite( lDirAllocator.getLocalPathForWrite(
ResourceLocalizationService.NM_PRIVATE_DIR getContainerPrivateDir(appIdStr, containerIdStr)
+ Path.SEPARATOR
+ containerIdStr
+ Path.SEPARATOR + Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
containerIdStr), this.conf); containerIdStr), this.conf);
DataOutputStream containerScriptOutStream = null; DataOutputStream containerScriptOutStream = null;
DataOutputStream tokensOutStream = null; DataOutputStream tokensOutStream = null;
@ -229,6 +227,16 @@ public Integer call() {
return 0; return 0;
} }
private String getContainerPrivateDir(String appIdStr, String containerIdStr) {
return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr
+ Path.SEPARATOR;
}
private String getAppPrivateDir(String appIdStr) {
return ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+ appIdStr;
}
private static class ShellScriptBuilder { private static class ShellScriptBuilder {
private final StringBuilder sb; private final StringBuilder sb;

View File

@ -20,9 +20,12 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@ -37,6 +40,9 @@
class LocalResourcesTrackerImpl implements LocalResourcesTracker { class LocalResourcesTrackerImpl implements LocalResourcesTracker {
static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class); static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
private static final String RANDOM_DIR_REGEX = "-?\\d+";
private static final Pattern RANDOM_DIR_PATTERN = Pattern
.compile(RANDOM_DIR_REGEX);
private final String user; private final String user;
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
@ -83,28 +89,44 @@ public boolean contains(LocalResourceRequest resource) {
@Override @Override
public boolean remove(LocalizedResource rem, DeletionService delService) { public boolean remove(LocalizedResource rem, DeletionService delService) {
// current synchronization guaranteed by crude RLS event for cleanup // current synchronization guaranteed by crude RLS event for cleanup
LocalizedResource rsrc = localrsrc.get(rem.getRequest()); LocalizedResource rsrc = localrsrc.get(rem.getRequest());
if (null == rsrc) { if (null == rsrc) {
LOG.error("Attempt to remove absent resource: " + rem.getRequest() + LOG.error("Attempt to remove absent resource: " + rem.getRequest()
" from " + getUser()); + " from " + getUser());
return true; return true;
} }
if (rsrc.getRefCount() > 0 if (rsrc.getRefCount() > 0
|| ResourceState.DOWNLOADING.equals(rsrc.getState()) || ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) {
|| rsrc != rem) {
// internal error // internal error
LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount"); LOG.error("Attempt to remove resource: " + rsrc
+ " with non-zero refcount");
assert false; assert false;
return false; return false;
} }
localrsrc.remove(rem.getRequest());
if (ResourceState.LOCALIZED.equals(rsrc.getState())) { if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), rsrc.getLocalPath()); delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
} }
return true; return true;
} }
/**
* Returns the path upto the random directory component.
*/
private Path getPathToDelete(Path localPath) {
Path delPath = localPath.getParent();
String name = delPath.getName();
Matcher matcher = RANDOM_DIR_PATTERN.matcher(name);
if (matcher.matches()) {
return delPath;
} else {
LOG.warn("Random directroy component did not match. " +
"Deleting localized path only");
return localPath;
}
}
@Override @Override
public String getUser() { public String getUser() {
return user; return user;
@ -114,5 +136,4 @@ public String getUser() {
public Iterator<LocalizedResource> iterator() { public Iterator<LocalizedResource> iterator() {
return localrsrc.values().iterator(); return localrsrc.values().iterator();
} }
} }

View File

@ -304,6 +304,7 @@ public void handle(LocalizationEvent event) {
retain.addResources(t); retain.addResources(t);
LOG.debug("Resource cleanup " + t.getUser() + ":" + retain); LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
} }
//TODO Check if appRsrcs should also be added to the retention set.
break; break;
case CLEANUP_CONTAINER_RESOURCES: case CLEANUP_CONTAINER_RESOURCES:
ContainerLocalizationCleanupEvent rsrcCleanup = ContainerLocalizationCleanupEvent rsrcCleanup =
@ -336,6 +337,7 @@ public void handle(LocalizationEvent event) {
delService.delete(userName, containerDir, new Path[] {}); delService.delete(userName, containerDir, new Path[] {});
// Delete the nmPrivate container-dir // Delete the nmPrivate container-dir
Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr); Path appSysDir = new Path(sysDir, appIDStr);
Path containerSysDir = new Path(appSysDir, containerIDStr); Path containerSysDir = new Path(appSysDir, containerIDStr);
@ -762,14 +764,16 @@ LocalizerHeartbeatResponse update(
@Override @Override
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
public void run() { public void run() {
Path nmPrivateCTokensPath = null;
try { try {
// Use LocalDirAllocator to get nmPrivateDir // Use LocalDirAllocator to get nmPrivateDir
Path nmPrivateCTokensPath = nmPrivateCTokensPath =
localDirsSelector.getLocalPathForWrite( localDirsSelector.getLocalPathForWrite(
NM_PRIVATE_DIR NM_PRIVATE_DIR
+ Path.SEPARATOR + Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
localizerId), getConfig()); localizerId), getConfig());
// 0) init queue, etc. // 0) init queue, etc.
// 1) write credentials to private dir // 1) write credentials to private dir
DataOutputStream tokenOut = null; DataOutputStream tokenOut = null;
@ -811,6 +815,7 @@ public void run() {
for (LocalizerResourceRequestEvent event : scheduled.values()) { for (LocalizerResourceRequestEvent event : scheduled.values()) {
event.getResource().unlock(); event.getResource().unlock();
} }
delService.delete(null, nmPrivateCTokensPath, new Path[] {});
} }
} }

View File

@ -0,0 +1,418 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
public class TestApplication {
/**
* All container start events before application running.
*/
@Test
public void testApplicationInit1() {
WrappedApplication wa = null;
try {
wa = new WrappedApplication(1, 314159265358979L, "yak", 3);
wa.initApplication(1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
assertEquals(1, wa.app.getContainers().size());
wa.initApplication(0);
wa.initApplication(2);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
assertEquals(3, wa.app.getContainers().size());
wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
for (int i = 0; i < wa.containers.size(); i++) {
verify(wa.containerBus).handle(
argThat(new ContainerInitMatcher(wa.containers.get(i)
.getContainerID())));
}
} finally {
if (wa != null)
wa.finished();
}
}
/**
* Container start events after Application Running
*/
@Test
public void testApplicationInit2() {
WrappedApplication wa = null;
try {
wa = new WrappedApplication(2, 314159265358979L, "yak", 3);
wa.initApplication(0);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
assertEquals(1, wa.app.getContainers().size());
wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
verify(wa.containerBus).handle(
argThat(new ContainerInitMatcher(wa.containers.get(0)
.getContainerID())));
wa.initApplication(1);
wa.initApplication(2);
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(3, wa.app.getContainers().size());
for (int i = 1; i < wa.containers.size(); i++) {
verify(wa.containerBus).handle(
argThat(new ContainerInitMatcher(wa.containers.get(i)
.getContainerID())));
}
} finally {
if (wa != null)
wa.finished();
}
}
/**
* App state RUNNING after all containers complete, before RM sends
* APP_FINISHED
*/
@Test
public void testAppRunningAfterContainersComplete() {
WrappedApplication wa = null;
try {
wa = new WrappedApplication(3, 314159265358979L, "yak", 3);
wa.initApplication(-1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
wa.containerFinished(0);
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(2, wa.app.getContainers().size());
wa.containerFinished(1);
wa.containerFinished(2);
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(0, wa.app.getContainers().size());
} finally {
if (wa != null)
wa.finished();
}
}
@Test
@SuppressWarnings("unchecked")
public void testAppFinishedOnRunningContainers() {
WrappedApplication wa = null;
try {
wa = new WrappedApplication(4, 314159265358979L, "yak", 3);
wa.initApplication(-1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
wa.containerFinished(0);
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(2, wa.app.getContainers().size());
wa.appFinished();
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
wa.app.getApplicationState());
assertEquals(2, wa.app.getContainers().size());
for (int i = 1; i < wa.containers.size(); i++) {
verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(i)
.getContainerID())));
}
wa.containerFinished(1);
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
wa.app.getApplicationState());
assertEquals(1, wa.app.getContainers().size());
reset(wa.localizerBus);
wa.containerFinished(2);
// All containers finished. Cleanup should be called.
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState());
assertEquals(0, wa.app.getContainers().size());
verify(wa.localizerBus).handle(
refEq(new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
wa.appResourcesCleanedup();
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
} finally {
if (wa != null)
wa.finished();
}
}
@Test
@SuppressWarnings("unchecked")
public void testAppFinishedOnCompletedContainers() {
WrappedApplication wa = null;
try {
wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
wa.initApplication(-1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
reset(wa.localizerBus);
wa.containerFinished(0);
wa.containerFinished(1);
wa.containerFinished(2);
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(0, wa.app.getContainers().size());
wa.appFinished();
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState());
verify(wa.localizerBus).handle(
refEq(new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
wa.appResourcesCleanedup();
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
} finally {
if (wa != null)
wa.finished();
}
}
//TODO Re-work after Application transitions are changed.
// @Test
@SuppressWarnings("unchecked")
public void testStartContainerAfterAppFinished() {
WrappedApplication wa = null;
try {
wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
wa.initApplication(-1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
reset(wa.localizerBus);
wa.containerFinished(0);
wa.containerFinished(1);
wa.containerFinished(2);
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(0, wa.app.getContainers().size());
wa.appFinished();
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState());
verify(wa.localizerBus).handle(
refEq(new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
wa.appResourcesCleanedup();
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
} finally {
if (wa != null)
wa.finished();
}
}
//TODO Re-work after Application transitions are changed.
// @Test
@SuppressWarnings("unchecked")
public void testAppFinishedOnIniting() {
// AM may send a startContainer() - AM APP_FINIHSED processed after
// APP_FINISHED on another NM
WrappedApplication wa = null;
try {
wa = new WrappedApplication(1, 314159265358979L, "yak", 3);
wa.initApplication(0);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
assertEquals(1, wa.app.getContainers().size());
reset(wa.localizerBus);
wa.appFinished();
verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(0)
.getContainerID())));
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
wa.app.getApplicationState());
wa.containerFinished(0);
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState());
verify(wa.localizerBus).handle(
refEq(new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
wa.initApplication(1);
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState());
assertEquals(0, wa.app.getContainers().size());
wa.appResourcesCleanedup();
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
} finally {
if (wa != null)
wa.finished();
}
}
private class ContainerKillMatcher extends ArgumentMatcher<ContainerEvent> {
private ContainerId cId;
public ContainerKillMatcher(ContainerId cId) {
this.cId = cId;
}
@Override
public boolean matches(Object argument) {
if (argument instanceof ContainerKillEvent) {
ContainerKillEvent event = (ContainerKillEvent) argument;
return event.getContainerID().equals(cId);
}
return false;
}
}
private class ContainerInitMatcher extends ArgumentMatcher<ContainerEvent> {
private ContainerId cId;
public ContainerInitMatcher(ContainerId cId) {
this.cId = cId;
}
@Override
public boolean matches(Object argument) {
if (argument instanceof ContainerInitEvent) {
ContainerInitEvent event = (ContainerInitEvent) argument;
return event.getContainerID().equals(cId);
}
return false;
}
}
@SuppressWarnings("unchecked")
private class WrappedApplication {
final DrainDispatcher dispatcher;
final EventHandler<LocalizationEvent> localizerBus;
final EventHandler<ContainersLauncherEvent> launcherBus;
final EventHandler<ContainersMonitorEvent> monitorBus;
final EventHandler<AuxServicesEvent> auxBus;
final EventHandler<ContainerEvent> containerBus;
final EventHandler<LogAggregatorEvent> logAggregationBus;
final String user;
final List<Container> containers;
final ApplicationId appId;
final Application app;
WrappedApplication(int id, long timestamp, String user, int numContainers) {
dispatcher = new DrainDispatcher();
dispatcher.init(null);
localizerBus = mock(EventHandler.class);
launcherBus = mock(EventHandler.class);
monitorBus = mock(EventHandler.class);
auxBus = mock(EventHandler.class);
containerBus = mock(EventHandler.class);
logAggregationBus = mock(EventHandler.class);
dispatcher.register(LocalizationEventType.class, localizerBus);
dispatcher.register(ContainersLauncherEventType.class, launcherBus);
dispatcher.register(ContainersMonitorEventType.class, monitorBus);
dispatcher.register(AuxServicesEventType.class, auxBus);
dispatcher.register(ContainerEventType.class, containerBus);
dispatcher.register(LogAggregatorEventType.class, logAggregationBus);
this.user = user;
this.appId = BuilderUtils.newApplicationId(timestamp, id);
app = new ApplicationImpl(dispatcher, this.user, appId, null);
containers = new ArrayList<Container>();
for (int i = 0; i < numContainers; i++) {
containers.add(createMockedContainer(this.appId, i));
}
dispatcher.start();
}
private void drainDispatcherEvents() {
dispatcher.await();
}
public void finished() {
dispatcher.stop();
}
public void initApplication(int containerNum) {
if (containerNum == -1) {
for (int i = 0; i < containers.size(); i++) {
app.handle(new ApplicationInitEvent(containers.get(i)));
}
} else {
app.handle(new ApplicationInitEvent(containers.get(containerNum)));
}
drainDispatcherEvents();
}
public void containerFinished(int containerNum) {
app.handle(new ApplicationContainerFinishedEvent(containers.get(
containerNum).getContainerID()));
drainDispatcherEvents();
}
public void applicationInited() {
app.handle(new ApplicationInitedEvent(appId));
drainDispatcherEvents();
}
public void appFinished() {
app.handle(new ApplicationEvent(appId,
ApplicationEventType.FINISH_APPLICATION));
drainDispatcherEvents();
}
public void appResourcesCleanedup() {
app.handle(new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
drainDispatcherEvents();
}
}
private Container createMockedContainer(ApplicationId appId, int containerId) {
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId);
Container c = mock(Container.class);
when(c.getContainerID()).thenReturn(cId);
return c;
}
}

View File

@ -83,6 +83,7 @@
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@ -355,7 +356,8 @@ public void testLocalizationHeartbeat() throws Exception {
dispatcher.register(ContainerEventType.class, containerBus); dispatcher.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class); ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = new DeletionService(exec); DeletionService delServiceReal = new DeletionService(exec);
DeletionService delService = spy(delServiceReal);
delService.init(null); delService.init(null);
delService.start(); delService.start();
@ -407,12 +409,14 @@ public boolean matches(Object o) {
rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req)); rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible // Sigh. Thread init of private localizer not accessible
Thread.sleep(500); Thread.sleep(1000);
dispatcher.await(); dispatcher.await();
String appStr = ConverterUtils.toString(appId); String appStr = ConverterUtils.toString(appId);
String ctnrStr = c.getContainerID().toString(); String ctnrStr = c.getContainerID().toString();
verify(exec).startLocalizer(isA(Path.class), isA(InetSocketAddress.class), ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class)); verify(exec).startLocalizer(tokenPathCaptor.capture(), isA(InetSocketAddress.class),
eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
Path localizationTokenPath = tokenPathCaptor.getValue();
// heartbeat from localizer // heartbeat from localizer
LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class); LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
@ -454,10 +458,13 @@ public boolean matches(Object o) {
}; };
dispatcher.await(); dispatcher.await();
verify(containerBus).handle(argThat(matchesContainerLoc)); verify(containerBus).handle(argThat(matchesContainerLoc));
// Verify deletion of localization token.
verify(delService).delete((String)isNull(), eq(localizationTokenPath));
} finally { } finally {
delService.stop();
dispatcher.stop();
spyService.stop(); spyService.stop();
dispatcher.stop();
delService.stop();
} }
} }