From 0d02ab8729630ad3cfb4300702927333b1d349e3 Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Tue, 9 Feb 2016 09:07:37 -0800 Subject: [PATCH] YARN-3367. Replace starting a separate thread for post entity with event loop in TimelineClient (Naganarasimha G R via sjlee) --- .../jobhistory/JobHistoryEventHandler.java | 81 +---- .../mapred/JobHistoryFileReplayMapper.java | 8 +- .../mapred/TimelineEntityConverter.java | 12 +- .../timelineservice/TimelineEntities.java | 17 +- .../hadoop/yarn/conf/YarnConfiguration.java | 6 + .../distributedshell/ApplicationMaster.java | 78 +---- .../api/async/impl/AMRMClientAsyncImpl.java | 26 +- .../yarn/client/api/TimelineClient.java | 8 +- .../client/api/impl/TimelineClientImpl.java | 285 +++++++++++++--- .../src/main/resources/yarn-default.xml | 7 + .../api/impl/TestTimelineClientV2Impl.java | 304 ++++++++++++++++++ .../nodemanager/NodeStatusUpdaterImpl.java | 4 +- 12 files changed, 619 insertions(+), 217 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index c2f3868a4a..d7cf19140c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -28,10 +28,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -85,7 +82,6 @@ import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientHandlerException; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * The job history events get routed to this class. This class writes the Job * history events to the DFS directly into a staging dir and then moved to a @@ -141,10 +137,6 @@ public class JobHistoryEventHandler extends AbstractService private boolean timelineServiceV2Enabled = false; - // For posting entities in new timeline service in a non-blocking way - // TODO YARN-3367 replace with event loop in TimelineClient. - private ExecutorService threadPool; - private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB"; private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK"; private static final String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = @@ -284,10 +276,6 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.timelineServiceV2Enabled(conf); LOG.info("Timeline service is enabled; version: " + YarnConfiguration.getTimelineServiceVersion(conf)); - if (timelineServiceV2Enabled) { - // initialize the thread pool for v.2 timeline service - threadPool = createThreadPool(); - } } else { LOG.info("Timeline service is not enabled"); } @@ -461,35 +449,9 @@ protected void serviceStop() throws Exception { if (timelineClient != null) { timelineClient.stop(); } - if (threadPool != null) { - shutdownAndAwaitTermination(); - } LOG.info("Stopped JobHistoryEventHandler. super.stop()"); super.serviceStop(); } - - // TODO remove threadPool after adding non-blocking call in TimelineClient - private ExecutorService createThreadPool() { - return Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); - } - - private void shutdownAndAwaitTermination() { - threadPool.shutdown(); - try { - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { - threadPool.shutdownNow(); - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { - LOG.error("ThreadPool did not terminate"); - } - } - } catch (InterruptedException ie) { - threadPool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } protected EventWriter createEventWriter(Path historyFilePath) throws IOException { @@ -1097,41 +1059,6 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId, + "Server", ex); } } - - @Private - public JsonNode countersToJSON(Counters counters) { - ArrayNode nodes = FACTORY.arrayNode(); - if (counters != null) { - for (CounterGroup counterGroup : counters) { - ObjectNode groupNode = nodes.addObject(); - groupNode.put("NAME", counterGroup.getName()); - groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName()); - ArrayNode countersNode = groupNode.putArray("COUNTERS"); - for (Counter counter : counterGroup) { - ObjectNode counterNode = countersNode.addObject(); - counterNode.put("NAME", counter.getName()); - counterNode.put("DISPLAY_NAME", counter.getDisplayName()); - counterNode.put("VALUE", counter.getValue()); - } - } - } - return nodes; - } - - private void putEntityWithoutBlocking(final TimelineClient client, - final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) { - Runnable publishWrapper = new Runnable() { - public void run() { - try { - client.putEntities(entity); - } catch (IOException|YarnException e) { - LOG.error("putEntityNonBlocking get failed: " + e); - throw new RuntimeException(e.toString()); - } - } - }; - threadPool.execute(publishWrapper); - } // create JobEntity from HistoryEvent with adding other info, like: // jobId, timestamp and entityType. @@ -1293,7 +1220,13 @@ private void processEventForNewTimelineService(HistoryEvent event, taskId, setCreatedTime); } } - putEntityWithoutBlocking(timelineClient, tEntity); + try { + timelineClient.putEntitiesAsync(tEntity); + } catch (IOException | YarnException e) { + LOG.error("Failed to process Event " + event.getEventType() + + " for the job : " + jobId, e); + } + } private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java index 802b78ff46..4fb5308084 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java @@ -21,8 +21,8 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -176,7 +176,7 @@ protected void writeEntities(Configuration tlConf, // create entities from job history and write them long totalTime = 0; - Set entitySet = + List entitySet = converter.createTimelineEntities(jobInfo, jobConf); LOG.info("converted them into timeline entities for job " + jobIdStr); // use the current user for this purpose @@ -215,7 +215,7 @@ protected void writeEntities(Configuration tlConf, } private void writeAllEntities(AppLevelTimelineCollector collector, - Set entitySet, UserGroupInformation ugi) + List entitySet, UserGroupInformation ugi) throws IOException { TimelineEntities entities = new TimelineEntities(); entities.setEntities(entitySet); @@ -223,7 +223,7 @@ private void writeAllEntities(AppLevelTimelineCollector collector, } private void writePerEntity(AppLevelTimelineCollector collector, - Set entitySet, UserGroupInformation ugi) + List entitySet, UserGroupInformation ugi) throws IOException { for (TimelineEntity entity : entitySet) { TimelineEntities entities = new TimelineEntities(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java index 880014b22e..0e2eb7218e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java @@ -18,7 +18,9 @@ package org.apache.hadoop.mapred; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -57,16 +59,16 @@ class TimelineEntityConverter { * Note that we also do not add info to the YARN application entity, which * would be needed for aggregation. */ - public Set createTimelineEntities(JobInfo jobInfo, + public List createTimelineEntities(JobInfo jobInfo, Configuration conf) { - Set entities = new HashSet<>(); + List entities = new ArrayList<>(); // create the job entity TimelineEntity job = createJobEntity(jobInfo, conf); entities.add(job); // create the task and task attempt entities - Set tasksAndAttempts = + List tasksAndAttempts = createTaskAndTaskAttemptEntities(jobInfo); entities.addAll(tasksAndAttempts); @@ -125,9 +127,9 @@ private void addMetrics(TimelineEntity entity, Counters counters) { } } - private Set createTaskAndTaskAttemptEntities( + private List createTaskAndTaskAttemptEntities( JobInfo jobInfo) { - Set entities = new HashSet<>(); + List entities = new ArrayList<>(); Map taskInfoMap = jobInfo.getAllTasks(); LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() + " tasks"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java index f08a0ec93b..63989e682e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java @@ -17,15 +17,16 @@ */ package org.apache.hadoop.yarn.api.records.timelineservice; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; +import java.util.ArrayList; +import java.util.List; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; -import java.util.HashSet; -import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; /** * This class hosts a set of timeline entities. @@ -36,22 +37,22 @@ @InterfaceStability.Unstable public class TimelineEntities { - private Set entities = new HashSet<>(); + private List entities = new ArrayList<>(); public TimelineEntities() { } @XmlElement(name = "entities") - public Set getEntities() { + public List getEntities() { return entities; } - public void setEntities(Set timelineEntities) { + public void setEntities(List timelineEntities) { this.entities = timelineEntities; } - public void addEntities(Set timelineEntities) { + public void addEntities(List timelineEntities) { this.entities.addAll(timelineEntities); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2d0f285a20..7daf2304dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1987,6 +1987,12 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = 1000; + public static final String NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = + TIMELINE_SERVICE_PREFIX + + "timeline-client.number-of-async-entities-to-merge"; + + public static final int DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = 10; + // mark app-history related configs @Private as application history is going // to be integrated into the timeline service @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 79827b97d3..b22e1b27af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -40,9 +40,6 @@ import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.cli.CommandLine; @@ -111,7 +108,6 @@ import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientHandlerException; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * An ApplicationMaster for executing shell commands on a set of launched @@ -225,10 +221,6 @@ public static enum DSEntity { private boolean timelineServiceV2 = false; - // For posting entities in new timeline service in a non-blocking way - // TODO replace with event loop in TimelineClient. - private ExecutorService threadPool; - // App Master configuration // No. of containers to run shell command on @VisibleForTesting @@ -328,10 +320,6 @@ public static void main(String[] args) { } appMaster.run(); result = appMaster.finish(); - - if (appMaster.threadPool != null) { - appMaster.shutdownAndAwaitTermination(); - } } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); LogManager.shutdown(); @@ -346,29 +334,6 @@ public static void main(String[] args) { } } - //TODO remove threadPool after adding non-blocking call in TimelineClient - private ExecutorService createThreadPool() { - return Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); - } - - private void shutdownAndAwaitTermination() { - threadPool.shutdown(); - try { - // Wait a while for existing tasks to terminate - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { - threadPool.shutdownNow(); - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) - LOG.error("ThreadPool did not terminate"); - } - } catch (InterruptedException ie) { - threadPool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - /** * Dump out contents of $CWD and the environment to stdout for debugging */ @@ -591,11 +556,7 @@ public boolean init(String[] args) throws ParseException, IOException { "container_retry_interval", "0")); if (YarnConfiguration.timelineServiceEnabled(conf)) { - timelineServiceV2 = - YarnConfiguration.timelineServiceV2Enabled(conf); - if (timelineServiceV2) { - threadPool = createThreadPool(); - } + timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf); } else { timelineClient = null; LOG.warn("Timeline service is not enabled"); @@ -746,8 +707,10 @@ public Void run() throws Exception { if (timelineServiceV2) { timelineClient = TimelineClient.createTimelineClient( appAttemptID.getApplicationId()); + LOG.info("Timeline service V2 client is enabled"); } else { timelineClient = TimelineClient.createTimelineClient(); + LOG.info("Timeline service V1 client is enabled"); } timelineClient.init(conf); timelineClient.start(); @@ -1385,18 +1348,8 @@ Thread createLaunchContainerThread(Container allocatedContainer, shellId); return new Thread(runnableLaunchContainer); } - - private void publishContainerStartEventOnTimelineServiceV2( - final Container container) { - Runnable publishWrapper = new Runnable() { - public void run() { - publishContainerStartEventOnTimelineServiceV2Base(container); - } - }; - threadPool.execute(publishWrapper); - } - private void publishContainerStartEventOnTimelineServiceV2Base( + private void publishContainerStartEventOnTimelineServiceV2( Container container) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); @@ -1430,16 +1383,6 @@ public TimelinePutResponse run() throws Exception { private void publishContainerEndEventOnTimelineServiceV2( final ContainerStatus container) { - Runnable publishWrapper = new Runnable() { - public void run() { - publishContainerEndEventOnTimelineServiceV2Base(container); - } - }; - threadPool.execute(publishWrapper); - } - - private void publishContainerEndEventOnTimelineServiceV2Base( - final ContainerStatus container) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(container.getContainerId().toString()); @@ -1470,17 +1413,6 @@ public TimelinePutResponse run() throws Exception { } private void publishApplicationAttemptEventOnTimelineServiceV2( - final DSEvent appEvent) { - - Runnable publishWrapper = new Runnable() { - public void run() { - publishApplicationAttemptEventOnTimelineServiceV2Base(appEvent); - } - }; - threadPool.execute(publishWrapper); - } - - private void publishApplicationAttemptEventOnTimelineServiceV2Base( DSEvent appEvent) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); @@ -1498,7 +1430,7 @@ private void publishApplicationAttemptEventOnTimelineServiceV2Base( appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @Override public TimelinePutResponse run() throws Exception { - timelineClient.putEntities(entity); + timelineClient.putEntitiesAsync(entity); return null; } }); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 212f721323..8af0c78b1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -41,9 +41,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -327,6 +327,19 @@ public void run() { LOG.info("Interrupted while waiting for queue", ex); continue; } + + String collectorAddress = response.getCollectorAddr(); + TimelineClient timelineClient = client.getRegisteredTimeineClient(); + if (timelineClient != null && collectorAddress != null + && !collectorAddress.isEmpty()) { + if (collectorAddr == null + || !collectorAddr.equals(collectorAddress)) { + collectorAddr = collectorAddress; + timelineClient.setTimelineServiceAddress(collectorAddress); + LOG.info("collectorAddress " + collectorAddress); + } + } + List updatedNodes = response.getUpdatedNodes(); if (!updatedNodes.isEmpty()) { handler.onNodesUpdated(updatedNodes); @@ -354,17 +367,6 @@ public void run() { if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); } - - String collectorAddress = response.getCollectorAddr(); - TimelineClient timelineClient = client.getRegisteredTimeineClient(); - if (timelineClient != null && collectorAddress != null - && !collectorAddress.isEmpty()) { - if (collectorAddr == null || - !collectorAddr.equals(collectorAddress)) { - collectorAddr = collectorAddress; - timelineClient.setTimelineServiceAddress(collectorAddress); - } - } progress = handler.getProgress(); } catch (Throwable ex) { handler.onError(ex); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index 58307d4e20..e043c2f2c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -28,12 +28,12 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -55,7 +55,7 @@ public abstract class TimelineClient extends AbstractService implements * construct and initialize a timeline client if the following operations are * supposed to be conducted by that user. */ - private ApplicationId contextAppId; + protected ApplicationId contextAppId; /** * Creates an instance of the timeline v.1.x client. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 06b3ac4a84..87a5e9c1b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -30,6 +30,14 @@ import java.net.URLConnection; import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; @@ -130,6 +138,8 @@ public class TimelineClientImpl extends TimelineClient { @VisibleForTesting TimelineClientConnectionRetry connectionRetry; + private TimelineEntityDispatcher entityDispatcher; + // Abstract class for an operation that should be retried by timeline client @Private @VisibleForTesting @@ -315,6 +325,7 @@ protected void serviceInit(Configuration conf) throws Exception { serviceRetryInterval = conf.getLong( YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); + entityDispatcher = new TimelineEntityDispatcher(conf); } else { if (YarnConfiguration.useHttps(conf)) { setTimelineServiceAddress(conf.get( @@ -335,7 +346,9 @@ protected void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStart() throws Exception { - if (!timelineServiceV2) { + if (timelineServiceV2) { + entityDispatcher.start(); + } else { timelineWriter = createTimelineWriter(configuration, authUgi, client, constructResURI(getConfig(), timelineServiceAddress, false)); } @@ -357,6 +370,9 @@ protected void serviceStop() throws Exception { if (this.timelineWriter != null) { this.timelineWriter.close(); } + if (timelineServiceV2) { + entityDispatcher.stop(); + } super.serviceStop(); } @@ -376,37 +392,21 @@ public TimelinePutResponse putEntities( @Override public void putEntities( org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) - throws IOException, YarnException { - putEntities(false, entities); + throws IOException, YarnException { + if (!timelineServiceV2) { + throw new YarnException("v.2 method is invoked on a v.1.x client"); + } + entityDispatcher.dispatchEntities(true, entities); } @Override public void putEntitiesAsync( org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) throws IOException, YarnException { - putEntities(true, entities); - } - - private void putEntities(boolean async, - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) - throws IOException, YarnException { if (!timelineServiceV2) { throw new YarnException("v.2 method is invoked on a v.1.x client"); } - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities - entitiesContainer = - new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); - for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) { - entitiesContainer.addEntity(entity); - } - MultivaluedMap params = new MultivaluedMapImpl(); - if (getContextAppId() != null) { - params.add("appid", getContextAppId().toString()); - } - if (async) { - params.add("async", Boolean.TRUE.toString()); - } - putObjects("entities", params, entitiesContainer); + entityDispatcher.dispatchEntities(false, entities); } @Override @@ -417,20 +417,10 @@ public void putDomain(TimelineDomain domain) throws IOException, // Used for new timeline service only @Private - public void putObjects(String path, MultivaluedMap params, + protected void putObjects(String path, MultivaluedMap params, Object obj) throws IOException, YarnException { - // timelineServiceAddress could haven't be initialized yet - // or stale (only for new timeline service) - int retries = pollTimelineServiceAddress(this.maxServiceRetries); - if (timelineServiceAddress == null) { - String errMessage = "TimelineClient has reached to max retry times : " - + this.maxServiceRetries - + ", but failed to fetch timeline service address. Please verify" - + " Timeline Auxillary Service is configured in all the NMs"; - LOG.error(errMessage); - throw new YarnException(errMessage); - } + int retries = verifyRestEndPointAvailable(); // timelineServiceAddress could be stale, add retry logic here. boolean needRetry = true; @@ -448,6 +438,21 @@ public void putObjects(String path, MultivaluedMap params, } } + private int verifyRestEndPointAvailable() throws YarnException { + // timelineServiceAddress could haven't be initialized yet + // or stale (only for new timeline service) + int retries = pollTimelineServiceAddress(this.maxServiceRetries); + if (timelineServiceAddress == null) { + String errMessage = "TimelineClient has reached to max retry times : " + + this.maxServiceRetries + + ", but failed to fetch timeline service address. Please verify" + + " Timeline Auxillary Service is configured in all the NMs"; + LOG.error(errMessage); + throw new YarnException(errMessage); + } + return retries; + } + /** * Check if reaching to maximum of retries. * @param retries @@ -641,7 +646,7 @@ private int pollTimelineServiceAddress(int retries) { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - timelineServiceAddress = getTimelineServiceAddress(); + // timelineServiceAddress = getTimelineServiceAddress(); retries--; } return retries; @@ -906,4 +911,212 @@ public boolean shouldRetryOn(Exception e) { } } + private final class EntitiesHolder extends FutureTask { + private final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities; + private final boolean isSync; + + EntitiesHolder( + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities, + final boolean isSync) { + super(new Callable() { + // publishEntities() + public Void call() throws Exception { + MultivaluedMap params = new MultivaluedMapImpl(); + params.add("appid", contextAppId.toString()); + params.add("async", Boolean.toString(!isSync)); + putObjects("entities", params, entities); + return null; + } + }); + this.entities = entities; + this.isSync = isSync; + } + + public boolean isSync() { + return isSync; + } + + public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities getEntities() { + return entities; + } + } + + /** + * This class is responsible for collecting the timeline entities and + * publishing them in async. + */ + private class TimelineEntityDispatcher { + /** + * Time period for which the timelineclient will wait for draining after + * stop + */ + private static final long DRAIN_TIME_PERIOD = 2000L; + + private int numberOfAsyncsToMerge; + private final BlockingQueue timelineEntityQueue; + private ExecutorService executor; + + TimelineEntityDispatcher(Configuration conf) { + timelineEntityQueue = new LinkedBlockingQueue(); + numberOfAsyncsToMerge = + conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, + YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE); + } + + Runnable createRunnable() { + return new Runnable() { + @Override + public void run() { + try { + EntitiesHolder entitiesHolder; + while (!Thread.currentThread().isInterrupted()) { + // Merge all the async calls and make one push, but if its sync + // call push immediately + try { + entitiesHolder = timelineEntityQueue.take(); + } catch (InterruptedException ie) { + LOG.info("Timeline dispatcher thread was interrupted "); + Thread.currentThread().interrupt(); + return; + } + if (entitiesHolder != null) { + publishWithoutBlockingOnQueue(entitiesHolder); + } + } + } finally { + if (!timelineEntityQueue.isEmpty()) { + LOG.info("Yet to publish " + timelineEntityQueue.size() + + " timelineEntities, draining them now. "); + } + // Try to drain the remaining entities to be published @ the max for + // 2 seconds + long timeTillweDrain = + System.currentTimeMillis() + DRAIN_TIME_PERIOD; + while (!timelineEntityQueue.isEmpty()) { + publishWithoutBlockingOnQueue(timelineEntityQueue.poll()); + if (System.currentTimeMillis() > timeTillweDrain) { + // time elapsed stop publishing further.... + if (!timelineEntityQueue.isEmpty()) { + LOG.warn("Time to drain elapsed! Remaining " + + timelineEntityQueue.size() + "timelineEntities will not" + + " be published"); + // if some entities were not drained then we need interrupt + // the threads which had put sync EntityHolders to the queue. + EntitiesHolder nextEntityInTheQueue = null; + while ((nextEntityInTheQueue = + timelineEntityQueue.poll()) != null) { + nextEntityInTheQueue.cancel(true); + } + } + break; + } + } + } + } + + /** + * Publishes the given EntitiesHolder and return immediately if sync + * call, else tries to fetch the EntitiesHolder from the queue in non + * blocking fashion and collate the Entities if possible before + * publishing through REST. + * + * @param entitiesHolder + */ + private void publishWithoutBlockingOnQueue( + EntitiesHolder entitiesHolder) { + if (entitiesHolder.isSync()) { + entitiesHolder.run(); + return; + } + int count = 1; + while (true) { + // loop till we find a sync put Entities or there is nothing + // to take + EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll(); + if (nextEntityInTheQueue == null) { + // Nothing in the queue just publish and get back to the + // blocked wait state + entitiesHolder.run(); + break; + } else if (nextEntityInTheQueue.isSync()) { + // flush all the prev async entities first + entitiesHolder.run(); + // and then flush the sync entity + nextEntityInTheQueue.run(); + break; + } else { + // append all async entities together and then flush + entitiesHolder.getEntities().addEntities( + nextEntityInTheQueue.getEntities().getEntities()); + count++; + if (count == numberOfAsyncsToMerge) { + // Flush the entities if the number of the async + // putEntites merged reaches the desired limit. To avoid + // collecting multiple entities and delaying for a long + // time. + entitiesHolder.run(); + break; + } + } + } + } + }; + } + + public void dispatchEntities(boolean sync, + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[] entitiesTobePublished) + throws YarnException { + if (executor.isShutdown()) { + throw new YarnException("Timeline client is in the process of stopping," + + " not accepting any more TimelineEntities"); + } + + // wrap all TimelineEntity into TimelineEntities object + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); + for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entitiesTobePublished) { + entities.addEntity(entity); + } + + // created a holder and place it in queue + EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync); + try { + timelineEntityQueue.put(entitiesHolder); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new YarnException( + "Failed while adding entity to the queue for publishing", e); + } + + if (sync) { + // In sync call we need to wait till its published and if any error then + // throw it back + try { + entitiesHolder.get(); + } catch (ExecutionException e) { + throw new YarnException("Failed while publishing entity", + e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new YarnException("Interrupted while publishing entity", e); + } + } + } + + public void start() { + executor = Executors.newSingleThreadExecutor(); + executor.execute(createRunnable()); + } + + public void stop() { + LOG.info("Stopping TimelineClient."); + executor.shutdownNow(); + try { + executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 2d3de01571..d47bab86fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2220,6 +2220,13 @@ 1000 + + Time line V2 client tries to merge these many number of + async entities (if available) and then call the REST ATS V2 API to submit. + + yarn.timeline-service.timeline-client.number-of-async-entities-to-merge + 10 + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java new file mode 100644 index 0000000000..7803f94239 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.ws.rs.core.MultivaluedMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestTimelineClientV2Impl { + private static final Log LOG = + LogFactory.getLog(TestTimelineClientV2Impl.class); + private TestV2TimelineClient client; + private static long TIME_TO_SLEEP = 150; + + @Before + public void setup() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); + conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3); + client = createTimelineClient(conf); + } + + private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) { + ApplicationId id = ApplicationId.newInstance(0, 0); + TestV2TimelineClient client = new TestV2TimelineClient(id); + client.init(conf); + client.start(); + return client; + } + + private class TestV2TimelineClient extends TimelineClientImpl { + private boolean sleepBeforeReturn; + private boolean throwException; + + private List publishedEntities; + + public TimelineEntities getPublishedEntities(int putIndex) { + Assert.assertTrue("Not So many entities Published", + putIndex < publishedEntities.size()); + return publishedEntities.get(putIndex); + } + + public void setSleepBeforeReturn(boolean sleepBeforeReturn) { + this.sleepBeforeReturn = sleepBeforeReturn; + } + + public void setThrowException(boolean throwException) { + this.throwException = throwException; + } + + public int getNumOfTimelineEntitiesPublished() { + return publishedEntities.size(); + } + + public TestV2TimelineClient(ApplicationId id) { + super(id); + publishedEntities = new ArrayList(); + } + + protected void putObjects(String path, + MultivaluedMap params, Object obj) + throws IOException, YarnException { + if (throwException) { + throw new YarnException("ActualException"); + } + publishedEntities.add((TimelineEntities) obj); + if (sleepBeforeReturn) { + try { + Thread.sleep(TIME_TO_SLEEP); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + + @Test + public void testPostEntities() throws Exception { + try { + client.putEntities(generateEntity("1")); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + } + + @Test + public void testASyncCallMerge() throws Exception { + client.setSleepBeforeReturn(true); + try { + client.putEntitiesAsync(generateEntity("1")); + Thread.sleep(TIME_TO_SLEEP / 2); + // by the time first put response comes push 2 entities in the queue + client.putEntitiesAsync(generateEntity("2")); + client.putEntitiesAsync(generateEntity("3")); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + for (int i = 0; i < 4; i++) { + if (client.getNumOfTimelineEntitiesPublished() == 2) { + break; + } + Thread.sleep(TIME_TO_SLEEP); + } + Assert.assertEquals("two merged TimelineEntities needs to be published", 2, + client.getNumOfTimelineEntitiesPublished()); + TimelineEntities secondPublishedEntities = client.getPublishedEntities(1); + Assert.assertEquals( + "Merged TimelineEntities Object needs to 2 TimelineEntity Object", 2, + secondPublishedEntities.getEntities().size()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "2", + secondPublishedEntities.getEntities().get(0).getId()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "3", + secondPublishedEntities.getEntities().get(1).getId()); + } + + @Test + public void testSyncCall() throws Exception { + try { + // sync entity should not be be merged with Async + client.putEntities(generateEntity("1")); + client.putEntitiesAsync(generateEntity("2")); + client.putEntitiesAsync(generateEntity("3")); + // except for the sync call above 2 should be merged + client.putEntities(generateEntity("4")); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + for (int i = 0; i < 4; i++) { + if (client.getNumOfTimelineEntitiesPublished() == 3) { + break; + } + Thread.sleep(TIME_TO_SLEEP); + } + printReceivedEntities(); + Assert.assertEquals("TimelineEntities not published as desired", 3, + client.getNumOfTimelineEntitiesPublished()); + TimelineEntities firstPublishedEntities = client.getPublishedEntities(0); + Assert.assertEquals("sync entities should not be merged with async", 1, + firstPublishedEntities.getEntities().size()); + + // test before pushing the sync entities asyncs are merged and pushed + TimelineEntities secondPublishedEntities = client.getPublishedEntities(1); + Assert.assertEquals( + "async entities should be merged before publishing sync", 2, + secondPublishedEntities.getEntities().size()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "2", + secondPublishedEntities.getEntities().get(0).getId()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "3", + secondPublishedEntities.getEntities().get(1).getId()); + + // test the last entity published is sync put + TimelineEntities thirdPublishedEntities = client.getPublishedEntities(2); + Assert.assertEquals("sync entities had to be published at the last", 1, + thirdPublishedEntities.getEntities().size()); + Assert.assertEquals("Expected last sync Event is not proper", "4", + thirdPublishedEntities.getEntities().get(0).getId()); + } + + @Test + public void testExceptionCalls() throws Exception { + client.setThrowException(true); + try { + client.putEntitiesAsync(generateEntity("1")); + } catch (YarnException e) { + Assert.fail("Async calls are not expected to throw exception"); + } + + try { + client.putEntities(generateEntity("2")); + Assert.fail("Sync calls are expected to throw exception"); + } catch (YarnException e) { + Assert.assertEquals("Same exception needs to be thrown", + "ActualException", e.getCause().getMessage()); + } + } + + @Test + public void testConfigurableNumberOfMerges() throws Exception { + client.setSleepBeforeReturn(true); + try { + // At max 3 entities need to be merged + client.putEntitiesAsync(generateEntity("1")); + client.putEntitiesAsync(generateEntity("2")); + client.putEntitiesAsync(generateEntity("3")); + client.putEntitiesAsync(generateEntity("4")); + client.putEntities(generateEntity("5")); + client.putEntitiesAsync(generateEntity("6")); + client.putEntitiesAsync(generateEntity("7")); + client.putEntitiesAsync(generateEntity("8")); + client.putEntitiesAsync(generateEntity("9")); + client.putEntitiesAsync(generateEntity("10")); + } catch (YarnException e) { + Assert.fail("No exception expected"); + } + // not having the same logic here as it doesn't depend on how many times + // events are published. + Thread.sleep(2 * TIME_TO_SLEEP); + printReceivedEntities(); + for (TimelineEntities publishedEntities : client.publishedEntities) { + Assert.assertTrue( + "Number of entities should not be greater than 3 for each publish," + + " but was " + publishedEntities.getEntities().size(), + publishedEntities.getEntities().size() <= 3); + } + } + + @Test + public void testAfterStop() throws Exception { + client.setSleepBeforeReturn(true); + try { + // At max 3 entities need to be merged + client.putEntities(generateEntity("1")); + for (int i = 2; i < 20; i++) { + client.putEntitiesAsync(generateEntity("" + i)); + } + client.stop(); + try { + client.putEntitiesAsync(generateEntity("50")); + Assert.fail("Exception expected"); + } catch (YarnException e) { + // expected + } + } catch (YarnException e) { + Assert.fail("No exception expected"); + } + // not having the same logic here as it doesn't depend on how many times + // events are published. + for (int i = 0; i < 5; i++) { + TimelineEntities publishedEntities = + client.publishedEntities.get(client.publishedEntities.size() - 1); + TimelineEntity timelineEntity = publishedEntities.getEntities() + .get(publishedEntities.getEntities().size() - 1); + if (!timelineEntity.getId().equals("19")) { + Thread.sleep(2 * TIME_TO_SLEEP); + } + } + printReceivedEntities(); + TimelineEntities publishedEntities = + client.publishedEntities.get(client.publishedEntities.size() - 1); + TimelineEntity timelineEntity = publishedEntities.getEntities() + .get(publishedEntities.getEntities().size() - 1); + Assert.assertEquals("", "19", timelineEntity.getId()); + } + + private void printReceivedEntities() { + for (int i = 0; i < client.getNumOfTimelineEntitiesPublished(); i++) { + TimelineEntities publishedEntities = client.getPublishedEntities(i); + StringBuilder entitiesPerPublish = new StringBuilder(); + ; + for (TimelineEntity entity : publishedEntities.getEntities()) { + entitiesPerPublish.append(entity.getId()); + entitiesPerPublish.append(","); + } + LOG.info("Entities Published @ index " + i + " : " + + entitiesPerPublish.toString()); + } + } + + private static TimelineEntity generateEntity(String id) { + TimelineEntity entity = new TimelineEntity(); + entity.setId(id); + entity.setType("testEntity"); + entity.setCreatedTime(System.currentTimeMillis()); + return entity; + } + + @After + public void tearDown() { + if (client != null) { + client.stop(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 4476c79a7d..9e2254e2bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -962,7 +962,9 @@ private void updateTimelineClientsAddress( Map knownCollectorsMap = response.getAppCollectorsMap(); if (knownCollectorsMap == null) { - LOG.warn("the collectors map is null"); + if (LOG.isDebugEnabled()) { + LOG.debug("No collectors to update RM"); + } } else { Set> rmKnownCollectors = knownCollectorsMap.entrySet();