diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0124fe22bf..0d25d4aa0e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -359,6 +359,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2988. Reenabled TestLinuxContainerExecutor reflecting the current NodeManager code. (Robert Joseph Evans via vinodkv) + MAPREDUCE-3161. Improved some javadocs and fixed some typos in + YARN. (Todd Lipcon via vinodkv) + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java index cb6b441743..9c1be8602f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java @@ -135,9 +135,9 @@ public void contextualize(Configuration conf, AppContext context) { lambda = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS, - MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMNOOTH_LAMBDA_MS); + MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS); smoothedValue - = conf.getBoolean(MRJobConfig.MR_AM_TASK_EXTIMATOR_EXPONENTIAL_RATE_ENABLE, true) + = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true) ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index a3e5a6cf61..88051c5f32 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -384,11 +384,11 @@ public interface MRJobConfig { MR_AM_PREFIX + "job.task.estimator.exponential.smooth.lambda-ms"; - public static final long DEFAULT_MR_AM_TASK_ESTIMATOR_SMNOOTH_LAMBDA_MS = + public static final long DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS = 1000L * 60; /** true if the smoothing rate should be exponential.*/ - public static final String MR_AM_TASK_EXTIMATOR_EXPONENTIAL_RATE_ENABLE = + public static final String MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE = MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate"; /** The number of threads used to handle task RPC calls.*/ diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 23ee648212..3efa3fcc10 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -104,7 +104,7 @@ public class ContainerManagerImpl extends CompositeService implements private Server server; private final ResourceLocalizationService rsrcLocalizationSrvc; private final ContainersLauncher containersLauncher; - private final AuxServices auxiluaryServices; + private final AuxServices auxiliaryServices; private final NodeManagerMetrics metrics; private final NodeStatusUpdater nodeStatusUpdater; @@ -137,9 +137,9 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, this.containerTokenSecretManager = containerTokenSecretManager; // Start configurable services - auxiluaryServices = new AuxServices(); - auxiluaryServices.register(this); - addService(auxiluaryServices); + auxiliaryServices = new AuxServices(); + auxiliaryServices.register(this); + addService(auxiliaryServices); this.containersMonitor = new ContainersMonitorImpl(exec, dispatcher, this.context); @@ -154,7 +154,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, dispatcher.register(ApplicationEventType.class, new ApplicationEventDispatcher()); dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc); - dispatcher.register(AuxServicesEventType.class, auxiluaryServices); + dispatcher.register(AuxServicesEventType.class, auxiliaryServices); dispatcher.register(ContainersMonitorEventType.class, containersMonitor); dispatcher.register(ContainersLauncherEventType.class, containersLauncher); dispatcher.register(LogAggregatorEventType.class, logAggregationService); @@ -213,8 +213,8 @@ public void start() { @Override public void stop() { - if (auxiluaryServices.getServiceState() == STARTED) { - auxiluaryServices.unregister(this); + if (auxiliaryServices.getServiceState() == STARTED) { + auxiliaryServices.unregister(this); } if (server != null) { server.close(); @@ -285,7 +285,7 @@ public StartContainerResponse startContainer(StartContainerRequest request) StartContainerResponse response = recordFactory.newRecordInstance(StartContainerResponse.class); - response.addAllServiceResponse(auxiluaryServices.getMeta()); + response.addAllServiceResponse(auxiliaryServices.getMeta()); // TODO launchedContainer misplaced -> doesn't necessarily mean a container // launch. A finished Application will not launch containers. metrics.launchedContainer(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index eebd1f152e..6ea9c041ad 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy; @@ -43,6 +44,10 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.ConverterUtils; +/** + * The state machine for the representation of an Application + * within the NodeManager. + */ public class ApplicationImpl implements Application { final Dispatcher dispatcher; @@ -151,6 +156,9 @@ ApplicationEventType.INIT_APPLICATION, new AppInitTransition()) /** * Notify services of new application. + * + * In particular, this requests that the {@link ResourceLocalizationService} + * localize the application-scoped resources. */ @SuppressWarnings("unchecked") static class AppInitTransition implements diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 8d3f3fe084..9b9d0e8fa2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -431,6 +431,20 @@ public void transition(ContainerImpl container, ContainerEvent event) { } + /** + * State transition when a NEW container receives the INIT_CONTAINER + * message. + * + * If there are resources to localize, sends a + * ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES) + * to the ResourceLocalizationManager and enters LOCALIZING state. + * + * If there are no resources to localize, sends LAUNCH_CONTAINER event + * and enters LOCALIZED state directly. + * + * If there are any invalid resources specified, enters LOCALIZATION_FAILED + * directly. + */ @SuppressWarnings("unchecked") // dispatcher not typed static class RequestResourcesTransition implements MultipleArcTransition { @@ -513,6 +527,10 @@ public ContainerState transition(ContainerImpl container, } } + /** + * Transition when one of the requested resources for this container + * has been successfully localized. + */ @SuppressWarnings("unchecked") // dispatcher not typed static class LocalizedTransition implements MultipleArcTransition { @@ -540,6 +558,10 @@ public ContainerState transition(ContainerImpl container, } } + /** + * Transition from LOCALIZED state to RUNNING state upon receiving + * a CONTAINER_LAUNCHED event + */ @SuppressWarnings("unchecked") // dispatcher not typed static class LaunchTransition extends ContainerTransition { @Override @@ -556,6 +578,10 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS state + * upon EXITED_WITH_SUCCESS message. + */ @SuppressWarnings("unchecked") // dispatcher not typed static class ExitedWithSuccessTransition extends ContainerTransition { @@ -582,6 +608,10 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Transition to EXITED_WITH_FAILURE state upon + * CONTAINER_EXITED_WITH_FAILURE state. + **/ @SuppressWarnings("unchecked") // dispatcher not typed static class ExitedWithFailureTransition extends ContainerTransition { @@ -609,6 +639,9 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST + */ static class KilledExternallyTransition extends ExitedWithFailureTransition { KilledExternallyTransition() { super(true); @@ -621,6 +654,10 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving + * RESOURCE_FAILED event. + */ static class ResourceFailedTransition implements SingleArcTransition { @Override @@ -638,7 +675,11 @@ public void transition(ContainerImpl container, ContainerEvent event) { container.metrics.endInitingContainer(); } } - + + /** + * Transition from LOCALIZING to KILLING upon receiving + * KILL_CONTAINER event. + */ static class KillDuringLocalizationTransition implements SingleArcTransition { @Override @@ -652,6 +693,10 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Remain in KILLING state when receiving a RESOURCE_LOCALIZED request + * while in the process of killing. + */ static class LocalizedResourceDuringKillTransition implements SingleArcTransition { @Override @@ -669,6 +714,11 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Transitions upon receiving KILL_CONTAINER: + * - LOCALIZED -> KILLING + * - RUNNING -> KILLING + */ @SuppressWarnings("unchecked") // dispatcher not typed static class KillTransition implements SingleArcTransition { @@ -683,6 +733,10 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL + * upon receiving CONTAINER_KILLED_ON_REQUEST. + */ static class ContainerKilledTransition implements SingleArcTransition { @Override @@ -696,6 +750,13 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Handle the following transitions: + * - NEW -> DONE upon KILL_CONTAINER + * - {LOCALIZATION_FAILED, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, + * KILLING, CONTAINER_CLEANEDUP_AFTER_KILL} + * -> DONE upon CONTAINER_RESOURCES_CLEANEDUP + */ static class ContainerDoneTransition implements SingleArcTransition { @Override @@ -703,7 +764,10 @@ public void transition(ContainerImpl container, ContainerEvent event) { container.finished(); } } - + + /** + * Update diagnostics, staying in the same state. + */ static class ContainerDiagnosticsUpdateTransition implements SingleArcTransition { @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index d758e885a8..98f665404f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -112,7 +112,7 @@ public boolean remove(LocalizedResource rem, DeletionService delService) { /** - * Returns the path upto the random directory component. + * Returns the path up to the random directory component. */ private Path getPathToDelete(Path localPath) { Path delPath = localPath.getParent(); @@ -121,7 +121,7 @@ private Path getPathToDelete(Path localPath) { if (matcher.matches()) { return delPath; } else { - LOG.warn("Random directroy component did not match. " + + LOG.warn("Random directory component did not match. " + "Deleting localized path only"); return localPath; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index c6b68215a8..09d756ea6c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -133,8 +133,18 @@ public class ResourceLocalizationService extends CompositeService private final ScheduledExecutorService cacheCleanup; private final LocalResourcesTracker publicRsrc; + + /** + * Map of LocalResourceTrackers keyed by username, for private + * resources. + */ private final ConcurrentMap privateRsrc = new ConcurrentHashMap(); + + /** + * Map of LocalResourceTrackers keyed by appid, for application + * resources. + */ private final ConcurrentMap appRsrc = new ConcurrentHashMap(); @@ -251,140 +261,167 @@ public void stop() { } @Override - @SuppressWarnings("unchecked") // dispatcher not typed public void handle(LocalizationEvent event) { - String userName; - String appIDStr; - Container c; - Map> rsrcs; - LocalResourcesTracker tracker; // TODO: create log dir as $logdir/$user/$appId switch (event.getType()) { case INIT_APPLICATION_RESOURCES: - Application app = - ((ApplicationLocalizationEvent)event).getApplication(); - // 0) Create application tracking structs - userName = app.getUser(); - privateRsrc.putIfAbsent(userName, - new LocalResourcesTrackerImpl(userName, dispatcher)); - if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()), - new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) { - LOG.warn("Initializing application " + app + " already present"); - assert false; // TODO: FIXME assert doesn't help - // ^ The condition is benign. Tests should fail and it - // should appear in logs, but it's an internal error - // that should have no effect on applications - } - // 1) Signal container init - dispatcher.getEventHandler().handle(new ApplicationInitedEvent( - app.getAppId())); + handleInitApplicationResources( + ((ApplicationLocalizationEvent)event).getApplication()); break; case INIT_CONTAINER_RESOURCES: - ContainerLocalizationRequestEvent rsrcReqs = - (ContainerLocalizationRequestEvent) event; - c = rsrcReqs.getContainer(); - LocalizerContext ctxt = new LocalizerContext( - c.getUser(), c.getContainerID(), c.getCredentials()); - rsrcs = rsrcReqs.getRequestedResources(); - for (Map.Entry> e : - rsrcs.entrySet()) { - tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), - c.getContainerID().getApplicationAttemptId().getApplicationId()); - for (LocalResourceRequest req : e.getValue()) { - tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt)); - } - } + handleInitContainerResources((ContainerLocalizationRequestEvent) event); break; case CACHE_CLEANUP: - ResourceRetentionSet retain = - new ResourceRetentionSet(delService, cacheTargetSize); - retain.addResources(publicRsrc); - LOG.debug("Resource cleanup (public) " + retain); - for (LocalResourcesTracker t : privateRsrc.values()) { - retain.addResources(t); - LOG.debug("Resource cleanup " + t.getUser() + ":" + retain); - } - //TODO Check if appRsrcs should also be added to the retention set. + handleCacheCleanup(event); break; case CLEANUP_CONTAINER_RESOURCES: - ContainerLocalizationCleanupEvent rsrcCleanup = - (ContainerLocalizationCleanupEvent) event; - c = rsrcCleanup.getContainer(); - rsrcs = rsrcCleanup.getResources(); - for (Map.Entry> e : - rsrcs.entrySet()) { - tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), - c.getContainerID().getApplicationAttemptId().getApplicationId()); - for (LocalResourceRequest req : e.getValue()) { - tracker.handle(new ResourceReleaseEvent(req, c.getContainerID())); - } - } - - // Delete the container directories - userName = c.getUser(); - String containerIDStr = c.toString(); - appIDStr = - ConverterUtils.toString( - c.getContainerID().getApplicationAttemptId().getApplicationId()); - for (Path localDir : localDirs) { - - // Delete the user-owned container-dir - Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE); - Path userdir = new Path(usersdir, userName); - Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); - Path appDir = new Path(allAppsdir, appIDStr); - Path containerDir = new Path(appDir, containerIDStr); - delService.delete(userName, containerDir, new Path[] {}); - - // Delete the nmPrivate container-dir - - Path sysDir = new Path(localDir, NM_PRIVATE_DIR); - Path appSysDir = new Path(sysDir, appIDStr); - Path containerSysDir = new Path(appSysDir, containerIDStr); - delService.delete(null, containerSysDir, new Path[] {}); - } - - dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(), - ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); + handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event); break; case DESTROY_APPLICATION_RESOURCES: - - Application application = - ((ApplicationLocalizationEvent) event).getApplication(); - LocalResourcesTracker appLocalRsrcsTracker = - appRsrc.remove(ConverterUtils.toString(application.getAppId())); - if (null == appLocalRsrcsTracker) { - LOG.warn("Removing uninitialized application " + application); - } - // TODO: What to do with appLocalRsrcsTracker? - - // Delete the application directories - userName = application.getUser(); - appIDStr = application.toString(); - for (Path localDir : localDirs) { - - // Delete the user-owned app-dir - Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE); - Path userdir = new Path(usersdir, userName); - Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); - Path appDir = new Path(allAppsdir, appIDStr); - delService.delete(userName, appDir, new Path[] {}); - - // Delete the nmPrivate app-dir - Path sysDir = new Path(localDir, NM_PRIVATE_DIR); - Path appSysDir = new Path(sysDir, appIDStr); - delService.delete(null, appSysDir, new Path[] {}); - } - - // TODO: decrement reference counts of all resources associated with this - // app - - dispatcher.getEventHandler().handle(new ApplicationEvent( - application.getAppId(), - ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)); + handleDestroyApplicationResources( + ((ApplicationLocalizationEvent)event).getApplication()); break; + default: + throw new YarnException("Unknown localization event: " + event); } } + + /** + * Handle event received the first time any container is scheduled + * by a given application. + */ + @SuppressWarnings("unchecked") + private void handleInitApplicationResources(Application app) { + // 0) Create application tracking structs + String userName = app.getUser(); + privateRsrc.putIfAbsent(userName, + new LocalResourcesTrackerImpl(userName, dispatcher)); + if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()), + new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) { + LOG.warn("Initializing application " + app + " already present"); + assert false; // TODO: FIXME assert doesn't help + // ^ The condition is benign. Tests should fail and it + // should appear in logs, but it's an internal error + // that should have no effect on applications + } + // 1) Signal container init + // + // This is handled by the ApplicationImpl state machine and allows + // containers to proceed with launching. + dispatcher.getEventHandler().handle(new ApplicationInitedEvent( + app.getAppId())); + } + + private void handleInitContainerResources( + ContainerLocalizationRequestEvent rsrcReqs) { + Container c = rsrcReqs.getContainer(); + LocalizerContext ctxt = new LocalizerContext( + c.getUser(), c.getContainerID(), c.getCredentials()); + Map> rsrcs = + rsrcReqs.getRequestedResources(); + for (Map.Entry> e : + rsrcs.entrySet()) { + LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), + c.getContainerID().getApplicationAttemptId().getApplicationId()); + for (LocalResourceRequest req : e.getValue()) { + tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt)); + } + } + } + + private void handleCacheCleanup(LocalizationEvent event) { + ResourceRetentionSet retain = + new ResourceRetentionSet(delService, cacheTargetSize); + retain.addResources(publicRsrc); + LOG.debug("Resource cleanup (public) " + retain); + for (LocalResourcesTracker t : privateRsrc.values()) { + retain.addResources(t); + LOG.debug("Resource cleanup " + t.getUser() + ":" + retain); + } + //TODO Check if appRsrcs should also be added to the retention set. + } + + + @SuppressWarnings("unchecked") + private void handleCleanupContainerResources( + ContainerLocalizationCleanupEvent rsrcCleanup) { + Container c = rsrcCleanup.getContainer(); + Map> rsrcs = + rsrcCleanup.getResources(); + for (Map.Entry> e : + rsrcs.entrySet()) { + LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), + c.getContainerID().getApplicationAttemptId().getApplicationId()); + for (LocalResourceRequest req : e.getValue()) { + tracker.handle(new ResourceReleaseEvent(req, c.getContainerID())); + } + } + + // Delete the container directories + String userName = c.getUser(); + String containerIDStr = c.toString(); + String appIDStr = ConverterUtils.toString( + c.getContainerID().getApplicationAttemptId().getApplicationId()); + for (Path localDir : localDirs) { + + // Delete the user-owned container-dir + Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE); + Path userdir = new Path(usersdir, userName); + Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); + Path appDir = new Path(allAppsdir, appIDStr); + Path containerDir = new Path(appDir, containerIDStr); + delService.delete(userName, containerDir, new Path[] {}); + + // Delete the nmPrivate container-dir + + Path sysDir = new Path(localDir, NM_PRIVATE_DIR); + Path appSysDir = new Path(sysDir, appIDStr); + Path containerSysDir = new Path(appSysDir, containerIDStr); + delService.delete(null, containerSysDir, new Path[] {}); + } + + dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(), + ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); + } + + + @SuppressWarnings({"unchecked"}) + private void handleDestroyApplicationResources(Application application) { + String userName; + String appIDStr; + LocalResourcesTracker appLocalRsrcsTracker = + appRsrc.remove(ConverterUtils.toString(application.getAppId())); + if (null == appLocalRsrcsTracker) { + LOG.warn("Removing uninitialized application " + application); + } + // TODO: What to do with appLocalRsrcsTracker? + + // Delete the application directories + userName = application.getUser(); + appIDStr = application.toString(); + for (Path localDir : localDirs) { + + // Delete the user-owned app-dir + Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE); + Path userdir = new Path(usersdir, userName); + Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); + Path appDir = new Path(allAppsdir, appIDStr); + delService.delete(userName, appDir, new Path[] {}); + + // Delete the nmPrivate app-dir + Path sysDir = new Path(localDir, NM_PRIVATE_DIR); + Path appSysDir = new Path(sysDir, appIDStr); + delService.delete(null, appSysDir, new Path[] {}); + } + + // TODO: decrement reference counts of all resources associated with this + // app + + dispatcher.getEventHandler().handle(new ApplicationEvent( + application.getAppId(), + ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)); + } + LocalResourcesTracker getLocalResourcesTracker( LocalResourceVisibility visibility, String user, ApplicationId appId) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java index 4cb2e5cd19..11bb25e943 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java @@ -22,8 +22,15 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +/** + * Event that requests that the {@link ResourceLocalizationService} localize + * a set of resources for the given container. This is generated by + * {@link ContainerImpl} during container initialization. + */ public class ContainerLocalizationRequestEvent extends ContainerLocalizationEvent { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java index 59ed0bbf29..417935e823 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java @@ -19,7 +19,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +/** + * Events handled by {@link ResourceLocalizationService} + */ public class LocalizationEvent extends AbstractEvent { public LocalizationEvent(LocalizationEventType event) {