From 5712b8f9fd1859fe046b482889239bd164ed7dab Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Mon, 6 Apr 2015 09:31:24 -0700 Subject: [PATCH] YARN-3334. NM uses timeline client to publish container metrics to new timeline service. Contributed by Junping Du. --- .../HierarchicalTimelineEntity.java | 4 +- .../hadoop/yarn/conf/YarnConfiguration.java | 8 ++ .../distributedshell/ApplicationMaster.java | 32 +++-- .../TestDistributedShell.java | 47 +++++-- .../client/api/impl/TimelineClientImpl.java | 11 +- .../yarn/server/nodemanager/Context.java | 10 +- .../yarn/server/nodemanager/NodeManager.java | 35 +++-- .../nodemanager/NodeStatusUpdaterImpl.java | 50 +++++++- .../collectormanager/NMCollectorService.java | 11 +- .../application/Application.java | 3 + .../application/ApplicationImpl.java | 26 +++- .../monitor/ContainersMonitorImpl.java | 121 +++++++++++++++++- .../server/nodemanager/TestEventFlow.java | 2 +- .../nodemanager/TestNodeStatusUpdater.java | 9 +- .../amrmproxy/BaseAMRMProxyTest.java | 10 +- .../BaseContainerManagerTest.java | 3 +- .../TestContainerManagerRecovery.java | 4 +- .../launcher/TestContainerLaunch.java | 3 +- .../TestLocalCacheDirectoryManager.java | 2 +- .../TestResourceLocalizationService.java | 5 +- .../server/nodemanager/webapp/MockApp.java | 8 +- .../webapp/TestContainerLogsPage.java | 15 ++- .../nodemanager/webapp/TestNMAppsPage.java | 2 +- .../nodemanager/webapp/TestNMWebServer.java | 10 +- .../nodemanager/webapp/TestNMWebServices.java | 2 +- .../webapp/TestNMWebServicesApps.java | 2 +- .../webapp/TestNMWebServicesContainers.java | 2 +- .../ResourceTrackerService.java | 20 ++- .../TimelineCollectorWebService.java | 2 +- 29 files changed, 344 insertions(+), 115 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/HierarchicalTimelineEntity.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/HierarchicalTimelineEntity.java index 01d85cfb1a..49576dee50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/HierarchicalTimelineEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/HierarchicalTimelineEntity.java @@ -58,7 +58,9 @@ public void setParent(String type, String id) { // required by JAXB @InterfaceAudience.Private - @XmlElement(name = "children") + // comment out XmlElement here because it cause UnrecognizedPropertyException + // TODO we need a better fix + //@XmlElement(name = "children") public HashMap> getChildrenJAXB() { return children; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 0a3ab3e4f0..4f43e3d9ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2749,6 +2749,14 @@ public static String getClusterId(Configuration conf) { } return clusterId; } + + public static boolean systemMetricsPublisherEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) + && conf.getBoolean( + YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); + } /* For debugging. mp configurations to system output as XML format. 
*/ public static void main(String[] args) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 46724d693b..f301bd24ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -331,19 +331,8 @@ public static void main(String[] args) { } appMaster.run(); result = appMaster.finish(); - - threadPool.shutdown(); - - while (!threadPool.isTerminated()) { // wait for all posting thread to finish - try { - if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) { - threadPool.shutdownNow(); // send interrupt to hurry them along - } - } catch (InterruptedException e) { - LOG.warn("Timeline client service stop interrupted!"); - break; - } - } + + shutdownAndAwaitTermination(); } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); LogManager.shutdown(); @@ -357,6 +346,23 @@ public static void main(String[] args) { System.exit(2); } } + + //TODO remove threadPool after adding non-blocking call in TimelineClient + private static void shutdownAndAwaitTermination() { + threadPool.shutdown(); + try { + // Wait a while for existing tasks to terminate + if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); + if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) + LOG.error("ThreadPool did not terminate"); + } + } catch 
(InterruptedException ie) { + threadPool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } /** * Dump out contents of $CWD and the environment to stdout for debugging diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 6cb344a8c2..f2afd2cc2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.client.api.impl.TimelineWriter; import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -82,7 +83,10 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; +import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -140,6 +144,16 @@ private void 
setupInternal(int numNodeManager, float timelineVersion) conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); conf.set("mapreduce.jobhistory.address", "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10)); + // Enable ContainersMonitorImpl + conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, + LinuxResourceCalculatorPlugin.class.getName()); + conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, + ProcfsBasedProcessTree.class.getName()); + conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, + true); + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); // ATS version specific settings if (timelineVersion == 1.0f) { @@ -470,15 +484,14 @@ private void checkTimelineV2( File tmpRootFolder = new File(tmpRoot); try { Assert.assertTrue(tmpRootFolder.isDirectory()); - - // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs - String outputDirApp = tmpRoot + + String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + UserGroupInformation.getCurrentUser().getShortUserName() + (defaultFlow ? "/" + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) + - "/0/" : "/test_flow_id/12345678/") + - appId.toString() + "/DS_APP_ATTEMPT/"; + "/0/" : "/test_flow_id/12345678/") + appId.toString(); + // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs + String outputDirApp = basePath + "/DS_APP_ATTEMPT/"; File entityFolder = new File(outputDirApp); Assert.assertTrue(entityFolder.isDirectory()); @@ -491,13 +504,7 @@ private void checkTimelineV2( File appAttemptFile = new File(appAttemptFileName); Assert.assertTrue(appAttemptFile.exists()); - String outputDirContainer = tmpRoot + - YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + - UserGroupInformation.getCurrentUser().getShortUserName() + - (defaultFlow ? 
"/" + - TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) + - "/0/" : "/test_flow_id/12345678/") + - appId.toString() + "/DS_CONTAINER/"; + String outputDirContainer = basePath + "/DS_CONTAINER/"; File containerFolder = new File(outputDirContainer); Assert.assertTrue(containerFolder.isDirectory()); @@ -509,6 +516,22 @@ private void checkTimelineV2( Assert.assertTrue(containerFile.exists()); String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId() + "_"; + + // Verify NM posting container metrics info. + String outputDirContainerMetrics = basePath + "/" + + TimelineEntityType.YARN_CONTAINER + "/"; + File containerMetricsFolder = new File(outputDirContainerMetrics); + Assert.assertTrue(containerMetricsFolder.isDirectory()); + + String containerMetricsTimestampFileName = "container_" + + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_01_000001.thist"; + String containerMetricsFileName = outputDirContainerMetrics + + containerMetricsTimestampFileName; + + File containerMetricsFile = new File(containerMetricsFileName); + Assert.assertTrue(containerMetricsFile.exists()); + } finally { FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index d4335009b2..29883616d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -482,14 +482,11 @@ private void putObjects( } if (resp == null || resp.getClientResponseStatus() != ClientResponse.Status.OK) { - String msg = - "Failed to get the response from the timeline server."; + String msg = "Response from 
the timeline server is " + + ((resp == null) ? "null": + "not successful," + " HTTP error code: " + resp.getStatus() + + ", Server response:\n" + resp.getEntity(String.class)); LOG.error(msg); - if (LOG.isDebugEnabled() && resp != null) { - String output = resp.getEntity(String.class); - LOG.debug("HTTP error code: " + resp.getStatus() - + " Server response:\n" + output); - } throw new YarnException(msg); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index af896c853c..208c3f6c51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -77,13 +78,6 @@ interface QueuingContext { */ Map getRegisteredCollectors(); - /** - * Return the known collectors which get from RM for all active applications - * running on this NM. - * @return known collectors. 
- */ - Map getKnownCollectors(); - ConcurrentMap getContainers(); ConcurrentMap @@ -106,6 +100,8 @@ interface QueuingContext { NMStateStoreService getNMStateStore(); boolean getDecommissioned(); + + Configuration getConf(); void setDecommissioned(boolean isDecommissioned); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 0792c370aa..f2fd65b3e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -202,9 +202,10 @@ protected DeletionService createDeletionService(ContainerExecutor exec) { protected NMContext createNMContext( NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, - NMStateStoreService stateStore, boolean isDistSchedulerEnabled) { + NMStateStoreService stateStore, boolean isDistSchedulerEnabled, + Configuration conf) { return new NMContext(containerTokenSecretManager, nmTokenSecretManager, - dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled); + dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled, conf); } protected void doSecureLogin() throws IOException { @@ -337,7 +338,7 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT); this.context = createNMContext(containerTokenSecretManager, - nmTokenSecretManager, nmStore, isDistSchedulingEnabled); + nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf); nodeLabelsProvider = createNodeLabelsProvider(conf); @@ -466,6 
+467,9 @@ public void run() { public static class NMContext implements Context { private NodeId nodeId = null; + + private Configuration conf = null; + protected final ConcurrentMap applications = new ConcurrentHashMap(); @@ -478,9 +482,6 @@ public static class NMContext implements Context { protected Map registeredCollectors = new ConcurrentHashMap(); - protected Map knownCollectors = - new ConcurrentHashMap(); - protected final ConcurrentMap increasedContainers = new ConcurrentHashMap<>(); @@ -508,7 +509,8 @@ public static class NMContext implements Context { public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, - NMStateStoreService stateStore, boolean isDistSchedulingEnabled) { + NMStateStoreService stateStore, boolean isDistSchedulingEnabled, + Configuration conf) { this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; this.dirsHandler = dirsHandler; @@ -521,6 +523,7 @@ public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, LogAggregationReport>(); this.queuingContext = new QueuingNMContext(); this.isDistSchedulingEnabled = isDistSchedulingEnabled; + this.conf = conf; } /** @@ -540,6 +543,11 @@ public int getHttpPort() { public ConcurrentMap getApplications() { return this.applications; } + + @Override + public Configuration getConf() { + return this.conf; + } @Override public ConcurrentMap getContainers() { @@ -669,19 +677,6 @@ public Map getRegisteredCollectors() { public void addRegisteredCollectors( Map newRegisteredCollectors) { this.registeredCollectors.putAll(newRegisteredCollectors); - // Update to knownCollectors as well so it can immediately be consumed by - // this NM's TimelineClient. 
- this.knownCollectors.putAll(newRegisteredCollectors); - } - - @Override - public Map getKnownCollectors() { - return this.knownCollectors; - } - - public void addKnownCollectors( - Map knownCollectors) { - this.knownCollectors.putAll(knownCollectors); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index eebc1e0952..c658f49196 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -81,6 +82,8 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; @@ -906,10 +909,9 @@ public void run() { newResource.toString()); } } - - Map knownCollectors = - response.getAppCollectorsMap(); - ((NodeManager.NMContext)context).addKnownCollectors(knownCollectors); + if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) { + updateTimelineClientsAddress(response); + } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM @@ -938,6 +940,46 @@ public void run() { } } + /** + * Caller should take care of sending non null nodelabels for both + * arguments + * + * @param nodeLabelsNew + * @param nodeLabelsOld + * @return if the New node labels are diff from the older one. + */ + private boolean areNodeLabelsUpdated(Set nodeLabelsNew, + Set nodeLabelsOld) { + if (nodeLabelsNew.size() != nodeLabelsOld.size() + || !nodeLabelsOld.containsAll(nodeLabelsNew)) { + return true; + } + return false; + } + + private void updateTimelineClientsAddress( + NodeHeartbeatResponse response) { + Set> rmKnownCollectors = + response.getAppCollectorsMap().entrySet(); + for (Map.Entry entry : rmKnownCollectors) { + ApplicationId appId = entry.getKey(); + String collectorAddr = entry.getValue(); + + // Only handle applications running on local node. 
+ // Not include apps with timeline collectors running in local + Application application = context.getApplications().get(appId); + if (application != null && + !context.getRegisteredCollectors().containsKey(appId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sync a new collector address: " + collectorAddr + + " for application: " + appId + " from RM."); + } + TimelineClient client = application.getTimelineClient(); + client.setTimelineServiceAddress(collectorAddr); + } + } + } + private void updateMasterKeys(NodeHeartbeatResponse response) { // See if the master-key has rolled over MasterKey updatedMasterKey = response.getContainerTokenMasterKey(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index 6ccea84fee..f37be23c4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -29,6 +29,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -103,7 +104,15 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo( Map newCollectorsMap = new HashMap(); for (AppCollectorsMap collector : newCollectorsList) { - 
newCollectorsMap.put(collector.getApplicationId(), collector.getCollectorAddr()); + ApplicationId appId = collector.getApplicationId(); + String collectorAddr = collector.getCollectorAddr(); + newCollectorsMap.put(appId, collectorAddr); + // set registered collector address to TimelineClient. + if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) { + TimelineClient client = + context.getApplications().get(appId).getTimelineClient(); + client.setTimelineServiceAddress(collectorAddr); + } } ((NodeManager.NMContext)context).addRegisteredCollectors(newCollectorsMap); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java index decd17dea3..0c95193f81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -38,5 +39,7 @@ public interface Application extends EventHandler { String getFlowId(); String getFlowRunId(); + + TimelineClient getTimelineClient(); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index b37b993f8d..181878fbcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -39,6 +40,8 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -80,6 +83,7 @@ public class ApplicationImpl implements Application { private final ReadLock readLock; private final WriteLock writeLock; private final Context context; + private TimelineClient timelineClient; private static final Log LOG = LogFactory.getLog(ApplicationImpl.class); 
@@ -122,6 +126,17 @@ public ApplicationImpl(Dispatcher dispatcher, String user, String flowId, Context context) { this(dispatcher, user, flowId, flowRunId, appId, credentials, context, -1); + Configuration conf = context.getConf(); + if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) { + createAndStartTimelienClient(conf); + } + } + + private void createAndStartTimelienClient(Configuration conf) { + // create and start timeline client + this.timelineClient = TimelineClient.createTimelineClient(appId); + timelineClient.init(conf); + timelineClient.start(); } @Override @@ -133,6 +148,11 @@ public String getUser() { public ApplicationId getAppId() { return appId; } + + @Override + public TimelineClient getTimelineClient() { + return timelineClient; + } @Override public ApplicationState getApplicationState() { @@ -507,7 +527,11 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { // TODO check we remove related collectors info in failure cases // (YARN-3038) app.context.getRegisteredCollectors().remove(app.getAppId()); - app.context.getKnownCollectors().remove(app.getAppId()); + // stop timelineClient when application get finished. 
+ TimelineClient timelineClient = app.getTimelineClient(); + if (timelineClient != null) { + timelineClient.stop(); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index b5c2747508..d57c91fd38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -18,32 +18,43 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; +import java.io.IOException; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; -import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; public class ContainersMonitorImpl extends AbstractService implements ContainersMonitor { @@ -75,11 +86,25 @@ public class ContainersMonitorImpl extends AbstractService implements private boolean pmemCheckEnabled; private boolean vmemCheckEnabled; private boolean containersMonitorEnabled; + + private boolean publishContainerMetricsToTimelineService; private long maxVCoresAllottedForContainers; private static final long UNKNOWN_MEMORY_LIMIT = -1L; private int nodeCpuPercentageForYARN; + + // For posting entities in new timeline service in a non-blocking way + // TODO replace with event loop in TimelineClient. 
+ private static ExecutorService threadPool = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); + + @Private + public static enum ContainerMetric { + CPU, MEMORY + } private ResourceUtilization containersUtilization; // Tracks the aggregated allocation of the currently allocated containers @@ -193,6 +218,18 @@ protected void serviceInit(Configuration conf) throws Exception { 1) + "). Thrashing might happen."); } } + + publishContainerMetricsToTimelineService = + YarnConfiguration.systemMetricsPublisherEnabled(conf); + + if (publishContainerMetricsToTimelineService) { + LOG.info("NodeManager has been configured to publish container " + + "metrics to Timeline Service V2."); + } else { + LOG.warn("NodeManager has not been configured to publish container " + + "metrics to Timeline Service V2."); + } + super.serviceInit(conf); } @@ -235,8 +272,27 @@ protected void serviceStop() throws Exception { ; } } + + shutdownAndAwaitTermination(); + super.serviceStop(); } + + // TODO remove threadPool after adding non-blocking call in TimelineClient + private static void shutdownAndAwaitTermination() { + threadPool.shutdown(); + try { + if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); + if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) + LOG.error("ThreadPool did not terminate"); + } + } catch (InterruptedException ie) { + threadPool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } public static class ProcessTreeInfo { private ContainerId containerId; @@ -413,6 +469,10 @@ public void run() { .entrySet()) { ContainerId containerId = entry.getKey(); ProcessTreeInfo ptInfo = entry.getValue(); + + ContainerEntity entity = new ContainerEntity(); + entity.setId(containerId.toString()); + try { String pId = ptInfo.getPID(); @@ -427,7 +487,8 @@ public void run() { + " for the first time"); ResourceCalculatorProcessTree pt = - 
ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf); + ResourceCalculatorProcessTree.getResourceCalculatorProcessTree( + pId, processTreeClass, conf); ptInfo.setPid(pId); ptInfo.setProcessTree(pt); @@ -451,6 +512,8 @@ public void run() { pTree.updateProcessTree(); // update process-tree long currentVmemUsage = pTree.getVirtualMemorySize(); long currentPmemUsage = pTree.getRssMemorySize(); + long currentTime = System.currentTimeMillis(); + // if machine has 6 cores and 3 are used, // cpuUsagePercentPerCore should be 300% and // cpuUsageTotalCoresPercentage should be 50% @@ -466,7 +529,7 @@ public void run() { float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore / resourceCalculatorPlugin.getNumProcessors(); - + // Multiply by 1000 to avoid losing data when converting to int int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000 * maxVCoresAllottedForContainers /nodeCpuPercentageForYARN); @@ -503,6 +566,26 @@ public void run() { ((int)cpuUsagePercentPerCore, milliVcoresUsed); } + if (publishContainerMetricsToTimelineService) { + // if currentPmemUsage data is available + if (currentPmemUsage != + ResourceCalculatorProcessTree.UNAVAILABLE) { + TimelineMetric memoryMetric = new TimelineMetric(); + memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId); + memoryMetric.addTimeSeriesData(currentTime, currentPmemUsage); + entity.addMetric(memoryMetric); + } + // if CPU usage data is available; test the undivided per-core value, since dividing by the processor count would change an UNAVAILABLE sentinel + if (cpuUsagePercentPerCore != + ResourceCalculatorProcessTree.UNAVAILABLE) { + TimelineMetric cpuMetric = new TimelineMetric(); + cpuMetric.setId(ContainerMetric.CPU.toString() + pId); + cpuMetric.addTimeSeriesData(currentTime, + cpuUsageTotalCoresPercentage); + entity.addMetric(cpuMetric); + } + } + boolean isMemoryOverLimit = false; String msg = ""; int containerExitStatus = ContainerExitStatus.INVALID; @@ -557,10 +640,23 @@ && isProcessTreeOverLimit(containerId.toString(),
trackingContainers.remove(containerId); LOG.info("Removed ProcessTree with root " + pId); } + } catch (Exception e) { // Log the exception and proceed to the next container. - LOG.warn("Uncaught exception in ContainerMemoryManager " - + "while managing memory of " + containerId, e); + LOG.warn("Uncaught exception in ContainersMonitorImpl " + + "while monitoring resource of " + containerId, e); + } + + if (publishContainerMetricsToTimelineService) { + try { + TimelineClient timelineClient = context.getApplications().get( + containerId.getApplicationAttemptId().getApplicationId()). + getTimelineClient(); + putEntityWithoutBlocking(timelineClient, entity); + } catch (Exception e) { + LOG.error("Exception in ContainersMonitorImpl in putting " + + "resource usage metrics to timeline service.", e); + } } } if (LOG.isDebugEnabled()) { @@ -584,6 +680,21 @@ && isProcessTreeOverLimit(containerId.toString(), } } } + + private void putEntityWithoutBlocking(final TimelineClient timelineClient, + final TimelineEntity entity) { + Runnable publishWrapper = new Runnable() { + public void run() { + try { + timelineClient.putEntities(entity); + } catch (IOException|YarnException e) { + LOG.error("putEntityNonBlocking get failed: " + e, e); + throw new RuntimeException(e.toString(), e); + } + } + }; + threadPool.execute(publishWrapper); + } private String formatErrorMessage(String memTypeExceeded, long currentVmemUsage, long vmemLimit, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index a9ff83c7a4..3b84a78b0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -81,7 +81,7 @@ public void testSuccessfulContainerLaunch() throws InterruptedException, Context context = new NMContext(new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(), null, null, - new NMNullStateStoreService(), false) { + new NMNullStateStoreService(), false, conf) { @Override public int getHttpPort() { return 1234; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 7975f23de6..413cdd290c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -1704,9 +1704,10 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NMContext createNMContext( NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, - NMStateStoreService store, boolean isDistributedSchedulingEnabled) { + NMStateStoreService store, boolean isDistributedSchedulingEnabled, + Configuration conf) { return new MyNMContext(containerTokenSecretManager, - nmTokenSecretManager); + nmTokenSecretManager, conf); } }; @@ -1937,9 +1938,9 @@ private class MyNMContext extends NMContext { public MyNMContext( NMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInNM nmTokenSecretManager) { + NMTokenSecretManagerInNM nmTokenSecretManager, 
Configuration conf) { super(containerTokenSecretManager, nmTokenSecretManager, null, null, - new NMNullStateStoreService(), false); + new NMNullStateStoreService(), false, conf); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 70bd8f4c1f..d4a95ab208 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -622,11 +622,6 @@ public Map getRegisteredCollectors() { return null; } - @Override - public Map getKnownCollectors() { - return null; - } - @Override public ConcurrentMap getContainers() { return null; @@ -677,6 +672,11 @@ public boolean getDecommissioned() { return false; } + @Override + public Configuration getConf() { + return null; + } + @Override public void setDecommissioned(boolean isDecommissioned) { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 4f0e5c3f58..726b3535e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -119,7 +119,8 @@ public BaseContainerManagerTest() throws UnsupportedFileSystemException { protected Configuration conf = new YarnConfiguration(); protected Context context = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) { + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false, + conf) { public int getHttpPort() { return HTTP_PORT; }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index b7d0e48004..524ea1c7e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -22,7 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.isA; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -559,7 +559,7 @@ private NMContext createContext(Configuration conf, NMStateStoreService stateStore) { NMContext context = new NMContext(new 
NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), stateStore, false){ + new ApplicationACLsManager(conf), stateStore, false, conf) { public int getHttpPort() { return HTTP_PORT; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java index a558338ee3..7a4fca3661 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java @@ -113,7 +113,8 @@ public class TestContainerLaunch extends BaseContainerManagerTest { private static final String INVALID_JAVA_HOME = "/no/jvm/here"; protected Context distContext = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) { + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false, + conf) { public int getHttpPort() { return HTTP_PORT; }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java index c768df1ddb..df00f9e7ed 100644 
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java @@ -82,7 +82,7 @@ public void testMinimumPerDirectoryFileLimit() { new NMContext(new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(), null, new ApplicationACLsManager(conf), new NMNullStateStoreService(), - false); + false, conf); ResourceLocalizationService service = new ResourceLocalizationService(null, null, null, null, nmContext); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index f594d8cf36..23786fe180 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -186,7 +186,8 @@ public void setup() throws IOException { conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); nmContext = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); + new ApplicationACLsManager(conf), new 
NMNullStateStoreService(), false, + conf); } @After @@ -2369,7 +2370,7 @@ private ResourceLocalizationService createSpyService( NMContext nmContext = new NMContext(new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), stateStore, false); + new ApplicationACLsManager(conf), stateStore, false, conf); ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, dirsHandler, nmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java index 35b95eeb83..2ee572b7ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -41,7 +42,7 @@ public class MockApp implements Application { Application app; String flowId; String flowRunId; - + TimelineClient timelineClient = null; public MockApp(int uniqId) { this("mockUser", 1234, uniqId); @@ -87,4 +88,9 @@ public String getFlowId() { public String getFlowRunId() { return flowRunId; } + + @Override + public TimelineClient 
getTimelineClient() { + return timelineClient; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java index 6a72cc000d..8dc06c7ace 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java @@ -21,8 +21,8 @@ import static org.junit.Assume.assumeTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.BufferedOutputStream; import java.io.File; @@ -47,10 +47,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -63,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; @@ -96,7 +96,8 @@ public void testContainerLogDirs() throws IOException, YarnException { healthChecker.init(conf); LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler(); NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, - new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false, + conf); // Add an application and the corresponding containers RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf); String user = "nobody"; @@ -136,7 +137,8 @@ public void testContainerLogDirs() throws IOException, YarnException { when(dirsHandlerForFullDisk.getLogDirsForRead()). 
thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()})); nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk, - new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false, + conf); nmContext.getApplications().put(appId, app); container.setState(ContainerState.RUNNING); nmContext.getContainers().put(container1, container); @@ -158,7 +160,8 @@ public void testContainerLogFile() throws IOException, YarnException { LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); dirsHandler.init(conf); NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, - new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false, + conf); // Add an application and the corresponding containers String user = "nobody"; long clusterTimeStamp = 1234; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMAppsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMAppsPage.java index ca729f5e07..0214d04f0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMAppsPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMAppsPage.java @@ -63,7 +63,7 @@ public void testNMAppsPage() { final NMContext nmcontext = new NMContext( new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(), null, new ApplicationACLsManager(conf), new NMNullStateStoreService(), - false); + false, conf); Injector injector = 
WebAppTests.createMockInjector(NMContext.class, nmcontext, new Module() { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index 3f71179fca..9923f409b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -86,8 +86,9 @@ private NodeHealthCheckerService createNodeHealthCheckerService(Configuration co } private int startNMWebAppServer(String webAddr) { + Configuration conf = new Configuration(); Context nmContext = new NodeManager.NMContext(null, null, null, null, - null, false); + null, false, conf); ResourceView resourceView = new ResourceView() { @Override public long getVmemAllocatedForContainers() { @@ -110,7 +111,7 @@ public boolean isPmemCheckEnabled() { return true; } }; - Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath()); NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf); @@ -149,8 +150,9 @@ public void testNMWebAppWithEphemeralPort() throws IOException { @Test public void testNMWebApp() throws IOException, YarnException { + Configuration conf = new Configuration(); Context nmContext = new NodeManager.NMContext(null, null, null, null, - null, false); + null, false, conf); ResourceView resourceView = new ResourceView() { @Override public long getVmemAllocatedForContainers() { @@ -173,7 +175,7 @@ 
public boolean isPmemCheckEnabled() { return true; } }; - Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath()); NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 995bcb7a17..107326b6a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -108,7 +108,7 @@ protected void configureServlets() { healthChecker.init(conf); aclsManager = new ApplicationACLsManager(conf); nmContext = new NodeManager.NMContext(null, null, dirsHandler, - aclsManager, null, false); + aclsManager, null, false, conf); NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042); ((NodeManager.NMContext)nmContext).setNodeId(nodeId); resourceView = new ResourceView() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java index 1c34b44d82..3b7694aac8 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java @@ -102,7 +102,7 @@ protected void configureServlets() { dirsHandler = healthChecker.getDiskHandler(); aclsManager = new ApplicationACLsManager(conf); nmContext = new NodeManager.NMContext(null, null, dirsHandler, - aclsManager, null, false); + aclsManager, null, false, conf); NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999); ((NodeManager.NMContext)nmContext).setNodeId(nodeId); resourceView = new ResourceView() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java index bec448de4a..06a399504f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java @@ -136,7 +136,7 @@ public boolean isPmemCheckEnabled() { dirsHandler = healthChecker.getDiskHandler(); aclsManager = new ApplicationACLsManager(conf); nmContext = new NodeManager.NMContext(null, null, dirsHandler, - aclsManager, null, false) { + aclsManager, null, false, conf) { public NodeId getNodeId() { return NodeId.newInstance("testhost.foo.com", 8042); }; diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 683aa9143b..cac4b42f79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -596,18 +596,16 @@ private void setAppCollectorsMapToResponse( Map liveAppCollectorsMap = new ConcurrentHashMap(); Map rmApps = rmContext.getRMApps(); - for (ApplicationId appId : liveApps) { - String appCollectorAddr = rmApps.get(appId).getCollectorAddr(); - if (appCollectorAddr != null) { - liveAppCollectorsMap.put(appId, appCollectorAddr); - } else { - // Log a debug info if collector address is not found. - if (LOG.isDebugEnabled()) { - LOG.debug("Collector for applicaton: " + appId + - " hasn't registered yet!"); - } - } + // Set collectors for all apps now. 
+ // TODO set collectors for only active apps running on NM (liveApps cannot be + // used for this case) + for (Map.Entry rmApp : rmApps.entrySet()) { + ApplicationId appId = rmApp.getKey(); + String appCollectorAddr = rmApp.getValue().getCollectorAddr(); + if (appCollectorAddr != null) { + liveAppCollectorsMap.put(appId, appCollectorAddr); } + } response.setAppCollectorsMap(liveAppCollectorsMap); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index 0f516568de..edec0d357b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -135,7 +135,7 @@ public Response putEntities( } TimelineCollector collector = getCollector(req, appId); if (collector == null) { - LOG.error("Application not found"); + LOG.error("Application: "+ appId + " is not found"); throw new NotFoundException(); // different exception? } collector.putEntities(entities, callerUgi);