MAPREDUCE-3161. Improved some javadocs and fixed some typos in YARN. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1181622 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-11 04:45:28 +00:00
parent f527f989af
commit 11b9dd4e84
10 changed files with 258 additions and 135 deletions

View File

@ -359,6 +359,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2988. Reenabled TestLinuxContainerExecutor reflecting the MAPREDUCE-2988. Reenabled TestLinuxContainerExecutor reflecting the
current NodeManager code. (Robert Joseph Evans via vinodkv) current NodeManager code. (Robert Joseph Evans via vinodkv)
MAPREDUCE-3161. Improved some javadocs and fixed some typos in
YARN. (Todd Lipcon via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -135,9 +135,9 @@ public void contextualize(Configuration conf, AppContext context) {
lambda lambda
= conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS, = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMNOOTH_LAMBDA_MS); MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
smoothedValue smoothedValue
= conf.getBoolean(MRJobConfig.MR_AM_TASK_EXTIMATOR_EXPONENTIAL_RATE_ENABLE, true) = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS; ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
} }

View File

@ -384,11 +384,11 @@ public interface MRJobConfig {
MR_AM_PREFIX MR_AM_PREFIX
+ "job.task.estimator.exponential.smooth.lambda-ms"; + "job.task.estimator.exponential.smooth.lambda-ms";
public static final long DEFAULT_MR_AM_TASK_ESTIMATOR_SMNOOTH_LAMBDA_MS = public static final long DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS =
1000L * 60; 1000L * 60;
/** true if the smoothing rate should be exponential.*/ /** true if the smoothing rate should be exponential.*/
public static final String MR_AM_TASK_EXTIMATOR_EXPONENTIAL_RATE_ENABLE = public static final String MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE =
MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate"; MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate";
/** The number of threads used to handle task RPC calls.*/ /** The number of threads used to handle task RPC calls.*/

View File

@ -104,7 +104,7 @@ public class ContainerManagerImpl extends CompositeService implements
private Server server; private Server server;
private final ResourceLocalizationService rsrcLocalizationSrvc; private final ResourceLocalizationService rsrcLocalizationSrvc;
private final ContainersLauncher containersLauncher; private final ContainersLauncher containersLauncher;
private final AuxServices auxiluaryServices; private final AuxServices auxiliaryServices;
private final NodeManagerMetrics metrics; private final NodeManagerMetrics metrics;
private final NodeStatusUpdater nodeStatusUpdater; private final NodeStatusUpdater nodeStatusUpdater;
@ -137,9 +137,9 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
this.containerTokenSecretManager = containerTokenSecretManager; this.containerTokenSecretManager = containerTokenSecretManager;
// Start configurable services // Start configurable services
auxiluaryServices = new AuxServices(); auxiliaryServices = new AuxServices();
auxiluaryServices.register(this); auxiliaryServices.register(this);
addService(auxiluaryServices); addService(auxiliaryServices);
this.containersMonitor = this.containersMonitor =
new ContainersMonitorImpl(exec, dispatcher, this.context); new ContainersMonitorImpl(exec, dispatcher, this.context);
@ -154,7 +154,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
dispatcher.register(ApplicationEventType.class, dispatcher.register(ApplicationEventType.class,
new ApplicationEventDispatcher()); new ApplicationEventDispatcher());
dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc); dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
dispatcher.register(AuxServicesEventType.class, auxiluaryServices); dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor); dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
dispatcher.register(ContainersLauncherEventType.class, containersLauncher); dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
dispatcher.register(LogAggregatorEventType.class, logAggregationService); dispatcher.register(LogAggregatorEventType.class, logAggregationService);
@ -213,8 +213,8 @@ public void start() {
@Override @Override
public void stop() { public void stop() {
if (auxiluaryServices.getServiceState() == STARTED) { if (auxiliaryServices.getServiceState() == STARTED) {
auxiluaryServices.unregister(this); auxiliaryServices.unregister(this);
} }
if (server != null) { if (server != null) {
server.close(); server.close();
@ -285,7 +285,7 @@ public StartContainerResponse startContainer(StartContainerRequest request)
StartContainerResponse response = StartContainerResponse response =
recordFactory.newRecordInstance(StartContainerResponse.class); recordFactory.newRecordInstance(StartContainerResponse.class);
response.addAllServiceResponse(auxiluaryServices.getMeta()); response.addAllServiceResponse(auxiliaryServices.getMeta());
// TODO launchedContainer misplaced -> doesn't necessarily mean a container // TODO launchedContainer misplaced -> doesn't necessarily mean a container
// launch. A finished Application will not launch containers. // launch. A finished Application will not launch containers.
metrics.launchedContainer(); metrics.launchedContainer();

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
@ -43,6 +44,10 @@
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* The state machine for the representation of an Application
* within the NodeManager.
*/
public class ApplicationImpl implements Application { public class ApplicationImpl implements Application {
final Dispatcher dispatcher; final Dispatcher dispatcher;
@ -151,6 +156,9 @@ ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
/** /**
* Notify services of new application. * Notify services of new application.
*
* In particular, this requests that the {@link ResourceLocalizationService}
* localize the application-scoped resources.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static class AppInitTransition implements static class AppInitTransition implements

View File

@ -431,6 +431,20 @@ public void transition(ContainerImpl container, ContainerEvent event) {
} }
/**
* State transition when a NEW container receives the INIT_CONTAINER
* message.
*
* If there are resources to localize, sends a
* ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES)
* to the ResourceLocalizationManager and enters LOCALIZING state.
*
* If there are no resources to localize, sends LAUNCH_CONTAINER event
* and enters LOCALIZED state directly.
*
* If there are any invalid resources specified, enters LOCALIZATION_FAILED
* directly.
*/
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
static class RequestResourcesTransition implements static class RequestResourcesTransition implements
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> { MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@ -513,6 +527,10 @@ public ContainerState transition(ContainerImpl container,
} }
} }
/**
* Transition when one of the requested resources for this container
* has been successfully localized.
*/
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
static class LocalizedTransition implements static class LocalizedTransition implements
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> { MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@ -540,6 +558,10 @@ public ContainerState transition(ContainerImpl container,
} }
} }
/**
* Transition from LOCALIZED state to RUNNING state upon receiving
* a CONTAINER_LAUNCHED event
*/
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
static class LaunchTransition extends ContainerTransition { static class LaunchTransition extends ContainerTransition {
@Override @Override
@ -556,6 +578,10 @@ public void transition(ContainerImpl container, ContainerEvent event) {
} }
} }
/**
* Transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS state
* upon EXITED_WITH_SUCCESS message.
*/
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithSuccessTransition extends ContainerTransition { static class ExitedWithSuccessTransition extends ContainerTransition {
@ -582,6 +608,10 @@ public void transition(ContainerImpl container, ContainerEvent event) {
} }
} }
/**
* Transition to EXITED_WITH_FAILURE state upon
* CONTAINER_EXITED_WITH_FAILURE state.
**/
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithFailureTransition extends ContainerTransition { static class ExitedWithFailureTransition extends ContainerTransition {
@ -609,6 +639,9 @@ public void transition(ContainerImpl container, ContainerEvent event) {
} }
} }
/**
* Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
*/
static class KilledExternallyTransition extends ExitedWithFailureTransition { static class KilledExternallyTransition extends ExitedWithFailureTransition {
KilledExternallyTransition() { KilledExternallyTransition() {
super(true); super(true);
@ -621,6 +654,10 @@ public void transition(ContainerImpl container, ContainerEvent event) {
} }
} }
/**
* Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving
* RESOURCE_FAILED event.
*/
static class ResourceFailedTransition implements static class ResourceFailedTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override
@ -638,7 +675,11 @@ public void transition(ContainerImpl container, ContainerEvent event) {
container.metrics.endInitingContainer(); container.metrics.endInitingContainer();
} }
} }
/**
* Transition from LOCALIZING to KILLING upon receiving
* KILL_CONTAINER event.
*/
static class KillDuringLocalizationTransition implements static class KillDuringLocalizationTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override
@ -652,6 +693,10 @@ public void transition(ContainerImpl container, ContainerEvent event) {
} }
} }
/**
* Remain in KILLING state when receiving a RESOURCE_LOCALIZED request
* while in the process of killing.
*/
static class LocalizedResourceDuringKillTransition implements static class LocalizedResourceDuringKillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override
@ -669,6 +714,11 @@ public void transition(ContainerImpl container, ContainerEvent event) {
} }
} }
/**
* Transitions upon receiving KILL_CONTAINER:
* - LOCALIZED -> KILLING
* - RUNNING -> KILLING
*/
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
static class KillTransition implements static class KillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@ -683,6 +733,10 @@ public void transition(ContainerImpl container, ContainerEvent event) {
} }
} }
/**
* Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
* upon receiving CONTAINER_KILLED_ON_REQUEST.
*/
static class ContainerKilledTransition implements static class ContainerKilledTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override
@ -696,6 +750,13 @@ public void transition(ContainerImpl container, ContainerEvent event) {
} }
} }
/**
* Handle the following transitions:
* - NEW -> DONE upon KILL_CONTAINER
* - {LOCALIZATION_FAILED, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE,
* KILLING, CONTAINER_CLEANEDUP_AFTER_KILL}
* -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class ContainerDoneTransition implements static class ContainerDoneTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override
@ -703,7 +764,10 @@ public void transition(ContainerImpl container, ContainerEvent event) {
container.finished(); container.finished();
} }
} }
/**
* Update diagnostics, staying in the same state.
*/
static class ContainerDiagnosticsUpdateTransition implements static class ContainerDiagnosticsUpdateTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override

View File

@ -112,7 +112,7 @@ public boolean remove(LocalizedResource rem, DeletionService delService) {
/** /**
* Returns the path upto the random directory component. * Returns the path up to the random directory component.
*/ */
private Path getPathToDelete(Path localPath) { private Path getPathToDelete(Path localPath) {
Path delPath = localPath.getParent(); Path delPath = localPath.getParent();
@ -121,7 +121,7 @@ private Path getPathToDelete(Path localPath) {
if (matcher.matches()) { if (matcher.matches()) {
return delPath; return delPath;
} else { } else {
LOG.warn("Random directroy component did not match. " + LOG.warn("Random directory component did not match. " +
"Deleting localized path only"); "Deleting localized path only");
return localPath; return localPath;
} }

View File

@ -133,8 +133,18 @@ public class ResourceLocalizationService extends CompositeService
private final ScheduledExecutorService cacheCleanup; private final ScheduledExecutorService cacheCleanup;
private final LocalResourcesTracker publicRsrc; private final LocalResourcesTracker publicRsrc;
/**
* Map of LocalResourceTrackers keyed by username, for private
* resources.
*/
private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc = private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
new ConcurrentHashMap<String,LocalResourcesTracker>(); new ConcurrentHashMap<String,LocalResourcesTracker>();
/**
* Map of LocalResourceTrackers keyed by appid, for application
* resources.
*/
private final ConcurrentMap<String,LocalResourcesTracker> appRsrc = private final ConcurrentMap<String,LocalResourcesTracker> appRsrc =
new ConcurrentHashMap<String,LocalResourcesTracker>(); new ConcurrentHashMap<String,LocalResourcesTracker>();
@ -251,140 +261,167 @@ public void stop() {
} }
@Override @Override
@SuppressWarnings("unchecked") // dispatcher not typed
public void handle(LocalizationEvent event) { public void handle(LocalizationEvent event) {
String userName;
String appIDStr;
Container c;
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs;
LocalResourcesTracker tracker;
// TODO: create log dir as $logdir/$user/$appId // TODO: create log dir as $logdir/$user/$appId
switch (event.getType()) { switch (event.getType()) {
case INIT_APPLICATION_RESOURCES: case INIT_APPLICATION_RESOURCES:
Application app = handleInitApplicationResources(
((ApplicationLocalizationEvent)event).getApplication(); ((ApplicationLocalizationEvent)event).getApplication());
// 0) Create application tracking structs
userName = app.getUser();
privateRsrc.putIfAbsent(userName,
new LocalResourcesTrackerImpl(userName, dispatcher));
if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
LOG.warn("Initializing application " + app + " already present");
assert false; // TODO: FIXME assert doesn't help
// ^ The condition is benign. Tests should fail and it
// should appear in logs, but it's an internal error
// that should have no effect on applications
}
// 1) Signal container init
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
app.getAppId()));
break; break;
case INIT_CONTAINER_RESOURCES: case INIT_CONTAINER_RESOURCES:
ContainerLocalizationRequestEvent rsrcReqs = handleInitContainerResources((ContainerLocalizationRequestEvent) event);
(ContainerLocalizationRequestEvent) event;
c = rsrcReqs.getContainer();
LocalizerContext ctxt = new LocalizerContext(
c.getUser(), c.getContainerID(), c.getCredentials());
rsrcs = rsrcReqs.getRequestedResources();
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
rsrcs.entrySet()) {
tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
c.getContainerID().getApplicationAttemptId().getApplicationId());
for (LocalResourceRequest req : e.getValue()) {
tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
}
}
break; break;
case CACHE_CLEANUP: case CACHE_CLEANUP:
ResourceRetentionSet retain = handleCacheCleanup(event);
new ResourceRetentionSet(delService, cacheTargetSize);
retain.addResources(publicRsrc);
LOG.debug("Resource cleanup (public) " + retain);
for (LocalResourcesTracker t : privateRsrc.values()) {
retain.addResources(t);
LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
}
//TODO Check if appRsrcs should also be added to the retention set.
break; break;
case CLEANUP_CONTAINER_RESOURCES: case CLEANUP_CONTAINER_RESOURCES:
ContainerLocalizationCleanupEvent rsrcCleanup = handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
(ContainerLocalizationCleanupEvent) event;
c = rsrcCleanup.getContainer();
rsrcs = rsrcCleanup.getResources();
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
rsrcs.entrySet()) {
tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
c.getContainerID().getApplicationAttemptId().getApplicationId());
for (LocalResourceRequest req : e.getValue()) {
tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
}
}
// Delete the container directories
userName = c.getUser();
String containerIDStr = c.toString();
appIDStr =
ConverterUtils.toString(
c.getContainerID().getApplicationAttemptId().getApplicationId());
for (Path localDir : localDirs) {
// Delete the user-owned container-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(allAppsdir, appIDStr);
Path containerDir = new Path(appDir, containerIDStr);
delService.delete(userName, containerDir, new Path[] {});
// Delete the nmPrivate container-dir
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr);
Path containerSysDir = new Path(appSysDir, containerIDStr);
delService.delete(null, containerSysDir, new Path[] {});
}
dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
break; break;
case DESTROY_APPLICATION_RESOURCES: case DESTROY_APPLICATION_RESOURCES:
handleDestroyApplicationResources(
Application application = ((ApplicationLocalizationEvent)event).getApplication());
((ApplicationLocalizationEvent) event).getApplication();
LocalResourcesTracker appLocalRsrcsTracker =
appRsrc.remove(ConverterUtils.toString(application.getAppId()));
if (null == appLocalRsrcsTracker) {
LOG.warn("Removing uninitialized application " + application);
}
// TODO: What to do with appLocalRsrcsTracker?
// Delete the application directories
userName = application.getUser();
appIDStr = application.toString();
for (Path localDir : localDirs) {
// Delete the user-owned app-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(allAppsdir, appIDStr);
delService.delete(userName, appDir, new Path[] {});
// Delete the nmPrivate app-dir
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr);
delService.delete(null, appSysDir, new Path[] {});
}
// TODO: decrement reference counts of all resources associated with this
// app
dispatcher.getEventHandler().handle(new ApplicationEvent(
application.getAppId(),
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
break; break;
default:
throw new YarnException("Unknown localization event: " + event);
} }
} }
/**
* Handle event received the first time any container is scheduled
* by a given application.
*/
@SuppressWarnings("unchecked")
private void handleInitApplicationResources(Application app) {
// 0) Create application tracking structs
String userName = app.getUser();
privateRsrc.putIfAbsent(userName,
new LocalResourcesTrackerImpl(userName, dispatcher));
if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
LOG.warn("Initializing application " + app + " already present");
assert false; // TODO: FIXME assert doesn't help
// ^ The condition is benign. Tests should fail and it
// should appear in logs, but it's an internal error
// that should have no effect on applications
}
// 1) Signal container init
//
// This is handled by the ApplicationImpl state machine and allows
// containers to proceed with launching.
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
app.getAppId()));
}
private void handleInitContainerResources(
ContainerLocalizationRequestEvent rsrcReqs) {
Container c = rsrcReqs.getContainer();
LocalizerContext ctxt = new LocalizerContext(
c.getUser(), c.getContainerID(), c.getCredentials());
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
rsrcReqs.getRequestedResources();
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
rsrcs.entrySet()) {
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
c.getContainerID().getApplicationAttemptId().getApplicationId());
for (LocalResourceRequest req : e.getValue()) {
tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
}
}
}
private void handleCacheCleanup(LocalizationEvent event) {
ResourceRetentionSet retain =
new ResourceRetentionSet(delService, cacheTargetSize);
retain.addResources(publicRsrc);
LOG.debug("Resource cleanup (public) " + retain);
for (LocalResourcesTracker t : privateRsrc.values()) {
retain.addResources(t);
LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
}
//TODO Check if appRsrcs should also be added to the retention set.
}
@SuppressWarnings("unchecked")
private void handleCleanupContainerResources(
ContainerLocalizationCleanupEvent rsrcCleanup) {
Container c = rsrcCleanup.getContainer();
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
rsrcCleanup.getResources();
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
rsrcs.entrySet()) {
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
c.getContainerID().getApplicationAttemptId().getApplicationId());
for (LocalResourceRequest req : e.getValue()) {
tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
}
}
// Delete the container directories
String userName = c.getUser();
String containerIDStr = c.toString();
String appIDStr = ConverterUtils.toString(
c.getContainerID().getApplicationAttemptId().getApplicationId());
for (Path localDir : localDirs) {
// Delete the user-owned container-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(allAppsdir, appIDStr);
Path containerDir = new Path(appDir, containerIDStr);
delService.delete(userName, containerDir, new Path[] {});
// Delete the nmPrivate container-dir
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr);
Path containerSysDir = new Path(appSysDir, containerIDStr);
delService.delete(null, containerSysDir, new Path[] {});
}
dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
}
@SuppressWarnings({"unchecked"})
private void handleDestroyApplicationResources(Application application) {
String userName;
String appIDStr;
LocalResourcesTracker appLocalRsrcsTracker =
appRsrc.remove(ConverterUtils.toString(application.getAppId()));
if (null == appLocalRsrcsTracker) {
LOG.warn("Removing uninitialized application " + application);
}
// TODO: What to do with appLocalRsrcsTracker?
// Delete the application directories
userName = application.getUser();
appIDStr = application.toString();
for (Path localDir : localDirs) {
// Delete the user-owned app-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(allAppsdir, appIDStr);
delService.delete(userName, appDir, new Path[] {});
// Delete the nmPrivate app-dir
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr);
delService.delete(null, appSysDir, new Path[] {});
}
// TODO: decrement reference counts of all resources associated with this
// app
dispatcher.getEventHandler().handle(new ApplicationEvent(
application.getAppId(),
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
}
LocalResourcesTracker getLocalResourcesTracker( LocalResourcesTracker getLocalResourcesTracker(
LocalResourceVisibility visibility, String user, ApplicationId appId) { LocalResourceVisibility visibility, String user, ApplicationId appId) {

View File

@ -22,8 +22,15 @@
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
/**
* Event that requests that the {@link ResourceLocalizationService} localize
* a set of resources for the given container. This is generated by
* {@link ContainerImpl} during container initialization.
*/
public class ContainerLocalizationRequestEvent extends public class ContainerLocalizationRequestEvent extends
ContainerLocalizationEvent { ContainerLocalizationEvent {

View File

@ -19,7 +19,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
/**
* Events handled by {@link ResourceLocalizationService}
*/
public class LocalizationEvent extends AbstractEvent<LocalizationEventType> { public class LocalizationEvent extends AbstractEvent<LocalizationEventType> {
public LocalizationEvent(LocalizationEventType event) { public LocalizationEvent(LocalizationEventType event) {