diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 68ca51e3e9..aa2943d7e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -115,6 +115,16 @@ + + + + + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index f977cf01a1..6577adbf6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; import org.apache.hadoop.yarn.server.timeline.NameValuePair; @@ -524,9 +525,32 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId, "container_" + appId.getClusterTimestamp() + "_000" + appId.getId() + "_01_000001" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - verifyEntityTypeFileExists(basePath, + File containerEntityFile = verifyEntityTypeFileExists(basePath, TimelineEntityType.YARN_CONTAINER.toString(), containerMetricsTimestampFileName); + Assert.assertEquals( + "Container created event needs to be published atleast once", + 1, + getNumOfStringOccurences(containerEntityFile, + ContainerMetricsConstants.CREATED_EVENT_TYPE)); + + // to avoid race condition of testcase, atleast check 4 times with sleep + // of 500ms + long numOfContainerFinishedOccurences = 0; + for (int i = 0; i < 4; i++) { + numOfContainerFinishedOccurences = + getNumOfStringOccurences(containerEntityFile, + ContainerMetricsConstants.FINISHED_EVENT_TYPE); + if (numOfContainerFinishedOccurences > 0) { + break; + } else { + Thread.sleep(500l); + } + } + Assert.assertEquals( + "Container finished event needs to be published atleast once", + 1, + numOfContainerFinishedOccurences); // Verify RM posting Application life cycle Events are getting published String appMetricsTimestampFileName = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 208c3f6c51..2df06cbc3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; /** @@ -120,4 +121,8 @@ interface QueuingContext { boolean isDistributedSchedulingEnabled(); OpportunisticContainerAllocator getContainerAllocator(); + + void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher); + + NMTimelinePublisher getNMTimelinePublisher(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index f2fd65b3e8..c9caea9063 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -506,6 +507,8 @@ public static class NMContext implements Context { private final QueuingContext queuingContext; + private NMTimelinePublisher nmTimelinePublisher; + public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, @@ -678,6 +681,16 @@ public void addRegisteredCollectors( Map newRegisteredCollectors) { this.registeredCollectors.putAll(newRegisteredCollectors); } + + @Override + public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) { + this.nmTimelinePublisher = nmMetricsPublisher; + } + + @Override + public NMTimelinePublisher getNMTimelinePublisher() { + return nmTimelinePublisher; + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 6fd1f550dc..3995a19143 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -127,6 +127,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService; @@ -144,6 +145,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -190,6 +192,8 @@ public class ContainerManagerImpl extends CompositeService implements private long waitForContainersOnShutdownMillis; + private final NMTimelinePublisher nmMetricsPublisher; + public ContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) { @@ -216,6 +220,8 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); + nmMetricsPublisher = createNMTimelinePublisher(context); + context.setNMTimelinePublisher(nmMetricsPublisher); this.containersMonitor = createContainersMonitor(exec); addService(this.containersMonitor); @@ -223,13 +229,16 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, new ContainerEventDispatcher()); dispatcher.register(ApplicationEventType.class, createApplicationEventDispatcher()); - dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc); + dispatcher.register(LocalizationEventType.class, + new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc, + nmMetricsPublisher)); dispatcher.register(AuxServicesEventType.class, auxiliaryServices); dispatcher.register(ContainersMonitorEventType.class, containersMonitor); dispatcher.register(ContainersLauncherEventType.class, containersLauncher); addService(dispatcher); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -430,6 +439,13 @@ protected SharedCacheUploadService createSharedCacheUploaderService() { return new SharedCacheUploadService(); } + @VisibleForTesting + protected NMTimelinePublisher createNMTimelinePublisher(Context context) { + NMTimelinePublisher nmTimelinePublisherLocal = new NMTimelinePublisher(context); + addIfService(nmTimelinePublisherLocal); + return nmTimelinePublisherLocal; + } + protected ContainersLauncher createContainersLauncher(Context context, ContainerExecutor exec) { return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this); @@ -982,9 +998,9 @@ protected void startContainerInternal( logAggregationContext)); } - this.context.getNMStateStore().storeContainer(containerId, request); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); + this.context.getNMStateStore().storeContainer(containerId, request); this.context.getContainerTokenSecretManager().startContainerSuccessful( containerTokenIdentifier); @@ -1317,6 +1333,7 @@ public void handle(ContainerEvent event) { Container c = containers.get(event.getContainerID()); if (c != null) { c.handle(event); + nmMetricsPublisher.publishContainerEvent(event); } else { LOG.warn("Event " + event + " sent to absent container " + event.getContainerID()); @@ -1325,7 +1342,6 @@ public void handle(ContainerEvent event) { } class ApplicationEventDispatcher implements EventHandler { - @Override public void handle(ApplicationEvent event) { Application app = @@ -1333,6 +1349,7 @@ public void handle(ApplicationEvent event) { event.getApplicationID()); if (app != null) { app.handle(event); + nmMetricsPublisher.publishApplicationEvent(event); } else { LOG.warn("Event " + event + " sent to absent application " + event.getApplicationID()); @@ -1340,6 +1357,25 @@ public void handle(ApplicationEvent event) { } } + private static final class LocalizationEventHandlerWrapper implements + EventHandler { + + private EventHandler origLocalizationEventHandler; + private NMTimelinePublisher timelinePublisher; + + LocalizationEventHandlerWrapper(EventHandler handler, + NMTimelinePublisher publisher) { + this.origLocalizationEventHandler = handler; + this.timelinePublisher = publisher; + } + + @Override + public void handle(LocalizationEvent event) { + origLocalizationEventHandler.handle(event); + timelinePublisher.publishLocalizationEvent(event); + } + } + @SuppressWarnings("unchecked") @Override public void handle(ContainerManagerEvent event) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java index 6b8007f3fe..9cd34ccf58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java @@ -19,18 +19,23 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; public class ApplicationContainerFinishedEvent extends ApplicationEvent { - private ContainerId containerID; + private ContainerStatus containerStatus; - public ApplicationContainerFinishedEvent( - ContainerId containerID) { - super(containerID.getApplicationAttemptId().getApplicationId(), + public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) { + super(containerStatus.getContainerId().getApplicationAttemptId().getApplicationId(), ApplicationEventType.APPLICATION_CONTAINER_FINISHED); - this.containerID = containerID; + this.containerStatus = containerStatus; } public ContainerId getContainerID() { - return this.containerID; + return containerStatus.getContainerId(); } + + public ContainerStatus getContainerStatus() { + return containerStatus; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 7571964d93..2278786860 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; @@ -69,4 +70,6 @@ public interface Container extends EventHandler { String toString(); + Priority getPriority(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 7a6e1cfa20..00bd56bd0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; @@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -197,6 +200,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, } stateMachine = stateMachineFactory.make(this); + this.context = context; } // constructor for a recovered container @@ -442,6 +446,10 @@ public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() { } } + public NMTimelinePublisher getNMTimelinePublisher() { + return context.getNMTimelinePublisher(); + } + @Override public String getUser() { this.readLock.lock(); @@ -575,7 +583,10 @@ private void sendFinishedEvents() { // Inform the application @SuppressWarnings("rawtypes") EventHandler eventHandler = dispatcher.getEventHandler(); - eventHandler.handle(new ApplicationContainerFinishedEvent(containerId)); + + ContainerStatus containerStatus = cloneAndGetContainerStatus(); + eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus)); + // Remove the container from the resource-monitor eventHandler.handle(new ContainerStopMonitoringEvent(containerId)); // Tell the logService too @@ -1187,7 +1198,8 @@ public void transition(ContainerImpl container, ContainerEvent event) { container.containerMetrics.finished(); } container.sendFinishedEvents(); - //if the current state is NEW it means the CONTAINER_INIT was never + + // if the current state is NEW it means the CONTAINER_INIT was never // sent for the event, thus no need to send the CONTAINER_STOP if (container.getCurrentState() != org.apache.hadoop.yarn.api.records.ContainerState.NEW) { @@ -1384,4 +1396,9 @@ private static boolean shouldBeUploadedToSharedCache(ContainerImpl container, ContainerRetryContext getContainerRetryContext() { return containerRetryContext; } + + @Override + public Priority getPriority() { + return containerTokenIdentifier.getPriority(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index ce00129110..df7e34d0e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -18,13 +18,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; -import java.io.IOException; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; @@ -36,10 +32,6 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -48,13 +40,13 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; public class ContainersMonitorImpl extends AbstractService implements ContainersMonitor { @@ -86,17 +78,11 @@ public class ContainersMonitorImpl extends AbstractService implements private boolean pmemCheckEnabled; private boolean vmemCheckEnabled; private boolean containersMonitorEnabled; - - private boolean publishContainerMetricsToTimelineService; private long maxVCoresAllottedForContainers; private static final long UNKNOWN_MEMORY_LIMIT = -1L; private int nodeCpuPercentageForYARN; - - // For posting entities in new timeline service in a non-blocking way - // TODO replace with event loop in TimelineClient. - private static ExecutorService threadPool; @Private public static enum ContainerMetric { @@ -215,22 +201,6 @@ protected void serviceInit(Configuration conf) throws Exception { 1) + "). Thrashing might happen."); } } - - publishContainerMetricsToTimelineService = - YarnConfiguration.systemMetricsPublisherEnabled(conf); - - if (publishContainerMetricsToTimelineService) { - LOG.info("NodeManager has been configured to publish container " + - "metrics to Timeline Service V2."); - threadPool = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); - } else { - LOG.warn("NodeManager has not been configured to publish container " + - "metrics to Timeline Service V2."); - } - super.serviceInit(conf); } @@ -274,29 +244,8 @@ protected void serviceStop() throws Exception { } } - shutdownAndAwaitTermination(); - super.serviceStop(); } - - // TODO remove threadPool after adding non-blocking call in TimelineClient - private static void shutdownAndAwaitTermination() { - if (threadPool == null) { - return; - } - threadPool.shutdown(); - try { - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { - threadPool.shutdownNow(); - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) - LOG.error("ThreadPool did not terminate"); - } - } catch (InterruptedException ie) { - threadPool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } public static class ProcessTreeInfo { private ContainerId containerId; @@ -474,9 +423,6 @@ public void run() { ContainerId containerId = entry.getKey(); ProcessTreeInfo ptInfo = entry.getValue(); - ContainerEntity entity = new ContainerEntity(); - entity.setId(containerId.toString()); - try { String pId = ptInfo.getPID(); @@ -569,26 +515,6 @@ public void run() { containerMetricsUnregisterDelayMs).recordCpuUsage ((int)cpuUsagePercentPerCore, milliVcoresUsed); } - - if (publishContainerMetricsToTimelineService) { - // if currentPmemUsage data is available - if (currentPmemUsage != - ResourceCalculatorProcessTree.UNAVAILABLE) { - TimelineMetric memoryMetric = new TimelineMetric(); - memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId); - memoryMetric.addValue(currentTime, currentPmemUsage); - entity.addMetric(memoryMetric); - } - // if cpuUsageTotalCoresPercentage data is available - if (cpuUsageTotalCoresPercentage != - ResourceCalculatorProcessTree.UNAVAILABLE) { - TimelineMetric cpuMetric = new TimelineMetric(); - cpuMetric.setId(ContainerMetric.CPU.toString() + pId); - cpuMetric.addValue(currentTime, - cpuUsageTotalCoresPercentage); - entity.addMetric(cpuMetric); - } - } boolean isMemoryOverLimit = false; String msg = ""; @@ -645,23 +571,16 @@ && isProcessTreeOverLimit(containerId.toString(), LOG.info("Removed ProcessTree with root " + pId); } + ContainerImpl container = + (ContainerImpl) context.getContainers().get(containerId); + container.getNMTimelinePublisher().reportContainerResourceUsage( + container, currentTime, pId, currentPmemUsage, + cpuUsageTotalCoresPercentage); } catch (Exception e) { // Log the exception and proceed to the next container. LOG.warn("Uncaught exception in ContainersMonitorImpl " + "while monitoring resource of " + containerId, e); } - - if (publishContainerMetricsToTimelineService) { - try { - TimelineClient timelineClient = context.getApplications().get( - containerId.getApplicationAttemptId().getApplicationId()). - getTimelineClient(); - putEntityWithoutBlocking(timelineClient, entity); - } catch (Exception e) { - LOG.error("Exception in ContainersMonitorImpl in putting " + - "resource usage metrics to timeline service.", e); - } - } } if (LOG.isDebugEnabled()) { LOG.debug("Total Resource Usage stats in NM by all containers : " @@ -684,20 +603,6 @@ && isProcessTreeOverLimit(containerId.toString(), } } } - - private void putEntityWithoutBlocking(final TimelineClient timelineClient, - final TimelineEntity entity) { - Runnable publishWrapper = new Runnable() { - public void run() { - try { - timelineClient.putEntities(entity); - } catch (IOException|YarnException e) { - LOG.error("putEntityNonBlocking get failed: " + e); - } - } - }; - threadPool.execute(publishWrapper); - } private String formatErrorMessage(String memTypeExceeded, long currentVmemUsage, long vmemLimit, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java new file mode 100644 index 0000000000..af8d94c011 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class NMTimelineEvent extends AbstractEvent { + public NMTimelineEvent(NMTimelineEventType type) { + super(type); + } + + public NMTimelineEvent(NMTimelineEventType type, long timestamp) { + super(type, timestamp); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java new file mode 100644 index 0000000000..c1129af76b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +public enum NMTimelineEventType { + // Publish the NM Timeline entity + TIMELINE_ENTITY_PUBLISH, +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java new file mode 100644 index 0000000000..2c5c300015 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -0,0 +1,376 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +public class NMTimelinePublisher extends CompositeService { + + private static final Log LOG = LogFactory.getLog(NMTimelinePublisher.class); + + private Dispatcher dispatcher; + private boolean publishSystemMetrics; + + private Context context; + + private NodeId nodeId; + + private String httpAddress; + + public NMTimelinePublisher(Context context) { + super(NMTimelinePublisher.class.getName()); + this.context = context; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + publishSystemMetrics = + YarnConfiguration.systemMetricsPublisherEnabled(conf); + + if (publishSystemMetrics) { + dispatcher = new AsyncDispatcher(); + dispatcher.register(NMTimelineEventType.class, + new ForwardingEventHandler()); + dispatcher + .register(ContainerEventType.class, new ContainerEventHandler()); + dispatcher.register(ApplicationEventType.class, + new ApplicationEventHandler()); + dispatcher.register(LocalizationEventType.class, + new LocalizationEventDispatcher()); + addIfService(dispatcher); + LOG.info("YARN system metrics publishing service is enabled"); + } else { + LOG.info("YARN system metrics publishing service is not enabled"); + } + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + // context will be updated after containerManagerImpl is started + // hence NMMetricsPublisher is added subservice of containerManagerImpl + this.nodeId = context.getNodeId(); + this.httpAddress = nodeId.getHost() + ":" + context.getHttpPort(); + } + + protected void handleNMTimelineEvent(NMTimelineEvent event) { + switch (event.getType()) { + case TIMELINE_ENTITY_PUBLISH: + putEntity(((TimelinePublishEvent) event).getTimelineEntityToPublish(), + ((TimelinePublishEvent) event).getApplicationId()); + break; + default: + LOG.error("Unknown NMTimelineEvent type: " + event.getType()); + } + } + + @SuppressWarnings("unchecked") + public void reportContainerResourceUsage(Container container, + long createdTime, String pId, Long pmemUsage, + Float cpuUsageTotalCoresPercentage) { + if (publishSystemMetrics + && (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE)) { + ContainerEntity entity = + createContainerEntity(container.getContainerId()); + long currentTimeMillis = System.currentTimeMillis(); + if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) { + TimelineMetric memoryMetric = new TimelineMetric(); + memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId); + memoryMetric.addValue(currentTimeMillis, pmemUsage); + entity.addMetric(memoryMetric); + } + if (cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE) { + TimelineMetric cpuMetric = new TimelineMetric(); + cpuMetric.setId(ContainerMetric.CPU.toString() + pId); + cpuMetric.addValue(currentTimeMillis, cpuUsageTotalCoresPercentage); + entity.addMetric(cpuMetric); + } + dispatcher.getEventHandler().handle( + new TimelinePublishEvent(entity, container.getContainerId() + .getApplicationAttemptId().getApplicationId())); + } + } + + private void publishContainerCreatedEvent(ContainerEntity entity, + ContainerId containerId, Resource resource, Priority priority, + long timestamp) { + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + resource.getMemory()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, + resource.getVirtualCores()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + nodeId.getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + nodeId.getPort()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + priority.toString()); + entityInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + httpAddress); + entity.setInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(timestamp); + + entity.addEvent(tEvent); + putEntity(entity, containerId.getApplicationAttemptId().getApplicationId()); + } + + private void publishContainerFinishedEvent(ContainerStatus containerStatus, + long timeStamp) { + ContainerId containerId = containerStatus.getContainerId(); + TimelineEntity entity = createContainerEntity(containerId); + + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + containerStatus.getDiagnostics()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + containerStatus.getExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, containerStatus + .getState().toString()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(timeStamp); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity, containerId.getApplicationAttemptId().getApplicationId()); + } + + private static ContainerEntity createContainerEntity(ContainerId containerId) { + ContainerEntity entity = new ContainerEntity(); + entity.setId(containerId.toString()); + Identifier parentIdentifier = new Identifier(); + parentIdentifier + .setType(TimelineEntityType.YARN_APPLICATION_ATTEMPT.name()); + parentIdentifier.setId(containerId.getApplicationAttemptId().toString()); + entity.setParent(parentIdentifier); + return entity; + } + + private void putEntity(TimelineEntity entity, ApplicationId appId) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity + ", JSON-style content: " + + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + TimelineClient timelineClient = + context.getApplications().get(appId).getTimelineClient(); + timelineClient.putEntities(entity); + } catch (Exception e) { + LOG.error("Error when publishing entity " + entity, e); + } + } + + public void publishApplicationEvent(ApplicationEvent event) { + if (!publishSystemMetrics) { + return; + } + // publish only when the desired event is received + switch (event.getType()) { + case INIT_APPLICATION: + case FINISH_APPLICATION: + case APPLICATION_CONTAINER_FINISHED: + case APPLICATION_LOG_HANDLING_FAILED: + dispatcher.getEventHandler().handle(event); + break; + + default: + if (LOG.isDebugEnabled()) { + LOG.debug(event.getType() + + " is not a desired ApplicationEvent which needs to be published by" + + " NMTimelinePublisher"); + } + break; + } + } + + public void publishContainerEvent(ContainerEvent event) { + if (!publishSystemMetrics) { + return; + } + // publish only when the desired event is received + switch (event.getType()) { + case INIT_CONTAINER: + dispatcher.getEventHandler().handle(event); + break; + + default: + if (LOG.isDebugEnabled()) { + LOG.debug(event.getType() + + " is not a desired ContainerEvent which needs to be published by" + + " NMTimelinePublisher"); + } + break; + } + } + + public void publishLocalizationEvent(LocalizationEvent event) { + if (!publishSystemMetrics) { + return; + } + // publish only when the desired event is received + switch (event.getType()) { + case CONTAINER_RESOURCES_LOCALIZED: + case INIT_CONTAINER_RESOURCES: + dispatcher.getEventHandler().handle(event); + break; + + default: + if (LOG.isDebugEnabled()) { + LOG.debug(event.getType() + + " is not a desired LocalizationEvent which needs to be published" + + " by NMTimelinePublisher"); + } + break; + } + } + + private class ApplicationEventHandler implements + EventHandler { + @Override + public void handle(ApplicationEvent event) { + switch (event.getType()) { + case APPLICATION_CONTAINER_FINISHED: + // this is actually used to publish the container Event + ApplicationContainerFinishedEvent evnt = + (ApplicationContainerFinishedEvent) event; + publishContainerFinishedEvent(evnt.getContainerStatus(), + event.getTimestamp()); + break; + default: + LOG.error("Seems like event type is captured only in " + + "publishApplicationEvent method and not handled here"); + break; + } + } + } + + private class ContainerEventHandler implements EventHandler { + @Override + public void handle(ContainerEvent event) { + ContainerId containerId = event.getContainerID(); + Container container = context.getContainers().get(containerId); + long timestamp = event.getTimestamp(); + ContainerEntity entity = createContainerEntity(containerId); + + switch (event.getType()) { + case INIT_CONTAINER: + publishContainerCreatedEvent(entity, containerId, + container.getResource(), container.getPriority(), timestamp); + break; + default: + LOG.error("Seems like event type is captured only in " + + "publishContainerEvent method and not handled here"); + break; + } + } + } + + private static final class LocalizationEventDispatcher implements + EventHandler { + @Override + public void handle(LocalizationEvent event) { + switch (event.getType()) { + case INIT_CONTAINER_RESOURCES: + case CONTAINER_RESOURCES_LOCALIZED: + // TODO after priority based flush jira is finished + break; + default: + LOG.error("Seems like event type is captured only in " + + "publishLocalizationEvent method and not handled here"); + break; + } + } + } + + /** + * EventHandler implementation which forward events to NMMetricsPublisher. + * Making use of it, NMMetricsPublisher can avoid to have a public handle + * method. + */ + private final class ForwardingEventHandler implements + EventHandler { + + @Override + public void handle(NMTimelineEvent event) { + handleNMTimelineEvent(event); + } + } + + private static class TimelinePublishEvent extends NMTimelineEvent { + private ApplicationId appId; + private TimelineEntity entityToPublish; + + public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) { + super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, System + .currentTimeMillis()); + this.appId = appId; + this.entityToPublish = entity; + } + + public ApplicationId getApplicationId() { + return appId; + } + + public TimelineEntity getTimelineEntityToPublish() { + return entityToPublish; + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 413cdd290c..8cec5efd09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerContext; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -1215,6 +1216,7 @@ public void testCompletedContainersIsRecentlyStopped() throws Exception { BuilderUtils.newContainerToken(containerId, "host", 1234, "user", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); + Container completedContainer = new ContainerImpl(conf, null, null, null, null, BuilderUtils.newContainerTokenIdentifier(containerToken), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index d4a95ab208..ae0cbc3e96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.Records; import org.junit.After; @@ -709,5 +710,13 @@ public boolean isDistributedSchedulingEnabled() { public OpportunisticContainerAllocator getContainerAllocator() { return null; } + + public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) { + } + + @Override + public NMTimelinePublisher getNMTimelinePublisher() { + return null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 524ea1c7e6..63f44c924f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -101,10 +101,12 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; public class TestContainerManagerRecovery extends BaseContainerManagerTest { @@ -722,6 +724,12 @@ public void setBlockNewContainerRequests( boolean blockNewContainerRequests) { // do nothing } + + @Override + public NMTimelinePublisher createNMTimelinePublisher(Context context) { + NMTimelinePublisher timelinePublisher = mock(NMTimelinePublisher.class); + return timelinePublisher; + } }; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index dc599e7d59..3f84cd5d65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -34,7 +34,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -599,7 +602,7 @@ public void initContainer(int containerNum) { public void containerFinished(int containerNum) { app.handle(new ApplicationContainerFinishedEvent(containers.get( - containerNum).getContainerId())); + containerNum).cloneAndGetContainerStatus())); drainDispatcherEvents(); } @@ -643,6 +646,8 @@ private Container createMockedContainer(ApplicationId appId, int containerId) { when(c.getLaunchContext()).thenReturn(launchContext); when(launchContext.getApplicationACLs()).thenReturn( new HashMap()); + when(c.cloneAndGetContainerStatus()).thenReturn(BuilderUtils.newContainerStatus(cId, + ContainerState.NEW, "", 0, Resource.newInstance(1024, 1))); return c; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 0b95dba465..b21ba4b998 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -172,4 +173,8 @@ public String getLogDir() { @Override public void setLogDir(String logDir) { } + + public Priority getPriority() { + return Priority.UNDEFINED; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index befaa83ba6..4147d42c00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -137,9 +137,19 @@ public void stopContainer(ContainerTerminationContext context) { // intercept the event of the AM container being stopped and remove the app // level collector service if (context.getContainerType() == ContainerType.APPLICATION_MASTER) { - ApplicationId appId = context.getContainerId(). - getApplicationAttemptId().getApplicationId(); - removeApplication(appId); + final ApplicationId appId = + context.getContainerId().getApplicationAttemptId().getApplicationId(); + new Thread(new Runnable() { + public void run() { + try { + // TODO Temporary Fix until solution for YARN-3995 is finalized. + Thread.sleep(1000l); + } catch (InterruptedException e) { + e.printStackTrace(); + } + removeApplication(appId); + } + }).start(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java index 7cc612d920..dafc76e896 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java @@ -98,6 +98,15 @@ public void testRemoveApplication() throws Exception { when(context.getContainerType()).thenReturn( ContainerType.APPLICATION_MASTER); auxService.stopContainer(context); + + // TODO Temporary Fix until solution for YARN-3995 is finalized + for (int i = 0; i < 4; i++) { + Thread.sleep(500l); + if (!auxService.hasApplication(appAttemptId.getApplicationId())) { + break; + } + } + // auxService should not have that app assertFalse(auxService.hasApplication(appAttemptId.getApplicationId())); auxService.close();