YARN-6888. Refactor AppLevelTimelineCollector such that RM does not have aggregator threads created. Contributed by Vrushali C.

This commit is contained in:
Rohith Sharma K S 2017-07-28 11:47:16 +05:30 committed by Varun Saxena
parent a990ff70c2
commit 3fb71b1393
4 changed files with 155 additions and 88 deletions

View File

@ -18,27 +18,17 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Service that handles writes to the timeline service and writes them to the
* backing storage for a given YARN application.
@ -51,15 +41,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
private static final Logger LOG =
LoggerFactory.getLogger(TimelineCollector.class);
private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1;
private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
private static Set<String> entityTypesSkipAggregation
= initializeSkipSet();
private final ApplicationId appId;
private final TimelineCollectorContext context;
private ScheduledThreadPoolExecutor appAggregationExecutor;
private AppLevelAggregator appAggregator;
private UserGroupInformation currentUser;
public AppLevelTimelineCollector(ApplicationId appId) {
@ -69,12 +52,8 @@ public AppLevelTimelineCollector(ApplicationId appId) {
context = new TimelineCollectorContext();
}
private static Set<String> initializeSkipSet() {
Set<String> result = new HashSet<>();
result.add(TimelineEntityType.YARN_APPLICATION.toString());
result.add(TimelineEntityType.YARN_FLOW_RUN.toString());
result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
return result;
public UserGroupInformation getCurrentUser() {
return currentUser;
}
@Override
@ -92,29 +71,11 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override
protected void serviceStart() throws Exception {
// Launch the aggregation thread
appAggregationExecutor = new ScheduledThreadPoolExecutor(
AppLevelTimelineCollector.AGGREGATION_EXECUTOR_NUM_THREADS,
new ThreadFactoryBuilder()
.setNameFormat("TimelineCollector Aggregation thread #%d")
.build());
appAggregator = new AppLevelAggregator();
appAggregationExecutor.scheduleAtFixedRate(appAggregator,
AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
TimeUnit.SECONDS);
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
appAggregationExecutor.shutdown();
if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
appAggregationExecutor.shutdownNow();
}
// Perform one round of aggregation after the aggregation executor is done.
appAggregator.aggregate();
super.serviceStop();
}
@ -123,48 +84,4 @@ public TimelineCollectorContext getTimelineEntityContext() {
return context;
}
@Override
protected Set<String> getEntityTypesSkipAggregation() {
return entityTypesSkipAggregation;
}
private class AppLevelAggregator implements Runnable {
private void aggregate() {
if (LOG.isDebugEnabled()) {
LOG.debug("App-level real-time aggregating");
}
if (!isReadyToAggregate()) {
LOG.warn("App-level collector is not ready, skip aggregation. ");
return;
}
try {
TimelineCollectorContext currContext = getTimelineEntityContext();
Map<String, AggregationStatusTable> aggregationGroups
= getAggregationGroups();
if (aggregationGroups == null
|| aggregationGroups.isEmpty()) {
LOG.debug("App-level collector is empty, skip aggregation. ");
return;
}
TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
aggregationGroups, currContext.getAppId(),
TimelineEntityType.YARN_APPLICATION.toString());
TimelineEntities entities = new TimelineEntities();
entities.addEntity(resultEntity);
putEntitiesAsync(entities, currentUser);
} catch (Exception e) {
LOG.error("Error aggregating timeline metrics", e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("App-level real-time aggregation complete");
}
}
@Override
public void run() {
aggregate();
}
}
}

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.collector;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.conf.Configuration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Service that handles aggregations for applications
* and makes use of {@link AppLevelTimelineCollector} class for
* writes to Timeline Service.
*
* App-related lifecycle management is handled by this service.
*/
@Private
@Unstable
public class AppLevelTimelineCollectorWithAgg
extends AppLevelTimelineCollector {
private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1;
private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
private static Set<String> entityTypesSkipAggregation
= initializeSkipSet();
private ScheduledThreadPoolExecutor appAggregationExecutor;
private AppLevelAggregator appAggregator;
public AppLevelTimelineCollectorWithAgg(ApplicationId appId) {
super(appId);
}
private static Set<String> initializeSkipSet() {
Set<String> result = new HashSet<>();
result.add(TimelineEntityType.YARN_APPLICATION.toString());
result.add(TimelineEntityType.YARN_FLOW_RUN.toString());
result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
return result;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
// Launch the aggregation thread
appAggregationExecutor = new ScheduledThreadPoolExecutor(
AppLevelTimelineCollectorWithAgg.AGGREGATION_EXECUTOR_NUM_THREADS,
new ThreadFactoryBuilder()
.setNameFormat("TimelineCollector Aggregation thread #%d")
.build());
appAggregator = new AppLevelAggregator();
appAggregationExecutor.scheduleAtFixedRate(appAggregator,
AppLevelTimelineCollectorWithAgg.
AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
AppLevelTimelineCollectorWithAgg.
AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
TimeUnit.SECONDS);
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
appAggregationExecutor.shutdown();
if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
appAggregationExecutor.shutdownNow();
}
// Perform one round of aggregation after the aggregation executor is done.
appAggregator.aggregate();
super.serviceStop();
}
@Override
protected Set<String> getEntityTypesSkipAggregation() {
return entityTypesSkipAggregation;
}
private class AppLevelAggregator implements Runnable {
private void aggregate() {
if (LOG.isDebugEnabled()) {
LOG.debug("App-level real-time aggregating");
}
if (!isReadyToAggregate()) {
LOG.warn("App-level collector is not ready, skip aggregation. ");
return;
}
try {
TimelineCollectorContext currContext = getTimelineEntityContext();
Map<String, AggregationStatusTable> aggregationGroups
= getAggregationGroups();
if (aggregationGroups == null
|| aggregationGroups.isEmpty()) {
LOG.debug("App-level collector is empty, skip aggregation. ");
return;
}
TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
aggregationGroups, currContext.getAppId(),
TimelineEntityType.YARN_APPLICATION.toString());
TimelineEntities entities = new TimelineEntities();
entities.addEntity(resultEntity);
putEntitiesAsync(entities, getCurrentUser());
} catch (Exception e) {
LOG.error("Error aggregating timeline metrics", e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("App-level real-time aggregation complete");
}
}
@Override
public void run() {
aggregate();
}
}
}

View File

@ -118,7 +118,7 @@ protected void serviceStop() throws Exception {
*/
public boolean addApplication(ApplicationId appId) {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollector(appId);
new AppLevelTimelineCollectorWithAgg(appId);
return (collectorManager.putIfAbsent(appId, collector)
== collector);
}

View File

@ -95,7 +95,7 @@ public void testMultithreadedAdd() throws Exception {
Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollector(appId);
new AppLevelTimelineCollectorWithAgg(appId);
return (collectorManager.putIfAbsent(appId, collector) == collector);
}
};
@ -126,7 +126,7 @@ public void testMultithreadedAddAndRemove() throws Exception {
Callable<Boolean> task = new Callable<Boolean>() {
public Boolean call() {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollector(appId);
new AppLevelTimelineCollectorWithAgg(appId);
boolean successPut =
(collectorManager.putIfAbsent(appId, collector) == collector);
return successPut && collectorManager.remove(appId);