From 84c35ac6c4a76c31d9bf9c87b87ed29394564611 Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Mon, 28 Mar 2016 15:50:03 -0700 Subject: [PATCH] YARN-4711. NM is going down with NPE's due to single thread processing of events by Timeline client (Naganarasimha G R via sjlee) --- .../dev-support/findbugs-exclude.xml | 11 +- .../timelineservice/TimelineEntity.java | 25 ++- .../client/api/impl/TimelineClientImpl.java | 35 ++- .../api/impl/TestTimelineClientV2Impl.java | 91 +++++++- .../metrics/ContainerMetricsConstants.java | 8 + .../nodemanager/NodeStatusUpdaterImpl.java | 10 +- .../collectormanager/NMCollectorService.java | 10 +- .../application/Application.java | 4 - .../application/ApplicationImpl.java | 23 +- .../timelineservice/NMTimelinePublisher.java | 212 ++++++++++-------- .../TestNMTimelinePublisher.java | 24 +- .../server/nodemanager/webapp/MockApp.java | 5 - 12 files changed, 279 insertions(+), 179 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index d724026e8e..08c6ba28c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -117,8 +117,15 @@ - - + + + + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java index acc132e604..7ce8279e5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java @@ -17,15 +17,6 @@ */ package org.apache.hadoop.yarn.api.records.timelineservice; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.yarn.util.TimelineServiceHelper; -import org.codehaus.jackson.annotate.JsonSetter; - -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlRootElement; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -33,6 +24,16 @@ import java.util.Set; import java.util.TreeSet; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; +import org.codehaus.jackson.annotate.JsonSetter; + /** * The basic timeline entity data structure for timeline service v2. Timeline * entity objects are not thread safe and should not be accessed concurrently. @@ -564,6 +565,10 @@ protected TimelineEntity getReal() { } public String toString() { - return identifier.toString(); + if (real == null) { + return identifier.toString(); + } else { + return real.toString(); + } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 87a5e9c1b2..ef8838e1a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -429,9 +429,8 @@ protected void putObjects(String path, MultivaluedMap params, URI uri = constructResURI(getConfig(), timelineServiceAddress, true); putObjects(uri, path, params, obj); needRetry = false; - } catch (Exception e) { - // TODO only handle exception for timelineServiceAddress being updated. - // skip retry for other exceptions. + } catch (IOException e) { + // handle exception for timelineServiceAddress being updated. checkRetryWithSleep(retries, e); retries--; } @@ -458,29 +457,27 @@ private int verifyRestEndPointAvailable() throws YarnException { * @param retries * @param e */ - private void checkRetryWithSleep(int retries, Exception e) throws - YarnException, IOException { + private void checkRetryWithSleep(int retries, IOException e) + throws YarnException, IOException { if (retries > 0) { try { Thread.sleep(this.serviceRetryInterval); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); + throw new YarnException("Interrupted while retrying to connect to ATS"); } } else { - LOG.error("TimelineClient has reached to max retry times :" + - this.maxServiceRetries + " for service address: " + - timelineServiceAddress); - if (e instanceof YarnException) { - throw (YarnException)e; - } else if (e instanceof IOException) { - throw (IOException)e; - } else { - throw new YarnException(e); - } + StringBuilder msg = + new StringBuilder("TimelineClient has reached to max retry times : "); + msg.append(this.maxServiceRetries); + msg.append(" for service address: "); + msg.append(timelineServiceAddress); + LOG.error(msg.toString()); + throw new IOException(msg.toString(), e); } } - private void putObjects( + protected void putObjects( URI base, String path, MultivaluedMap params, Object obj) throws IOException, YarnException { ClientResponse resp; @@ -636,17 +633,19 @@ private Object operateDelegationToken( /** * Poll TimelineServiceAddress for maximum of retries times if it is null. + * * @param retries * @return the left retry times + * @throws IOException */ - private int pollTimelineServiceAddress(int retries) { + private int pollTimelineServiceAddress(int retries) throws YarnException { while (timelineServiceAddress == null && retries > 0) { try { Thread.sleep(this.serviceRetryInterval); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + throw new YarnException("Interrupted while trying to connect ATS"); } - // timelineServiceAddress = getTimelineServiceAddress(); retries--; } return retries; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java index 7803f94239..71dafdc846 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -34,23 +35,33 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; public class TestTimelineClientV2Impl { private static final Log LOG = LogFactory.getLog(TestTimelineClientV2Impl.class); private TestV2TimelineClient client; private static long TIME_TO_SLEEP = 150; + private static final String EXCEPTION_MSG = "Exception in the content"; @Before public void setup() { - YarnConfiguration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3); - client = createTimelineClient(conf); + if (!currTestName.getMethodName() + .contains("testRetryOnConnectionFailure")) { + client = createTimelineClient(conf); + } } + @Rule + public TestName currTestName = new TestName(); + private YarnConfiguration conf; + private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) { ApplicationId id = ApplicationId.newInstance(0, 0); TestV2TimelineClient client = new TestV2TimelineClient(id); @@ -59,9 +70,34 @@ private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) { return client; } - private class TestV2TimelineClient extends TimelineClientImpl { + private class TestV2TimelineClientForExceptionHandling + extends TimelineClientImpl { + public TestV2TimelineClientForExceptionHandling(ApplicationId id) { + super(id); + } + + protected boolean throwYarnException; + + public void setThrowYarnException(boolean throwYarnException) { + this.throwYarnException = throwYarnException; + } + + @Override + protected void putObjects(URI base, String path, + MultivaluedMap params, Object obj) + throws IOException, YarnException { + if (throwYarnException) { + throw new YarnException(EXCEPTION_MSG); + } else { + throw new IOException( + "Failed to get the response from the timeline server."); + } + } + } + + private class TestV2TimelineClient + extends TestV2TimelineClientForExceptionHandling { private boolean sleepBeforeReturn; - private boolean throwException; private List publishedEntities; @@ -75,10 +111,6 @@ public void setSleepBeforeReturn(boolean sleepBeforeReturn) { this.sleepBeforeReturn = sleepBeforeReturn; } - public void setThrowException(boolean throwException) { - this.throwException = throwException; - } - public int getNumOfTimelineEntitiesPublished() { return publishedEntities.size(); } @@ -91,7 +123,7 @@ public TestV2TimelineClient(ApplicationId id) { protected void putObjects(String path, MultivaluedMap params, Object obj) throws IOException, YarnException { - if (throwException) { + if (throwYarnException) { throw new YarnException("ActualException"); } publishedEntities.add((TimelineEntities) obj); @@ -105,6 +137,45 @@ protected void putObjects(String path, } } + @Test + public void testExceptionMultipleRetry() { + TestV2TimelineClientForExceptionHandling client = + new TestV2TimelineClientForExceptionHandling( + ApplicationId.newInstance(0, 0)); + int maxRetries = 2; + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + maxRetries); + client.init(conf); + client.start(); + client.setTimelineServiceAddress("localhost:12345"); + try { + client.putEntities(new TimelineEntity()); + } catch (IOException e) { + Assert.fail("YARN exception is expected"); + } catch (YarnException e) { + Throwable cause = e.getCause(); + Assert.assertTrue("IOException is expected", + cause instanceof IOException); + Assert.assertTrue("YARN exception is expected", + cause.getMessage().contains( + "TimelineClient has reached to max retry times : " + maxRetries)); + } + + client.setThrowYarnException(true); + try { + client.putEntities(new TimelineEntity()); + } catch (IOException e) { + Assert.fail("YARN exception is expected"); + } catch (YarnException e) { + Throwable cause = e.getCause(); + Assert.assertTrue("YARN exception is expected", + cause instanceof YarnException); + Assert.assertTrue("YARN exception is expected", + cause.getMessage().contains(EXCEPTION_MSG)); + } + client.stop(); + } + @Test public void testPostEntities() throws Exception { try { @@ -189,7 +260,7 @@ public void testSyncCall() throws Exception { @Test public void testExceptionCalls() throws Exception { - client.setThrowException(true); + client.setThrowYarnException(true); try { client.putEntitiesAsync(generateEntity("1")); } catch (YarnException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java index 7b429948fb..eadb5b792d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java @@ -69,4 +69,12 @@ public class ContainerMetricsConstants { public static final String ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO = "YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS"; + + // Event of this type will be emitted by NM. + public static final String LOCALIZATION_START_EVENT_TYPE = + "YARN_NM_CONTAINER_LOCALIZATION_STARTED"; + + // Event of this type will be emitted by NM. + public static final String LOCALIZATION_FINISHED_EVENT_TYPE = + "YARN_NM_CONTAINER_LOCALIZATION_FINISHED"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 9e2254e2bc..6e0e7601a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -89,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -983,9 +983,11 @@ private void updateTimelineClientsAddress( LOG.debug("Sync a new collector address: " + collectorAddr + " for application: " + appId + " from RM."); } - TimelineClient client = application.getTimelineClient(); - if (client != null) { - client.setTimelineServiceAddress(collectorAddr); + NMTimelinePublisher nmTimelinePublisher = + context.getNMTimelinePublisher(); + if (nmTimelinePublisher != null) { + nmTimelinePublisher.setTimelineServiceAddress( + application.getAppId(), collectorAddr); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index 548c8617c7..d667c0ee24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -29,7 +29,6 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -42,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; /** * Service that handles collector information. It is used only if the timeline @@ -116,10 +116,10 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo( String collectorAddr = collector.getCollectorAddr(); newCollectorsMap.put(appId, collectorAddr); // set registered collector address to TimelineClient. - TimelineClient client = - context.getApplications().get(appId).getTimelineClient(); - if (client != null) { - client.setTimelineServiceAddress(collectorAddr); + NMTimelinePublisher nmTimelinePublisher = + context.getNMTimelinePublisher(); + if (nmTimelinePublisher != null) { + nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr); } } ((NodeManager.NMContext)context).addRegisteredCollectors( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java index 5de339875f..aee0862ae8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java @@ -22,7 +22,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -41,7 +40,4 @@ public interface Application extends EventHandler { String getFlowVersion(); long getFlowRunId(); - - TimelineClient getTimelineClient(); - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 0531fe4e4a..22779bb4b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -83,7 +84,6 @@ public class ApplicationImpl implements Application { private final ReadLock readLock; private final WriteLock writeLock; private final Context context; - private TimelineClient timelineClient; private static final Log LOG = LogFactory.getLog(ApplicationImpl.class); @@ -143,7 +143,7 @@ public ApplicationImpl(Dispatcher dispatcher, String user, } this.flowContext = flowContext; if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) { - createAndStartTimelineClient(conf); + context.getNMTimelinePublisher().createTimelineClient(appId); } } } @@ -175,13 +175,6 @@ public long getFlowRunId() { } } - private void createAndStartTimelineClient(Configuration conf) { - // create and start timeline client - this.timelineClient = TimelineClient.createTimelineClient(appId); - timelineClient.init(conf); - timelineClient.start(); - } - @Override public String getUser() { return user.toString(); @@ -192,11 +185,6 @@ public ApplicationId getAppId() { return appId; } - @Override - public TimelineClient getTimelineClient() { - return timelineClient; - } - @Override public ApplicationState getApplicationState() { this.readLock.lock(); @@ -575,9 +563,10 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { registeredCollectors.remove(app.getAppId()); } // stop timelineClient when application get finished. - TimelineClient timelineClient = app.getTimelineClient(); - if (timelineClient != null) { - timelineClient.stop(); + NMTimelinePublisher nmTimelinePublisher = + app.context.getNMTimelinePublisher(); + if (nmTimelinePublisher != null) { + nmTimelinePublisher.stopTimelineClient(app.getAppId()); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 70b7e8dd66..4d3dafdc06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -18,8 +18,10 @@ package org.apache.hadoop.yarn.server.nodemanager.timelineservice; +import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -41,16 +42,15 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; @@ -72,9 +72,12 @@ public class NMTimelinePublisher extends CompositeService { private String httpAddress; + protected final Map appToClientMap; + public NMTimelinePublisher(Context context) { super(NMTimelinePublisher.class.getName()); this.context = context; + appToClientMap = new ConcurrentHashMap<>(); } @Override @@ -82,12 +85,6 @@ protected void serviceInit(Configuration conf) throws Exception { dispatcher = new AsyncDispatcher(); dispatcher.register(NMTimelineEventType.class, new ForwardingEventHandler()); - dispatcher - .register(ContainerEventType.class, new ContainerEventHandler()); - dispatcher.register(ApplicationEventType.class, - new ApplicationEventHandler()); - dispatcher.register(LocalizationEventType.class, - new LocalizationEventDispatcher()); addIfService(dispatcher); super.serviceInit(conf); } @@ -112,7 +109,6 @@ protected void handleNMTimelineEvent(NMTimelineEvent event) { } } - @SuppressWarnings("unchecked") public void reportContainerResourceUsage(Container container, Long pmemUsage, Float cpuUsagePercentPerCore) { if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || @@ -133,15 +129,32 @@ public void reportContainerResourceUsage(Container container, Long pmemUsage, Math.round(cpuUsagePercentPerCore)); entity.addMetric(cpuMetric); } - dispatcher.getEventHandler() - .handle(new TimelinePublishEvent(entity, container.getContainerId() - .getApplicationAttemptId().getApplicationId())); + ApplicationId appId = container.getContainerId().getApplicationAttemptId() + .getApplicationId(); + try { + // no need to put it as part of publisher as timeline client already has + // Queuing concept + TimelineClient timelineClient = getTimelineClient(appId); + if (timelineClient != null) { + timelineClient.putEntitiesAsync(entity); + } else { + LOG.error("Seems like client has been removed before the container" + + " metric could be published for " + container.getContainerId()); + } + } catch (IOException | YarnException e) { + LOG.error("Failed to publish Container metrics for container " + + container.getContainerId(), e); + } } } - private void publishContainerCreatedEvent(ContainerEntity entity, - ContainerId containerId, Resource resource, Priority priority, - long timestamp) { + @SuppressWarnings("unchecked") + private void publishContainerCreatedEvent(ContainerEvent event) { + ContainerId containerId = event.getContainerID(); + ContainerEntity entity = createContainerEntity(containerId); + Container container = context.getContainers().get(containerId); + Resource resource = container.getResource(); + Map entityInfo = new HashMap(); entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, resource.getMemory()); @@ -152,7 +165,7 @@ private void publishContainerCreatedEvent(ContainerEntity entity, entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, nodeId.getPort()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, - priority.toString()); + container.getPriority().toString()); entityInfo.put( ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, httpAddress); @@ -160,13 +173,15 @@ private void publishContainerCreatedEvent(ContainerEntity entity, TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(timestamp); + tEvent.setTimestamp(event.getTimestamp()); entity.addEvent(tEvent); - entity.setCreatedTime(timestamp); - putEntity(entity, containerId.getApplicationAttemptId().getApplicationId()); + entity.setCreatedTime(event.getTimestamp()); + dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, + containerId.getApplicationAttemptId().getApplicationId())); } + @SuppressWarnings("unchecked") private void publishContainerFinishedEvent(ContainerStatus containerStatus, long timeStamp) { ContainerId containerId = containerStatus.getContainerId(); @@ -186,7 +201,38 @@ private void publishContainerFinishedEvent(ContainerStatus containerStatus, tEvent.setInfo(eventInfo); entity.addEvent(tEvent); - putEntity(entity, containerId.getApplicationAttemptId().getApplicationId()); + + dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, + containerId.getApplicationAttemptId().getApplicationId())); + } + + private void publishContainerLocalizationEvent( + ContainerLocalizationEvent event, String eventType) { + Container container = event.getContainer(); + ContainerId containerId = container.getContainerId(); + TimelineEntity entity = createContainerEntity(containerId); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(eventType); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + + ApplicationId appId = + container.getContainerId().getApplicationAttemptId().getApplicationId(); + try { + // no need to put it as part of publisher as timeline client already has + // Queuing concept + TimelineClient timelineClient = getTimelineClient(appId); + if (timelineClient != null) { + timelineClient.putEntitiesAsync(entity); + } else { + LOG.error("Seems like client has been removed before the event could be" + + " published for " + container.getContainerId()); + } + } catch (IOException | YarnException e) { + LOG.error("Failed to publish Container metrics for container " + + container.getContainerId(), e); + } } private static ContainerEntity createContainerEntity( @@ -207,23 +253,33 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) { LOG.debug("Publishing the entity " + entity + ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); } - TimelineClient timelineClient = - context.getApplications().get(appId).getTimelineClient(); - timelineClient.putEntities(entity); + TimelineClient timelineClient = getTimelineClient(appId); + if (timelineClient != null) { + timelineClient.putEntities(entity); + } else { + LOG.error("Seems like client has been removed before the entity " + + "could be published for " + entity); + } } catch (Exception e) { LOG.error("Error when publishing entity " + entity, e); } } - @SuppressWarnings("unchecked") public void publishApplicationEvent(ApplicationEvent event) { // publish only when the desired event is received switch (event.getType()) { case INIT_APPLICATION: case FINISH_APPLICATION: - case APPLICATION_CONTAINER_FINISHED: case APPLICATION_LOG_HANDLING_FAILED: - dispatcher.getEventHandler().handle(event); + // TODO need to be handled in future, + // not sure to publish under which entity + break; + case APPLICATION_CONTAINER_FINISHED: + // this is actually used to publish the container Event + ApplicationContainerFinishedEvent evnt = + (ApplicationContainerFinishedEvent) event; + publishContainerFinishedEvent(evnt.getContainerStatus(), + event.getTimestamp()); break; default: @@ -235,12 +291,11 @@ public void publishApplicationEvent(ApplicationEvent event) { } } - @SuppressWarnings("unchecked") public void publishContainerEvent(ContainerEvent event) { // publish only when the desired event is received switch (event.getType()) { case INIT_CONTAINER: - dispatcher.getEventHandler().handle(event); + publishContainerCreatedEvent(event); break; default: @@ -253,15 +308,17 @@ public void publishContainerEvent(ContainerEvent event) { } } - @SuppressWarnings("unchecked") public void publishLocalizationEvent(LocalizationEvent event) { // publish only when the desired event is received switch (event.getType()) { case CONTAINER_RESOURCES_LOCALIZED: - case INIT_CONTAINER_RESOURCES: - dispatcher.getEventHandler().handle(event); + publishContainerLocalizationEvent((ContainerLocalizationEvent) event, + ContainerMetricsConstants.LOCALIZATION_FINISHED_EVENT_TYPE); + break; + case INIT_CONTAINER_RESOURCES: + publishContainerLocalizationEvent((ContainerLocalizationEvent) event, + ContainerMetricsConstants.LOCALIZATION_START_EVENT_TYPE); break; - default: if (LOG.isDebugEnabled()) { LOG.debug(event.getType() @@ -272,64 +329,6 @@ public void publishLocalizationEvent(LocalizationEvent event) { } } - private class ApplicationEventHandler implements - EventHandler { - @Override - public void handle(ApplicationEvent event) { - switch (event.getType()) { - case APPLICATION_CONTAINER_FINISHED: - // this is actually used to publish the container Event - ApplicationContainerFinishedEvent evnt = - (ApplicationContainerFinishedEvent) event; - publishContainerFinishedEvent(evnt.getContainerStatus(), - event.getTimestamp()); - break; - default: - LOG.error("Seems like event type is captured only in " - + "publishApplicationEvent method and not handled here"); - break; - } - } - } - - private class ContainerEventHandler implements EventHandler { - @Override - public void handle(ContainerEvent event) { - ContainerId containerId = event.getContainerID(); - Container container = context.getContainers().get(containerId); - long timestamp = event.getTimestamp(); - ContainerEntity entity = createContainerEntity(containerId); - - switch (event.getType()) { - case INIT_CONTAINER: - publishContainerCreatedEvent(entity, containerId, - container.getResource(), container.getPriority(), timestamp); - break; - default: - LOG.error("Seems like event type is captured only in " - + "publishContainerEvent method and not handled here"); - break; - } - } - } - - private static final class LocalizationEventDispatcher implements - EventHandler { - @Override - public void handle(LocalizationEvent event) { - switch (event.getType()) { - case INIT_CONTAINER_RESOURCES: - case CONTAINER_RESOURCES_LOCALIZED: - // TODO after priority based flush jira is finished - break; - default: - LOG.error("Seems like event type is captured only in " - + "publishLocalizationEvent method and not handled here"); - break; - } - } - } - /** * EventHandler implementation which forward events to NMMetricsPublisher. * Making use of it, NMMetricsPublisher can avoid to have a public handle @@ -363,4 +362,33 @@ public TimelineEntity getTimelineEntityToPublish() { return entityToPublish; } } + + public void createTimelineClient(ApplicationId appId) { + if (!appToClientMap.containsKey(appId)) { + TimelineClient timelineClient = + TimelineClient.createTimelineClient(appId); + timelineClient.init(getConfig()); + timelineClient.start(); + appToClientMap.put(appId, timelineClient); + } + } + + public void stopTimelineClient(ApplicationId appId) { + TimelineClient client = appToClientMap.remove(appId); + if (client != null) { + client.stop(); + } + } + + public void setTimelineServiceAddress(ApplicationId appId, + String collectorAddr) { + TimelineClient client = appToClientMap.get(appId); + if (client != null) { + client.setTimelineServiceAddress(collectorAddr); + } + } + + private TimelineClient getTimelineClient(ApplicationId appId) { + return appToClientMap.get(appId); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java index 830ed6b504..4aa28d2dbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java @@ -20,14 +20,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Iterator; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -39,7 +37,6 @@ import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.junit.Assert; @@ -53,20 +50,23 @@ public class TestNMTimelinePublisher { public void testContainerResourceUsage() { Context context = mock(Context.class); @SuppressWarnings("unchecked") - ConcurrentMap map = mock(ConcurrentMap.class); - Application aApp = mock(Application.class); - when(map.get(any(ApplicationId.class))).thenReturn(aApp); - DummyTimelineClient timelineClient = new DummyTimelineClient(); - when(aApp.getTimelineClient()).thenReturn(timelineClient); - when(context.getApplications()).thenReturn(map); + final DummyTimelineClient timelineClient = new DummyTimelineClient(); when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0)); when(context.getHttpPort()).thenReturn(0); - NMTimelinePublisher publisher = new NMTimelinePublisher(context); + NMTimelinePublisher publisher = new NMTimelinePublisher(context) { + public void createTimelineClient(ApplicationId appId) { + if (!appToClientMap.containsKey(appId)) { + appToClientMap.put(appId, timelineClient); + } + } + }; publisher.init(new Configuration()); publisher.start(); + ApplicationId appId = ApplicationId.newInstance(0, 1); + publisher.createTimelineClient(appId); Container aContainer = mock(Container.class); when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId( - ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), + ApplicationAttemptId.newInstance(appId, 1), 0L)); publisher.reportContainerResourceUsage(aContainer, 1024L, 8F); verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8); @@ -141,7 +141,7 @@ protected static class DummyTimelineClient extends TimelineClientImpl { private TimelineEntity[] lastPublishedEntities; @Override - public void putEntities(TimelineEntity... entities) + public void putEntitiesAsync(TimelineEntity... entities) throws IOException, YarnException { this.lastPublishedEntities = entities; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java index 4d1be84368..c98304001a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java @@ -101,9 +101,4 @@ public String getFlowVersion() { public long getFlowRunId() { return flowRunId; } - - @Override - public TimelineClient getTimelineClient() { - return timelineClient; - } }