YARN-3816. [Aggregation] App-level aggregation and accumulation for YARN system metrics (Li Lu via sjlee)

This commit is contained in:
Sangjin Lee 2016-04-22 10:24:40 -07:00
parent fba7532c56
commit 39cce4e629
12 changed files with 999 additions and 27 deletions

View File

@ -19,12 +19,13 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.Comparator;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
@ -48,13 +49,13 @@ public static enum Type {
private Type type;
private String id;
private Comparator<Long> reverseComparator = new Comparator<Long>() {
@Override
public int compare(Long l1, Long l2) {
return l2.compareTo(l1);
}
};
private TreeMap<Long, Number> values = new TreeMap<>(reverseComparator);
// By default, not to do any aggregation operations. This field will NOT be
// persisted (like a "transient" member).
private TimelineMetricOperation realtimeAggregationOp
= TimelineMetricOperation.NOP;
private TreeMap<Long, Number> values
= new TreeMap<>(Collections.reverseOrder());
public TimelineMetric() {
this(Type.SINGLE_VALUE);
@ -83,6 +84,26 @@ public void setId(String metricId) {
this.id = metricId;
}
/**
* Get the real time aggregation operation of this metric.
*
* @return Real time aggregation operation
*/
public TimelineMetricOperation getRealtimeAggregationOp() {
return realtimeAggregationOp;
}
/**
* Set the real time aggregation operation of this metric.
*
* @param op A timeline metric operation that the metric should perform on
* real time aggregations
*/
public void setRealtimeAggregationOp(
final TimelineMetricOperation op) {
this.realtimeAggregationOp = op;
}
// required by JAXB
@InterfaceAudience.Private
@XmlElement(name = "values")
@ -98,8 +119,8 @@ public void setValues(Map<Long, Number> vals) {
if (type == Type.SINGLE_VALUE) {
overwrite(vals);
} else {
if (values != null) {
this.values = new TreeMap<Long, Number>(reverseComparator);
if (vals != null) {
this.values = new TreeMap<>(Collections.reverseOrder());
this.values.putAll(vals);
} else {
this.values = null;
@ -166,11 +187,100 @@ public boolean equals(Object o) {
@Override
public String toString() {
String str = "{id:" + id + ", type:" + type;
if (!values.isEmpty()) {
str += ", values:" + values;
}
str += "}";
return str;
return "{id: " + id + ", type: " + type +
", realtimeAggregationOp: " +
realtimeAggregationOp + "; " + values.toString() +
"}";
}
/**
* Get the latest timeline metric as single value type.
*
* @param metric Incoming timeline metric
* @return The latest metric in the incoming metric
*/
public static TimelineMetric getLatestSingleValueMetric(
TimelineMetric metric) {
if (metric.getType() == Type.SINGLE_VALUE) {
return metric;
} else {
TimelineMetric singleValueMetric = new TimelineMetric(Type.SINGLE_VALUE);
Long firstKey = metric.values.firstKey();
if (firstKey != null) {
Number firstValue = metric.values.get(firstKey);
singleValueMetric.addValue(firstKey, firstValue);
}
return singleValueMetric;
}
}
/**
* Get single data timestamp of the metric.
*
* @return the single data timestamp
*/
public long getSingleDataTimestamp() {
if (this.type == Type.SINGLE_VALUE) {
if (values.size() == 0) {
throw new YarnRuntimeException("Values for this timeline metric is " +
"empty.");
} else {
return values.firstKey();
}
} else {
throw new YarnRuntimeException("Type for this timeline metric is not " +
"SINGLE_VALUE.");
}
}
/**
* Get single data value of the metric.
*
* @return the single data value
*/
public Number getSingleDataValue() {
if (this.type == Type.SINGLE_VALUE) {
if (values.size() == 0) {
return null;
} else {
return values.get(values.firstKey());
}
} else {
throw new YarnRuntimeException("Type for this timeline metric is not " +
"SINGLE_VALUE.");
}
}
/**
* Aggregate an incoming metric to the base aggregated metric with the given
* operation state in a stateless fashion. The assumption here is
* baseAggregatedMetric and latestMetric should be single value data if not
* null.
*
* @param incomingMetric Incoming timeline metric to aggregate
* @param baseAggregatedMetric Base timeline metric
* @return Result metric after aggregation
*/
public static TimelineMetric aggregateTo(TimelineMetric incomingMetric,
TimelineMetric baseAggregatedMetric) {
return aggregateTo(incomingMetric, baseAggregatedMetric, null);
}
/**
* Aggregate an incoming metric to the base aggregated metric with the given
* operation state. The assumption here is baseAggregatedMetric and
* latestMetric should be single value data if not null.
*
* @param incomingMetric Incoming timeline metric to aggregate
* @param baseAggregatedMetric Base timeline metric
* @param state Operation state
* @return Result metric after aggregation
*/
public static TimelineMetric aggregateTo(TimelineMetric incomingMetric,
TimelineMetric baseAggregatedMetric, Map<Object, Object> state) {
TimelineMetricOperation operation
= incomingMetric.getRealtimeAggregationOp();
return operation.aggregate(incomingMetric, baseAggregatedMetric, state);
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
/**
* A calculator for timeline metrics.
*/
public final class TimelineMetricCalculator {
private TimelineMetricCalculator() {
// do nothing.
}
/**
* Compare two not-null numbers.
* @param n1 Number n1
* @param n2 Number n2
* @return 0 if n1 equals n2, a negative int if n1 is less than n2, a
* positive int otherwise.
*/
public static int compare(Number n1, Number n2) {
if (n1 == null || n2 == null) {
throw new YarnRuntimeException(
"Number to be compared shouldn't be null.");
}
if (n1 instanceof Integer || n1 instanceof Long) {
if (n1.longValue() == n2.longValue()) {
return 0;
} else {
return (n1.longValue() < n2.longValue()) ? -1 : 1;
}
}
if (n1 instanceof Float || n1 instanceof Double) {
if (n1.doubleValue() == n2.doubleValue()) {
return 0;
} else {
return (n1.doubleValue() < n2.doubleValue()) ? -1 : 1;
}
}
// TODO throw warnings/exceptions for other types of number.
throw new YarnRuntimeException("Unsupported types for number comparison: "
+ n1.getClass().getName() + ", " + n2.getClass().getName());
}
/**
* Subtract operation between two Numbers.
* @param n1 Number n1
* @param n2 Number n2
* @return Number represent to (n1 - n2).
*/
public static Number sub(Number n1, Number n2) {
if (n1 == null) {
throw new YarnRuntimeException(
"Number to be subtracted shouldn't be null.");
} else if (n2 == null) {
return n1;
}
if (n1 instanceof Integer || n1 instanceof Long) {
return n1.longValue() - n2.longValue();
}
if (n1 instanceof Float || n1 instanceof Double) {
return n1.doubleValue() - n2.doubleValue();
}
// TODO throw warnings/exceptions for other types of number.
return null;
}
/**
* Sum up two Numbers.
* @param n1 Number n1
* @param n2 Number n2
* @return Number represent to (n1 + n2).
*/
public static Number sum(Number n1, Number n2) {
if (n1 == null) {
return n2;
} else if (n2 == null) {
return n1;
}
if (n1 instanceof Integer || n1 instanceof Long) {
return n1.longValue() + n2.longValue();
}
if (n1 instanceof Float || n1 instanceof Double) {
return n1.doubleValue() + n2.doubleValue();
}
// TODO throw warnings/exceptions for other types of number.
return null;
}
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
import java.util.Map;
/**
* Aggregation operations.
*/
public enum TimelineMetricOperation {
NOP("NOP") {
/**
* Do nothing on the base metric.
*
* @param incoming Metric a
* @param base Metric b
* @param state Operation state (not used)
* @return Metric b
*/
@Override
public TimelineMetric exec(TimelineMetric incoming,
TimelineMetric base, Map<Object, Object> state) {
return base;
}
},
MAX("MAX") {
/**
* Keep the greater value of incoming and base. Stateless operation.
*
* @param incoming Metric a
* @param base Metric b
* @param state Operation state (not used)
* @return the greater value of a and b
*/
@Override
public TimelineMetric exec(TimelineMetric incoming,
TimelineMetric base, Map<Object, Object> state) {
if (base == null) {
return incoming;
}
Number incomingValue = incoming.getSingleDataValue();
Number aggregateValue = base.getSingleDataValue();
if (aggregateValue == null) {
aggregateValue = Long.MIN_VALUE;
}
if (TimelineMetricCalculator.compare(incomingValue, aggregateValue) > 0) {
base.addValue(incoming.getSingleDataTimestamp(), incomingValue);
}
return base;
}
},
REPLACE("REPLACE") {
/**
* Replace the base metric with the incoming value. Stateless operation.
*
* @param incoming Metric a
* @param base Metric b
* @param state Operation state (not used)
* @return Metric a
*/
@Override
public TimelineMetric exec(TimelineMetric incoming,
TimelineMetric base,
Map<Object, Object> state) {
return incoming;
}
},
SUM("SUM") {
/**
* Return the sum of the incoming metric and the base metric if the
* operation is stateless. For stateful operations, also subtract the
* value of the timeline metric mapped to the PREV_METRIC_STATE_KEY
* in the state object.
*
* @param incoming Metric a
* @param base Metric b
* @param state Operation state (PREV_METRIC_STATE_KEY's value as Metric p)
* @return A metric with value a + b - p
*/
@Override
public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
Map<Object, Object> state) {
if (base == null) {
return incoming;
}
Number incomingValue = incoming.getSingleDataValue();
Number aggregateValue = base.getSingleDataValue();
Number result
= TimelineMetricCalculator.sum(incomingValue, aggregateValue);
// If there are previous value in the state, we will take it off from the
// sum
if (state != null) {
Object prevMetric = state.get(PREV_METRIC_STATE_KEY);
if (prevMetric instanceof TimelineMetric) {
result = TimelineMetricCalculator.sub(result,
((TimelineMetric) prevMetric).getSingleDataValue());
}
}
base.addValue(incoming.getSingleDataTimestamp(), result);
return base;
}
},
AVG("AVERAGE") {
/**
* Return the average value of the incoming metric and the base metric,
* with a given state. Not supported yet.
*
* @param incoming Metric a
* @param base Metric b
* @param state Operation state
* @return Not finished yet
*/
@Override
public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
Map<Object, Object> state) {
// Not supported yet
throw new UnsupportedOperationException(
"Unsupported aggregation operation: AVERAGE");
}
};
public static final String PREV_METRIC_STATE_KEY = "PREV_METRIC";
/**
* Perform the aggregation operation.
*
* @param incoming Incoming metric
* @param aggregate Base aggregation metric
* @param state Operation state
* @return Result metric for this aggregation operation
*/
public TimelineMetric aggregate(TimelineMetric incoming,
TimelineMetric aggregate, Map<Object, Object> state) {
return exec(incoming, aggregate, state);
}
private final String opName;
TimelineMetricOperation(String opString) {
opName = opString;
}
@Override
public String toString() {
return this.opName;
}
abstract TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
Map<Object, Object> state);
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.junit.Test;
public class TestTimelineMetric {
@Test
public void testTimelineMetricAggregation() {
long ts = System.currentTimeMillis();
// single_value metric add against null metric
TimelineMetric m1 = getSingleValueMetric("MEGA_BYTES_MILLIS",
TimelineMetricOperation.SUM, ts, 10000L);
TimelineMetric aggregatedMetric = TimelineMetric.aggregateTo(m1, null);
assertEquals(10000L, aggregatedMetric.getSingleDataValue());
TimelineMetric m2 = getSingleValueMetric("MEGA_BYTES_MILLIS",
TimelineMetricOperation.SUM, ts, 20000L);
aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric);
assertEquals(30000L, aggregatedMetric.getSingleDataValue());
// stateful sum test
Map<Object, Object> state = new HashMap<>();
state.put(TimelineMetricOperation.PREV_METRIC_STATE_KEY, m2);
TimelineMetric m2New = getSingleValueMetric("MEGA_BYTES_MILLIS",
TimelineMetricOperation.SUM, ts, 10000L);
aggregatedMetric = TimelineMetric.aggregateTo(m2New, aggregatedMetric,
state);
assertEquals(20000L, aggregatedMetric.getSingleDataValue());
// single_value metric max against single_value metric
TimelineMetric m3 = getSingleValueMetric("TRANSFER_RATE",
TimelineMetricOperation.MAX, ts, 150L);
TimelineMetric aggregatedMax = TimelineMetric.aggregateTo(m3, null);
assertEquals(150L, aggregatedMax.getSingleDataValue());
TimelineMetric m4 = getSingleValueMetric("TRANSFER_RATE",
TimelineMetricOperation.MAX, ts, 170L);
aggregatedMax = TimelineMetric.aggregateTo(m4, aggregatedMax);
assertEquals(170L, aggregatedMax.getSingleDataValue());
// single_value metric avg against single_value metric
TimelineMetric m5 = getSingleValueMetric("TRANSFER_RATE",
TimelineMetricOperation.AVG, ts, 150L);
try {
TimelineMetric.aggregateTo(m5, null);
fail("Taking average among metrics is not supported! ");
} catch (UnsupportedOperationException e) {
// Expected
}
}
private static TimelineMetric getSingleValueMetric(String id,
TimelineMetricOperation op, long timestamp, long value) {
TimelineMetric m = new TimelineMetric();
m.setId(id);
m.setType(Type.SINGLE_VALUE);
m.setRealtimeAggregationOp(op);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
metricValues.put(timestamp, value);
m.setValues(metricValues);
return m;
}
private static TimelineMetric getTimeSeriesMetric(String id,
TimelineMetricOperation op, Map<Long, Number> metricValues) {
TimelineMetric m = new TimelineMetric();
m.setId(id);
m.setType(Type.TIME_SERIES);
m.setRealtimeAggregationOp(op);
m.setValues(metricValues);
return m;
}
}

View File

@ -64,13 +64,13 @@ public void testTimelineEntities() throws Exception {
metric1.getValues().entrySet().iterator();
Map.Entry<Long, Number> entry = itr.next();
Assert.assertEquals(new Long(3L), entry.getKey());
Assert.assertEquals(new Double(3.0D), entry.getValue());
Assert.assertEquals(3.0D, entry.getValue());
entry = itr.next();
Assert.assertEquals(new Long(2L), entry.getKey());
Assert.assertEquals(new Integer(2), entry.getValue());
Assert.assertEquals(2, entry.getValue());
entry = itr.next();
Assert.assertEquals(new Long(1L), entry.getKey());
Assert.assertEquals(new Float(1.0F), entry.getValue());
Assert.assertEquals(1.0F, entry.getValue());
Assert.assertFalse(itr.hasNext());
entity.addMetric(metric1);

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@ -119,12 +120,15 @@ public void reportContainerResourceUsage(Container container, Long pmemUsage,
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
TimelineMetric memoryMetric = new TimelineMetric();
memoryMetric.setId(ContainerMetric.MEMORY.toString());
memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
memoryMetric.addValue(currentTimeMillis, pmemUsage);
entity.addMetric(memoryMetric);
}
if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
TimelineMetric cpuMetric = new TimelineMetric();
cpuMetric.setId(ContainerMetric.CPU.toString());
// TODO: support average
cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
cpuMetric.addValue(currentTimeMillis,
Math.round(cpuUsagePercentPerCore));
entity.addMetric(cpuMetric);

View File

@ -18,15 +18,26 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Service that handles writes to the timeline service and writes them to the
* backing storage for a given YARN application.
@ -36,8 +47,16 @@
@Private
@Unstable
public class AppLevelTimelineCollector extends TimelineCollector {
private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1;
private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
private static Set<String> entityTypesSkipAggregation
= initializeSkipSet();
private final ApplicationId appId;
private final TimelineCollectorContext context;
private ScheduledThreadPoolExecutor appAggregationExecutor;
public AppLevelTimelineCollector(ApplicationId appId) {
super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
@ -46,6 +65,14 @@ public AppLevelTimelineCollector(ApplicationId appId) {
context = new TimelineCollectorContext();
}
private static Set<String> initializeSkipSet() {
Set<String> result = new HashSet<>();
result.add(TimelineEntityType.YARN_APPLICATION.toString());
result.add(TimelineEntityType.YARN_FLOW_RUN.toString());
result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
return result;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID,
@ -60,11 +87,25 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override
protected void serviceStart() throws Exception {
// Launch the aggregation thread
appAggregationExecutor = new ScheduledThreadPoolExecutor(
AppLevelTimelineCollector.AGGREGATION_EXECUTOR_NUM_THREADS,
new ThreadFactoryBuilder()
.setNameFormat("TimelineCollector Aggregation thread #%d")
.build());
appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), 0,
AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
TimeUnit.SECONDS);
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
appAggregationExecutor.shutdown();
if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
appAggregationExecutor.shutdownNow();
}
super.serviceStop();
}
@ -73,4 +114,35 @@ public TimelineCollectorContext getTimelineEntityContext() {
return context;
}
@Override
protected Set<String> getEntityTypesSkipAggregation() {
return entityTypesSkipAggregation;
}
private class AppLevelAggregator implements Runnable {
@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("App-level real-time aggregating");
}
try {
TimelineCollectorContext currContext = getTimelineEntityContext();
TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
getAggregationGroups(), currContext.getAppId(),
TimelineEntityType.YARN_APPLICATION.toString());
TimelineEntities entities = new TimelineEntities();
entities.addEntity(resultEntity);
getWriter().write(currContext.getClusterId(), currContext.getUserId(),
currContext.getFlowName(), currContext.getFlowVersion(),
currContext.getFlowRunId(), currContext.getAppId(), entities);
} catch (Exception e) {
LOG.error("Error aggregating timeline metrics", e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("App-level real-time aggregation complete");
}
}
}
}

View File

@ -19,6 +19,12 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -27,7 +33,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
@ -41,9 +50,15 @@
@Private
@Unstable
public abstract class TimelineCollector extends CompositeService {
private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
public static final String SEPARATOR = "_";
private TimelineWriter writer;
private ConcurrentMap<String, AggregationStatusTable> aggregationGroups
= new ConcurrentHashMap<>();
private static Set<String> entityTypesSkipAggregation
= new HashSet<>();
public TimelineCollector(String name) {
super(name);
@ -68,6 +83,28 @@ protected void setWriter(TimelineWriter w) {
this.writer = w;
}
protected TimelineWriter getWriter() {
return writer;
}
protected Map<String, AggregationStatusTable> getAggregationGroups() {
return aggregationGroups;
}
/**
* Method to decide the set of timeline entity types the collector should
* skip on aggregations. Subclasses may want to override this method to
* customize their own behaviors.
*
* @return A set of strings consists of all types the collector should skip.
*/
protected Set<String> getEntityTypesSkipAggregation() {
return entityTypesSkipAggregation;
}
public abstract TimelineCollectorContext getTimelineEntityContext();
/**
* Handles entity writes. These writes are synchronous and are written to the
* backing storage without buffering/batching. If any entity already exists,
@ -90,8 +127,12 @@ public TimelineWriteResponse putEntities(TimelineEntities entities,
LOG.debug("putEntities(entities=" + entities + ", callerUgi="
+ callerUgi + ")");
}
TimelineCollectorContext context = getTimelineEntityContext();
// Update application metrics for aggregation
updateAggregateStatus(entities, aggregationGroups,
getEntityTypesSkipAggregation());
return writer.write(context.getClusterId(), context.getUserId(),
context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(),
context.getAppId(), entities);
@ -117,6 +158,174 @@ public void putEntitiesAsync(TimelineEntities entities,
}
}
public abstract TimelineCollectorContext getTimelineEntityContext();
/**
* Aggregate all metrics in given timeline entities with no predefined states.
*
* @param entities Entities to aggregate
* @param resultEntityId Id of the result entity
* @param resultEntityType Type of the result entity
* @param needsGroupIdInResult Marks if we want the aggregation group id in
* each aggregated metrics.
* @return A timeline entity that contains all aggregated TimelineMetric.
*/
public static TimelineEntity aggregateEntities(
TimelineEntities entities, String resultEntityId,
String resultEntityType, boolean needsGroupIdInResult) {
ConcurrentMap<String, AggregationStatusTable> aggregationGroups
= new ConcurrentHashMap<>();
updateAggregateStatus(entities, aggregationGroups, null);
if (needsGroupIdInResult) {
return aggregate(aggregationGroups, resultEntityId, resultEntityType);
} else {
return aggregateWithoutGroupId(
aggregationGroups, resultEntityId, resultEntityType);
}
}
/**
* Update the aggregation status table for a timeline collector.
*
* @param entities Entities to update
* @param aggregationGroups Aggregation status table
* @param typesToSkip Entity types that we can safely assume to skip updating
*/
static void updateAggregateStatus(
TimelineEntities entities,
ConcurrentMap<String, AggregationStatusTable> aggregationGroups,
Set<String> typesToSkip) {
for (TimelineEntity e : entities.getEntities()) {
if ((typesToSkip != null && typesToSkip.contains(e.getType()))
|| e.getMetrics().isEmpty()) {
continue;
}
AggregationStatusTable aggrTable = aggregationGroups.get(e.getType());
if (aggrTable == null) {
AggregationStatusTable table = new AggregationStatusTable();
aggrTable = aggregationGroups.putIfAbsent(e.getType(),
table);
if (aggrTable == null) {
aggrTable = table;
}
}
aggrTable.update(e);
}
}
/**
* Aggregate internal status and generate timeline entities for the
* aggregation results.
*
* @param aggregationGroups Aggregation status table
* @param resultEntityId Id of the result entity
* @param resultEntityType Type of the result entity
* @return A timeline entity that contains all aggregated TimelineMetric.
*/
static TimelineEntity aggregate(
Map<String, AggregationStatusTable> aggregationGroups,
String resultEntityId, String resultEntityType) {
TimelineEntity result = new TimelineEntity();
result.setId(resultEntityId);
result.setType(resultEntityType);
for (Map.Entry<String, AggregationStatusTable> entry
: aggregationGroups.entrySet()) {
entry.getValue().aggregateAllTo(result, entry.getKey());
}
return result;
}
/**
* Aggregate internal status and generate timeline entities for the
* aggregation results. The result metrics will not have aggregation group
* information.
*
* @param aggregationGroups Aggregation status table
* @param resultEntityId Id of the result entity
* @param resultEntityType Type of the result entity
* @return A timeline entity that contains all aggregated TimelineMetric.
*/
static TimelineEntity aggregateWithoutGroupId(
Map<String, AggregationStatusTable> aggregationGroups,
String resultEntityId, String resultEntityType) {
TimelineEntity result = new TimelineEntity();
result.setId(resultEntityId);
result.setType(resultEntityType);
for (Map.Entry<String, AggregationStatusTable> entry
: aggregationGroups.entrySet()) {
entry.getValue().aggregateAllTo(result, "");
}
return result;
}
// Note: In memory aggregation is performed in an eventually consistent
// fashion.
private static class AggregationStatusTable {
// On aggregation, for each metric, aggregate all per-entity accumulated
// metrics. We only use the id and type for TimelineMetrics in the key set
// of this table.
private ConcurrentMap<TimelineMetric, Map<String, TimelineMetric>>
aggregateTable;
public AggregationStatusTable() {
aggregateTable = new ConcurrentHashMap<>();
}
public void update(TimelineEntity incoming) {
String entityId = incoming.getId();
for (TimelineMetric m : incoming.getMetrics()) {
// Skip if the metric does not need aggregation
if (m.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
continue;
}
// Update aggregateTable
Map<String, TimelineMetric> aggrRow = aggregateTable.get(m);
if (aggrRow == null) {
Map<String, TimelineMetric> tempRow = new ConcurrentHashMap<>();
aggrRow = aggregateTable.putIfAbsent(m, tempRow);
if (aggrRow == null) {
aggrRow = tempRow;
}
}
aggrRow.put(entityId, m);
}
}
public TimelineEntity aggregateTo(TimelineMetric metric, TimelineEntity e,
String aggregationGroupId) {
if (metric.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
return e;
}
Map<String, TimelineMetric> aggrRow = aggregateTable.get(metric);
if (aggrRow != null) {
TimelineMetric aggrMetric = new TimelineMetric();
if (aggregationGroupId.length() > 0) {
aggrMetric.setId(metric.getId() + SEPARATOR + aggregationGroupId);
} else {
aggrMetric.setId(metric.getId());
}
aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP);
Map<Object, Object> status = new HashMap<>();
for (TimelineMetric m : aggrRow.values()) {
TimelineMetric.aggregateTo(m, aggrMetric, status);
// getRealtimeAggregationOp returns an enum so we can directly
// compare with "!=".
if (m.getRealtimeAggregationOp()
!= aggrMetric.getRealtimeAggregationOp()) {
aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp());
}
}
Set<TimelineMetric> metrics = e.getMetrics();
metrics.remove(aggrMetric);
metrics.add(aggrMetric);
}
return e;
}
public TimelineEntity aggregateAllTo(TimelineEntity e,
String aggregationGroupId) {
for (TimelineMetric m : aggregateTable.keySet()) {
aggregateTo(m, e, aggregationGroupId);
}
return e;
}
}
}

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.collector;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class TestTimelineCollector {
private TimelineEntities generateTestEntities(int groups, int entities) {
TimelineEntities te = new TimelineEntities();
for (int j = 0; j < groups; j++) {
for (int i = 0; i < entities; i++) {
TimelineEntity entity = new TimelineEntity();
String containerId = "container_1000178881110_2002_" + i;
entity.setId(containerId);
String entityType = "TEST_" + j;
entity.setType(entityType);
long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId("HDFS_BYTES_WRITE");
m1.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
long ts = System.currentTimeMillis();
m1.addValue(ts - 20000, 100L);
metrics.add(m1);
TimelineMetric m2 = new TimelineMetric();
m2.setId("VCORES_USED");
m2.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
m2.addValue(ts - 20000, 3L);
metrics.add(m2);
// m3 should not show up in the aggregation
TimelineMetric m3 = new TimelineMetric();
m3.setId("UNRELATED_VALUES");
m3.addValue(ts - 20000, 3L);
metrics.add(m3);
TimelineMetric m4 = new TimelineMetric();
m4.setId("TXN_FINISH_TIME");
m4.setRealtimeAggregationOp(TimelineMetricOperation.MAX);
m4.addValue(ts - 20000, i);
metrics.add(m4);
entity.addMetrics(metrics);
te.addEntity(entity);
}
}
return te;
}
@Test
public void testAggregation() throws Exception {
// Test aggregation with multiple groups.
int groups = 3;
int n = 50;
TimelineEntities testEntities = generateTestEntities(groups, n);
TimelineEntity resultEntity = TimelineCollector.aggregateEntities(
testEntities, "test_result", "TEST_AGGR", true);
assertEquals(resultEntity.getMetrics().size(), groups * 3);
for (int i = 0; i < groups; i++) {
Set<TimelineMetric> metrics = resultEntity.getMetrics();
for (TimelineMetric m : metrics) {
if (m.getId().startsWith("HDFS_BYTES_WRITE")) {
assertEquals(100 * n, m.getSingleDataValue().intValue());
} else if (m.getId().startsWith("VCORES_USED")) {
assertEquals(3 * n, m.getSingleDataValue().intValue());
} else if (m.getId().startsWith("TXN_FINISH_TIME")) {
assertEquals(n - 1, m.getSingleDataValue());
} else {
fail("Unrecognized metric! " + m.getId());
}
}
}
// Test aggregation with a single group.
TimelineEntities testEntities1 = generateTestEntities(1, n);
TimelineEntity resultEntity1 = TimelineCollector.aggregateEntities(
testEntities1, "test_result", "TEST_AGGR", false);
assertEquals(resultEntity1.getMetrics().size(), 3);
Set<TimelineMetric> metrics = resultEntity1.getMetrics();
for (TimelineMetric m : metrics) {
if (m.getId().equals("HDFS_BYTES_WRITE")) {
assertEquals(100 * n, m.getSingleDataValue().intValue());
} else if (m.getId().equals("VCORES_USED")) {
assertEquals(3 * n, m.getSingleDataValue().intValue());
} else if (m.getId().equals("TXN_FINISH_TIME")) {
assertEquals(n - 1, m.getSingleDataValue());
} else {
fail("Unrecognized metric! " + m.getId());
}
}
}
}

View File

@ -25,11 +25,15 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Test;
@ -51,6 +55,26 @@ public void testWriteEntityToFile() throws Exception {
entity.setCreatedTime(1425016501000L);
te.addEntity(entity);
TimelineMetric metric = new TimelineMetric();
String metricId = "CPU";
metric.setId(metricId);
metric.setType(TimelineMetric.Type.SINGLE_VALUE);
metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
metric.addValue(1425016501000L, 1234567L);
TimelineEntity entity2 = new TimelineEntity();
String id2 = "metric";
String type2 = "app";
entity2.setId(id2);
entity2.setType(type2);
entity2.setCreatedTime(1425016503000L);
entity2.addMetric(metric);
te.addEntity(entity2);
Map<String, TimelineMetric> aggregatedMetrics =
new HashMap<String, TimelineMetric>();
aggregatedMetrics.put(metricId, metric);
FileSystemTimelineWriterImpl fsi = null;
try {
fsi = new FileSystemTimelineWriterImpl();
@ -68,11 +92,27 @@ public void testWriteEntityToFile() throws Exception {
assertTrue(f.exists() && !f.isDirectory());
List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
// ensure there's only one entity + 1 new line
assertTrue(data.size() == 2);
assertTrue("data size is:" + data.size(), data.size() == 2);
String d = data.get(0);
// confirm the contents same as what was written
assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
// verify aggregated metrics
String fileName2 = fsi.getOutputRoot() +
"/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/"
+ type2 + "/" + id2 +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path2 = Paths.get(fileName2);
File file = new File(fileName2);
assertTrue(file.exists() && !file.isDirectory());
List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8);
// ensure there's only one entity + 1 new line
assertTrue("data size is:" + data.size(), data2.size() == 2);
String metricToString = data2.get(0);
// confirm the contents same as what was written
assertEquals(metricToString,
TimelineUtils.dumpTimelineRecordtoJSON(entity2));
// delete the directory
File outputDir = new File(fsi.getOutputRoot());
FileUtils.deleteDirectory(outputDir);
@ -84,4 +124,5 @@ public void testWriteEntityToFile() throws Exception {
}
}
}
}

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@ -539,6 +540,26 @@ public void testWriteApplicationToHBase() throws Exception {
metrics.add(m1);
entity.addMetrics(metrics);
// add aggregated metrics
TimelineEntity aggEntity = new TimelineEntity();
String type = TimelineEntityType.YARN_APPLICATION.toString();
aggEntity.setId(appId);
aggEntity.setType(type);
long cTime2 = 1425016502000L;
long mTime2 = 1425026902000L;
aggEntity.setCreatedTime(cTime2);
TimelineMetric aggMetric = new TimelineMetric();
aggMetric.setId("MEM_USAGE");
Map<Long, Number> aggMetricValues = new HashMap<Long, Number>();
ts = System.currentTimeMillis();
aggMetricValues.put(ts - 120000, 102400000);
aggMetric.setType(Type.SINGLE_VALUE);
aggMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
aggMetric.setValues(aggMetricValues);
Set<TimelineMetric> aggMetrics = new HashSet<>();
aggMetrics.add(aggMetric);
entity.addMetrics(aggMetrics);
te.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
@ -564,7 +585,7 @@ public void testWriteApplicationToHBase() throws Exception {
Result result = new ApplicationTable().getResult(c1, conn, get);
assertTrue(result != null);
assertEquals(15, result.size());
assertEquals(16, result.size());
// check the row key
byte[] row1 = result.getRow();
@ -652,10 +673,17 @@ public void testWriteApplicationToHBase() throws Exception {
assertEquals(conf, conf2);
Set<TimelineMetric> metrics2 = e1.getMetrics();
assertEquals(metrics, metrics2);
assertEquals(2, metrics2.size());
for (TimelineMetric metric2 : metrics2) {
Map<Long, Number> metricValues2 = metric2.getValues();
matchMetrics(metricValues, metricValues2);
assertTrue(metric2.getId().equals("MAP_SLOT_MILLIS") ||
metric2.getId().equals("MEM_USAGE"));
if (metric2.getId().equals("MAP_SLOT_MILLIS")) {
matchMetrics(metricValues, metricValues2);
}
if (metric2.getId().equals("MEM_USAGE")) {
matchMetrics(aggMetricValues, metricValues2);
}
}
} finally {
if (hbi != null) {
@ -724,7 +752,6 @@ public void testWriteEntityToHBase() throws Exception {
m1.setValues(metricValues);
metrics.add(m1);
entity.addMetrics(metrics);
te.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;