From a9fab9b644e636c1f1b2632130d4eaea70111f16 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Mon, 27 Jul 2015 15:50:28 -0700 Subject: [PATCH] YARN-3908. Fixed bugs in HBaseTimelineWriterImpl. Contributed by Vrushali C and Sangjin Lee. (cherry picked from commit df0ec473a84871b0effd7ca6faac776210d7df09) --- .../timelineservice/TimelineEvent.java | 4 +- .../storage/HBaseTimelineWriterImpl.java | 18 +++++- .../storage/common/ColumnHelper.java | 21 ++++--- .../storage/common/ColumnPrefix.java | 7 ++- .../storage/common/Separator.java | 7 +++ .../storage/entity/EntityColumnPrefix.java | 15 +++-- .../storage/entity/EntityTable.java | 6 +- .../storage/TestHBaseTimelineWriterImpl.java | 56 +++++++++++++++++-- 8 files changed, 108 insertions(+), 26 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java index 1dbf7e527b..a563658894 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java @@ -33,6 +33,8 @@ import java.util.Map; @InterfaceAudience.Public @InterfaceStability.Unstable public class TimelineEvent implements Comparable { + public static final long INVALID_TIMESTAMP = 0L; + private String id; private HashMap info = new HashMap<>(); private long timestamp; @@ -83,7 +85,7 @@ public class TimelineEvent implements Comparable { } public boolean isValid() { - return (id != null && timestamp != 0L); + return (id != null && timestamp != INVALID_TIMESTAMP); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 876ad6ab34..cd2e76e8f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -141,6 +141,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null, te.getModifiedTime()); EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); + Map info = te.getInfo(); + if (info != null) { + for (Map.Entry entry : info.entrySet()) { + EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(), + null, entry.getValue()); + } + } } /** @@ -186,6 +193,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements if (event != null) { String eventId = event.getId(); if (eventId != null) { + long eventTimestamp = event.getTimestamp(); + // if the timestamp is not set, use the current timestamp + if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) { + LOG.warn("timestamp is not set for event " + eventId + + "! Using the current timestamp"); + eventTimestamp = System.currentTimeMillis(); + } Map eventInfo = event.getInfo(); if (eventInfo != null) { for (Map.Entry info : eventInfo.entrySet()) { @@ -198,8 +212,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements // convert back to string to avoid additional API on store. String compoundColumnQualifier = Bytes.toString(compoundColumnQualifierBytes); - EntityColumnPrefix.METRIC.store(rowKey, entityTable, - compoundColumnQualifier, null, info.getValue()); + EntityColumnPrefix.EVENT.store(rowKey, entityTable, + compoundColumnQualifier, eventTimestamp, info.getValue()); } // for info: eventInfo } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java index 6a204dc5e7..a9029245a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java @@ -113,19 +113,22 @@ public class ColumnHelper { } /** - * @param result from which to reads timeseries data + * @param result from which to reads data with timestamps * @param columnPrefixBytes optional prefix to limit columns. If null all * columns are returned. + * @param the type of the values. The values will be cast into that type. * @return the cell values at each respective time in for form * {idA={timestamp1->value1}, idA={timestamp2->value2}, * idB={timestamp3->value3}, idC={timestamp1->value4}} * @throws IOException */ - public NavigableMap> readTimeseriesResults( - Result result, byte[] columnPrefixBytes) throws IOException { + @SuppressWarnings("unchecked") + public NavigableMap> + readResultsWithTimestamps(Result result, byte[] columnPrefixBytes) + throws IOException { - NavigableMap> results = - new TreeMap>(); + NavigableMap> results = + new TreeMap>(); if (result != null) { NavigableMap>> resultMap = @@ -157,13 +160,13 @@ public class ColumnHelper { // If this column has the prefix we want if (columnName != null) { - NavigableMap cellResults = - new TreeMap(); + NavigableMap cellResults = + new TreeMap(); NavigableMap cells = entry.getValue(); if (cells != null) { for (Entry cell : cells.entrySet()) { - Number value = - (Number) GenericObjectMapper.read(cell.getValue()); + V value = + (V) GenericObjectMapper.read(cell.getValue()); cellResults.put(cell.getKey(), value); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java index 2eedea073f..671c824920 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java @@ -72,12 +72,13 @@ public interface ColumnPrefix { public Map readResults(Result result) throws IOException; /** - * @param result from which to reads timeseries data + * @param result from which to reads data with timestamps + * @param the type of the values. The values will be cast into that type. * @return the cell values at each respective time in for form * {idA={timestamp1->value1}, idA={timestamp2->value2}, * idB={timestamp3->value3}, idC={timestamp1->value4}} * @throws IOException */ - public NavigableMap> readTimeseriesResults( - Result result) throws IOException; + public NavigableMap> + readResultsWithTimestamps(Result result) throws IOException; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java index ee578900ad..3319419ef8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java @@ -89,6 +89,13 @@ public enum Separator { this.quotedValue = Pattern.quote(value); } + /** + * @return the original value of the separator + */ + public String getValue() { + return value; + } + /** * Used to make token safe to be used with this separator without collisions. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 4459868c76..8b7bc3e6b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -44,6 +44,11 @@ public enum EntityColumnPrefix implements ColumnPrefix { */ RELATES_TO(EntityColumnFamily.INFO, "r"), + /** + * To store TimelineEntity info values. + */ + INFO(EntityColumnFamily.INFO, "i"), + /** * Lifecycle events for an entity */ @@ -92,7 +97,7 @@ public enum EntityColumnPrefix implements ColumnPrefix { /** * @return the column name value */ - private String getColumnPrefix() { + public String getColumnPrefix() { return columnPrefix; } @@ -150,11 +155,11 @@ public enum EntityColumnPrefix implements ColumnPrefix { * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readTimeseriesResults(org.apache.hadoop.hbase.client.Result) + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) */ - public NavigableMap> readTimeseriesResults( - Result result) throws IOException { - return column.readTimeseriesResults(result, columnPrefixBytes); + public NavigableMap> + readResultsWithTimestamps(Result result) throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java index 61f7c4c118..2ae7d39c9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java @@ -54,7 +54,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti * | | modified_time: | | | * | | 1392995081012 | metricId2: | | * | | | metricValue1 | | - * | | r!relatesToKey: | @timestamp2 | | + * | | i!infoKey: | @timestamp2 | | + * | | infoValue | | | + * | | | | | + * | | r!relatesToKey: | | | * | | id3?id4?id5 | | | * | | | | | * | | s!isRelatedToKey | | | @@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti * | | | | | * | | e!eventId?eventInfoKey: | | | * | | eventInfoValue | | | + * | | @timestamp | | | * | | | | | * | | flowVersion: | | | * | | versionValue | | | diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java index 6abf2405a7..31cb5d2e7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java @@ -43,8 +43,10 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; @@ -84,6 +86,12 @@ public class TestHBaseTimelineWriterImpl { entity.setCreatedTime(cTime); entity.setModifiedTime(mTime); + // add the info map in Timeline Entity + Map infoMap = new HashMap(); + infoMap.put("infoMapKey1", "infoMapValue1"); + infoMap.put("infoMapKey2", 10); + entity.addInfo(infoMap); + // add the isRelatedToEntity info String key = "task"; String value = "is_related_to_entity_id_here"; @@ -177,6 +185,14 @@ public class TestHBaseTimelineWriterImpl { Long mTime1 = val.longValue(); assertEquals(mTime1, mTime); + Map infoColumns = + EntityColumnPrefix.INFO.readResults(result); + assertEquals(infoMap.size(), infoColumns.size()); + for (String infoItem : infoMap.keySet()) { + assertEquals(infoMap.get(infoItem), + infoColumns.get(infoItem)); + } + // Remember isRelatedTo is of type Map> for (String isRelatedToKey : isRelatedTo.keySet()) { Object isRelatedToValue = @@ -219,7 +235,7 @@ public class TestHBaseTimelineWriterImpl { } NavigableMap> metricsResult = - EntityColumnPrefix.METRIC.readTimeseriesResults(result); + EntityColumnPrefix.METRIC.readResultsWithTimestamps(result); NavigableMap metricMap = metricsResult.get(m1.getId()); // We got metrics back @@ -237,7 +253,7 @@ public class TestHBaseTimelineWriterImpl { } } assertEquals(1, rowCount); - assertEquals(15, colCount); + assertEquals(17, colCount); } finally { hbi.stop(); @@ -267,13 +283,18 @@ public class TestHBaseTimelineWriterImpl { private void testAdditionalEntity() throws IOException { TimelineEvent event = new TimelineEvent(); - event.setId("foo_event_id"); - event.setTimestamp(System.currentTimeMillis()); - event.addInfo("foo_event", "test"); + String eventId = "foo_event_id"; + event.setId(eventId); + Long expTs = 1436512802000L; + event.setTimestamp(expTs); + String expKey = "foo_event"; + Object expVal = "test"; + event.addInfo(expKey, expVal); final TimelineEntity entity = new TimelineEntity(); entity.setId("attempt_1329348432655_0001_m_000008_18"); entity.setType("FOO_ATTEMPT"); + entity.addEvent(event); TimelineEntities entities = new TimelineEntities(); entities.addEntity(entity); @@ -304,6 +325,31 @@ public class TestHBaseTimelineWriterImpl { for (Result result : scanner) { if (result != null && !result.isEmpty()) { rowCount++; + + // check the row key + byte[] row1 = result.getRow(); + assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName, + entity)); + + // check the events + NavigableMap> eventsResult = + EntityColumnPrefix.EVENT.readResultsWithTimestamps(result); + // there should be only one event + assertEquals(1, eventsResult.size()); + // key name for the event + String valueKey = eventId + Separator.VALUES.getValue() + expKey; + for (Map.Entry> e : + eventsResult.entrySet()) { + // the value key must match + assertEquals(valueKey, e.getKey()); + NavigableMap value = e.getValue(); + // there should be only one timestamp and value + assertEquals(1, value.size()); + for (Map.Entry e2: value.entrySet()) { + assertEquals(expTs, e2.getKey()); + assertEquals(expVal, e2.getValue()); + } + } } } assertEquals(1, rowCount);