diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index 302f8e034d..b589206928 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -36,6 +36,7 @@ import javax.ws.rs.core.MediaType; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -334,16 +336,21 @@ private static void loadData() throws Exception { HBaseTimelineWriterImpl hbi = null; Configuration c1 = getHBaseTestingUtility().getConfiguration(); + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); - hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te); - hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1); - hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4); - hbi.write(cluster, user, flow2, - flowVersion2, runid2, entity3.getId(), te3); - hbi.write(cluster, user, flow, flowVersion, runid, - "application_1111111111_1111", userEntities); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, entity.getId()), te, remoteUser); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, entity1.getId()), te1, remoteUser); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid1, entity4.getId()), te4, remoteUser); + hbi.write(new TimelineCollectorContext(cluster, user, flow2, flowVersion2, + runid2, entity3.getId()), te3, remoteUser); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, "application_1111111111_1111"), userEntities, remoteUser); writeApplicationEntities(hbi, ts); hbi.flush(); } finally { @@ -375,8 +382,9 @@ static void writeApplicationEntities(HBaseTimelineWriterImpl hbi, appEntity.addEvent(finished); te.addEntity(appEntity); - hbi.write("cluster1", "user1", "flow1", "CF7022C10F1354", i, - appEntity.getId(), te); + hbi.write(new TimelineCollectorContext("cluster1", "user1", "flow1", + "CF7022C10F1354", i, appEntity.getId()), te, + UserGroupInformation.createRemoteUser("user1")); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java index 926d8bb157..cf6a854424 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; /** * Utility class that creates the schema and generates test data. @@ -155,17 +157,20 @@ public static void loadApps(HBaseTestingUtility util, long ts) hbi = new HBaseTimelineWriterImpl(); hbi.init(util.getConfiguration()); hbi.start(); - String cluster = "cluster1"; - String user = "user1"; - String flow = "some_flow_name"; - String flowVersion = "AB7822C10F1111"; - long runid = 1002345678919L; - String appName = "application_1111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); - appName = "application_1111111111_3333"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); - appName = "application_1111111111_4444"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te2); + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser("user1"); + hbi.write( + new TimelineCollectorContext("cluster1", "user1", "some_flow_name", + "AB7822C10F1111", 1002345678919L, "application_1111111111_2222"), + te, remoteUser); + hbi.write( + new TimelineCollectorContext("cluster1", "user1", "some_flow_name", + "AB7822C10F1111", 1002345678919L, "application_1111111111_3333"), + te1, remoteUser); + hbi.write( + new TimelineCollectorContext("cluster1", "user1", "some_flow_name", + "AB7822C10F1111", 1002345678919L, "application_1111111111_4444"), + te2, remoteUser); hbi.stop(); } finally { if (hbi != null) { @@ -433,15 +438,19 @@ public static void loadEntities(HBaseTestingUtility util, long ts) hbi = new HBaseTimelineWriterImpl(); hbi.init(util.getConfiguration()); hbi.start(); - String cluster = "cluster1"; - String user = "user1"; - String flow = "some_flow_name"; - String flowVersion = "AB7822C10F1111"; - long runid = 1002345678919L; - hbi.write(cluster, user, flow, flowVersion, runid, appName1, te); - hbi.write(cluster, user, flow, flowVersion, runid, appName2, te); - hbi.write(cluster, user, flow, flowVersion, runid, appName1, appTe1); - hbi.write(cluster, user, flow, flowVersion, runid, appName2, appTe2); + + UserGroupInformation user = + UserGroupInformation.createRemoteUser("user1"); + TimelineCollectorContext context = + new TimelineCollectorContext("cluster1", "user1", "some_flow_name", + "AB7822C10F1111", 1002345678919L, appName1); + hbi.write(context, te, user); + hbi.write(context, appTe1, user); + + context = new TimelineCollectorContext("cluster1", "user1", + "some_flow_name", "AB7822C10F1111", 1002345678919L, appName2); + hbi.write(context, te, user); + hbi.write(context, appTe2, user); hbi.stop(); } finally { if (hbi != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java index b2271853cd..111008a5d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; @@ -162,7 +164,8 @@ public void testWriteNullApplicationToHBase() throws Exception { String flow = null; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; - hbi.write(cluster, user, flow, flowVersion, runid, appId, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appId), te, UserGroupInformation.createRemoteUser(user)); hbi.stop(); // retrieve the row @@ -280,7 +283,8 @@ public void testWriteApplicationToHBase() throws Exception { String flow = "s!ome_f\tlow _n am!e"; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; - hbi.write(cluster, user, flow, flowVersion, runid, appId, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appId), te, UserGroupInformation.createRemoteUser(user)); // Write entity again, this time without created time. entity = new ApplicationEntity(); @@ -292,7 +296,8 @@ public void testWriteApplicationToHBase() throws Exception { entity.addInfo(infoMap1); te = new TimelineEntities(); te.addEntity(entity); - hbi.write(cluster, user, flow, flowVersion, runid, appId, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appId), te, UserGroupInformation.createRemoteUser(user)); hbi.stop(); infoMap.putAll(infoMap1); @@ -514,7 +519,9 @@ public void testEvents() throws IOException { String flowVersion = "1111F01C2287BA"; long runid = 1009876543218L; String appName = "application_123465899910_1001"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), entities, + UserGroupInformation.createRemoteUser(user)); hbi.stop(); // retrieve the row @@ -629,14 +636,17 @@ public void testNonIntegralMetricValues() throws IOException { hbi.init(c1); hbi.start(); // Writing application entity. + TimelineCollectorContext context = new TimelineCollectorContext("c1", + "u1", "f1", "v1", 1002345678919L, appId); + UserGroupInformation user = UserGroupInformation.createRemoteUser("u1"); try { - hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teApp); + hbi.write(context, teApp, user); Assert.fail("Expected an exception as metric values are non integral"); } catch (IOException e) {} // Writing generic entity. try { - hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teEntity); + hbi.write(context, teEntity, user); Assert.fail("Expected an exception as metric values are non integral"); } catch (IOException e) {} hbi.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java index 37560916e8..5e089992ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; @@ -72,6 +74,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -197,13 +204,16 @@ public void testWriteEntityToHBase() throws Exception { hbi.start(); String cluster = "cluster_test_write_entity"; String user = "user1"; + String subAppUser = "subAppUser1"; String flow = "some_flow_name"; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; String appName = HBaseTimelineStorageUtils.convertApplicationIdToString( ApplicationId.newInstance(System.currentTimeMillis() + 9000000L, 1) ); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, + UserGroupInformation.createRemoteUser(subAppUser)); hbi.stop(); // scan the table and see that entity exists @@ -354,6 +364,11 @@ public void testWriteEntityToHBase() throws Exception { assertEquals(metricValues.get(ts - 20000), metric.getValues().get(ts - 20000)); } + + // verify for sub application table entities. + verifySubApplicationTableEntities(cluster, user, flow, flowVersion, runid, + appName, subAppUser, c1, entity, id, type, infoMap, isRelatedTo, + relatesTo, conf, metricValues, metrics, cTime, m1); } finally { if (hbi != null) { hbi.stop(); @@ -362,6 +377,98 @@ public void testWriteEntityToHBase() throws Exception { } } + private void verifySubApplicationTableEntities(String cluster, String user, + String flow, String flowVersion, Long runid, String appName, + String subAppUser, Configuration c1, TimelineEntity entity, String id, + String type, Map infoMap, + Map> isRelatedTo, Map> relatesTo, + Map conf, Map metricValues, + Set metrics, Long cTime, TimelineMetric m1) + throws IOException { + Scan s = new Scan(); + // read from SubApplicationTable + byte[] startRow = new SubApplicationRowKeyPrefix(cluster, subAppUser, null, + null, null, null).getRowKeyPrefix(); + s.setStartRow(startRow); + s.setMaxVersions(Integer.MAX_VALUE); + Connection conn = ConnectionFactory.createConnection(c1); + ResultScanner scanner = + new SubApplicationTable().getResultScanner(c1, conn, s); + + int rowCount = 0; + int colCount = 0; + KeyConverter stringKeyConverter = new StringKeyConverter(); + for (Result result : scanner) { + if (result != null && !result.isEmpty()) { + rowCount++; + colCount += result.size(); + byte[] row1 = result.getRow(); + assertTrue(verifyRowKeyForSubApplication(row1, subAppUser, cluster, + user, entity)); + + // check info column family + String id1 = SubApplicationColumn.ID.readResult(result).toString(); + assertEquals(id, id1); + + String type1 = SubApplicationColumn.TYPE.readResult(result).toString(); + assertEquals(type, type1); + + Long cTime1 = + (Long) SubApplicationColumn.CREATED_TIME.readResult(result); + assertEquals(cTime1, cTime); + + Map infoColumns = SubApplicationColumnPrefix.INFO + .readResults(result, new StringKeyConverter()); + assertEquals(infoMap, infoColumns); + + // Remember isRelatedTo is of type Map> + for (Map.Entry> isRelatedToEntry : isRelatedTo + .entrySet()) { + Object isRelatedToValue = SubApplicationColumnPrefix.IS_RELATED_TO + .readResult(result, isRelatedToEntry.getKey()); + String compoundValue = isRelatedToValue.toString(); + // id7?id9?id6 + Set isRelatedToValues = + new HashSet(Separator.VALUES.splitEncoded(compoundValue)); + assertEquals(isRelatedTo.get(isRelatedToEntry.getKey()).size(), + isRelatedToValues.size()); + for (String v : isRelatedToEntry.getValue()) { + assertTrue(isRelatedToValues.contains(v)); + } + } + + // RelatesTo + for (Map.Entry> relatesToEntry : relatesTo + .entrySet()) { + String compoundValue = SubApplicationColumnPrefix.RELATES_TO + .readResult(result, relatesToEntry.getKey()).toString(); + // id3?id4?id5 + Set relatesToValues = + new HashSet(Separator.VALUES.splitEncoded(compoundValue)); + assertEquals(relatesTo.get(relatesToEntry.getKey()).size(), + relatesToValues.size()); + for (String v : relatesToEntry.getValue()) { + assertTrue(relatesToValues.contains(v)); + } + } + + // Configuration + Map configColumns = SubApplicationColumnPrefix.CONFIG + .readResults(result, stringKeyConverter); + assertEquals(conf, configColumns); + + NavigableMap> metricsResult = + SubApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result, + stringKeyConverter); + + NavigableMap metricMap = metricsResult.get(m1.getId()); + matchMetrics(metricValues, metricMap); + } + } + assertEquals(1, rowCount); + assertEquals(16, colCount); + } + private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, String flow, Long runid, String appName, TimelineEntity te) { @@ -409,7 +516,9 @@ public void testEventsWithEmptyInfo() throws IOException { byte[] startRow = new EntityRowKeyPrefix(cluster, user, flow, runid, appName) .getRowKeyPrefix(); - hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), entities, + UserGroupInformation.createRemoteUser(user)); hbi.stop(); // scan the table and see that entity exists Scan s = new Scan(); @@ -514,7 +623,9 @@ public void testEventsEscapeTs() throws IOException { String flowVersion = "1111F01C2287BA"; long runid = 1009876543218L; String appName = "application_123465899910_2001"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), entities, + UserGroupInformation.createRemoteUser(user)); hbi.stop(); // read the timeline entity using the reader this time @@ -1762,4 +1873,15 @@ public void testListTypesInApp() throws Exception { public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster(); } + + private boolean verifyRowKeyForSubApplication(byte[] rowKey, String suAppUser, + String cluster, String user, TimelineEntity te) { + SubApplicationRowKey key = SubApplicationRowKey.parseRowKey(rowKey); + assertEquals(suAppUser, key.getSubAppUserId()); + assertEquals(cluster, key.getClusterId()); + assertEquals(te.getType(), key.getEntityType()); + assertEquals(te.getId(), key.getEntityId()); + assertEquals(user, key.getUserId()); + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 0923105547..4bf221e2ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -38,12 +38,14 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; @@ -117,13 +119,18 @@ public void testWriteFlowRunMinMax() throws Exception { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); // write another entity with the right min start time te = new TimelineEntities(); te.addEntity(entityMinStartTime); appName = "application_100000000000_3333"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); // writer another entity for max end time TimelineEntity entityMaxEndTime = TestFlowDataGenerator @@ -131,7 +138,8 @@ public void testWriteFlowRunMinMax() throws Exception { te = new TimelineEntities(); te.addEntity(entityMaxEndTime); appName = "application_100000000000_4444"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); // writer another entity with greater start time TimelineEntity entityGreaterStartTime = TestFlowDataGenerator @@ -139,7 +147,8 @@ public void testWriteFlowRunMinMax() throws Exception { te = new TimelineEntities(); te.addEntity(entityGreaterStartTime); appName = "application_1000000000000000_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); // flush everything to hbase hbi.flush(); @@ -227,7 +236,8 @@ public void testWriteFlowActivityOneFlow() throws Exception { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); String appName = "application_1111999999_1234"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, UserGroupInformation.createRemoteUser(user)); hbi.flush(); } finally { if (hbi != null) { @@ -340,20 +350,27 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); + + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); + String appName = "application_11888888888_1111"; - hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion1, + runid1, appName), te, remoteUser); // write an application with to this flow but a different runid/ version te = new TimelineEntities(); te.addEntity(entityApp1); appName = "application_11888888888_2222"; - hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion2, + runid2, appName), te, remoteUser); // write an application with to this flow but a different runid/ version te = new TimelineEntities(); te.addEntity(entityApp1); appName = "application_11888888888_3333"; - hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion3, + runid3, appName), te, remoteUser); hbi.flush(); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index acfdc4df98..1ad02e1886 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -43,11 +43,13 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; @@ -181,13 +183,18 @@ public void testWriteFlowRunMinMax() throws Exception { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); // write another entity with the right min start time te = new TimelineEntities(); te.addEntity(entityMinStartTime); appName = "application_100000000000_3333"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); // writer another entity for max end time TimelineEntity entityMaxEndTime = TestFlowDataGenerator @@ -195,7 +202,8 @@ public void testWriteFlowRunMinMax() throws Exception { te = new TimelineEntities(); te.addEntity(entityMaxEndTime); appName = "application_100000000000_4444"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); // writer another entity with greater start time TimelineEntity entityGreaterStartTime = TestFlowDataGenerator @@ -203,7 +211,8 @@ public void testWriteFlowRunMinMax() throws Exception { te = new TimelineEntities(); te.addEntity(entityGreaterStartTime); appName = "application_1000000000000000_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); // flush everything to hbase hbi.flush(); @@ -287,15 +296,19 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); String appName = "application_11111111111111_1111"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); // write another application with same metric to this flow te = new TimelineEntities(); TimelineEntity entityApp2 = TestFlowDataGenerator .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); hbi.flush(); } finally { if (hbi != null) { @@ -556,15 +569,22 @@ public void testWriteFlowRunMetricsPrefix() throws Exception { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); + String appName = "application_11111111111111_1111"; - hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + 1002345678919L, appName), te, + remoteUser); // write another application with same metric to this flow te = new TimelineEntities(); TimelineEntity entityApp2 = TestFlowDataGenerator .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + 1002345678918L, appName), te, + remoteUser); hbi.flush(); } finally { if (hbi != null) { @@ -643,15 +663,20 @@ public void testWriteFlowRunsMetricFields() throws Exception { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); + String appName = "application_11111111111111_1111"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); // write another application with same metric to this flow te = new TimelineEntities(); TimelineEntity entityApp2 = TestFlowDataGenerator .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te, remoteUser); hbi.flush(); } finally { if (hbi != null) { @@ -737,6 +762,8 @@ public void testWriteFlowRunFlush() throws Exception { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); for (int i = start; i < count; i++) { String appName = "application_1060350000000_" + appIdSuffix; @@ -746,7 +773,8 @@ public void testWriteFlowRunFlush() throws Exception { te1.addEntity(entityApp1); entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); te1.addEntity(entityApp2); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te1, remoteUser); Thread.sleep(1); appName = "application_1001199480000_7" + appIdSuffix; @@ -758,7 +786,9 @@ public void testWriteFlowRunFlush() throws Exception { entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); te1.addEntity(entityApp2); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te1, + remoteUser); if (i % 1000 == 0) { hbi.flush(); checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, @@ -826,16 +856,23 @@ public void testFilterFlowRunsByCreatedTime() throws Exception { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); - hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, - "application_11111111111111_1111", te); + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); + + hbi.write( + new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354", + 1002345678919L, "application_11111111111111_1111"), + te, remoteUser); // write another application with same metric to this flow te = new TimelineEntities(); TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( System.currentTimeMillis()); entityApp2.setCreatedTime(1425016502000L); te.addEntity(entityApp2); - hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, - "application_11111111111111_2222", te); + hbi.write( + new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354", + 1002345678918L, "application_11111111111111_2222"), + te, remoteUser); hbi.flush(); } finally { if (hbi != null) { @@ -911,15 +948,22 @@ public void testMetricFilters() throws Exception { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); - hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, - "application_11111111111111_1111", te); + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); + + hbi.write( + new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354", + 1002345678919L, "application_11111111111111_1111"), + te, remoteUser); // write another application with same metric to this flow te = new TimelineEntities(); TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( System.currentTimeMillis()); te.addEntity(entityApp2); - hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, - "application_11111111111111_2222", te); + hbi.write( + new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354", + 1002345678918L, "application_11111111111111_2222"), + te, remoteUser); hbi.flush(); } finally { if (hbi != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index fa9d02973f..0ef8260dee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -48,8 +48,10 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; @@ -280,9 +282,12 @@ public void testWriteFlowRunCompaction() throws Exception { Configuration c1 = util.getConfiguration(); TimelineEntities te1 = null; TimelineEntity entityApp1 = null; + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); + // now insert count * ( 100 + 100) metrics // each call to getEntityMetricsApp1 brings back 100 values // of metric1 and 100 of metric2 @@ -292,14 +297,16 @@ public void testWriteFlowRunCompaction() throws Exception { te1 = new TimelineEntities(); entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1); te1.addEntity(entityApp1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te1, remoteUser); appName = "application_2048000000000_7" + appIdSuffix; insertTs++; te1 = new TimelineEntities(); entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs); te1.addEntity(entityApp1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te1, remoteUser); } } finally { String appName = "application_10240000000000_" + appIdSuffix; @@ -308,7 +315,8 @@ public void testWriteFlowRunCompaction() throws Exception { insertTs + 1, c1); te1.addEntity(entityApp1); if (hbi != null) { - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, + runid, appName), te1, remoteUser); hbi.flush(); hbi.close(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index df33024b2d..9e9134cf3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; @@ -63,6 +65,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,6 +91,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements private TypedBufferedMutator applicationTable; private TypedBufferedMutator flowActivityTable; private TypedBufferedMutator flowRunTable; + private TypedBufferedMutator subApplicationTable; /** * Used to convert strings key components to and from storage format. @@ -97,6 +104,10 @@ public class HBaseTimelineWriterImpl extends AbstractService implements */ private final KeyConverter longKeyConverter = new LongKeyConverter(); + private enum Tables { + APPLICATION_TABLE, ENTITY_TABLE, SUBAPPLICATION_TABLE + }; + public HBaseTimelineWriterImpl() { super(HBaseTimelineWriterImpl.class.getName()); } @@ -116,17 +127,28 @@ protected void serviceInit(Configuration conf) throws Exception { flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn); flowActivityTable = new FlowActivityTable().getTableMutator(hbaseConf, conn); + subApplicationTable = + new SubApplicationTable().getTableMutator(hbaseConf, conn); } /** * Stores the entire information in TimelineEntities to the timeline store. */ @Override - public TimelineWriteResponse write(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntities data) throws IOException { + public TimelineWriteResponse write(TimelineCollectorContext context, + TimelineEntities data, UserGroupInformation callerUgi) + throws IOException { TimelineWriteResponse putStatus = new TimelineWriteResponse(); + + String clusterId = context.getClusterId(); + String userId = context.getUserId(); + String flowName = context.getFlowName(); + String flowVersion = context.getFlowVersion(); + long flowRunId = context.getFlowRunId(); + String appId = context.getAppId(); + String subApplicationUser = callerUgi.getShortUserName(); + // defensive coding to avoid NPE during row key construction if ((flowName == null) || (appId == null) || (clusterId == null) || (userId == null)) { @@ -152,18 +174,22 @@ public TimelineWriteResponse write(String clusterId, String userId, new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId); rowKey = applicationRowKey.getRowKey(); + store(rowKey, te, flowVersion, Tables.APPLICATION_TABLE); } else { EntityRowKey entityRowKey = new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, te.getType(), te.getIdPrefix(), te.getId()); rowKey = entityRowKey.getRowKey(); + store(rowKey, te, flowVersion, Tables.ENTITY_TABLE); } - storeInfo(rowKey, te, flowVersion, isApplication); - storeEvents(rowKey, te.getEvents(), isApplication); - storeConfig(rowKey, te.getConfigs(), isApplication); - storeMetrics(rowKey, te.getMetrics(), isApplication); - storeRelations(rowKey, te, isApplication); + if (!isApplication && !userId.equals(subApplicationUser)) { + SubApplicationRowKey subApplicationRowKey = + new SubApplicationRowKey(subApplicationUser, clusterId, + te.getType(), te.getIdPrefix(), te.getId(), userId); + rowKey = subApplicationRowKey.getRowKey(); + store(rowKey, te, flowVersion, Tables.SUBAPPLICATION_TABLE); + } if (isApplication) { TimelineEvent event = @@ -304,72 +330,108 @@ private void storeFlowMetrics(byte[] rowKey, Set metrics, } } - private void storeRelations(byte[] rowKey, TimelineEntity te, - boolean isApplication) throws IOException { - if (isApplication) { - storeRelations(rowKey, te.getIsRelatedToEntities(), - ApplicationColumnPrefix.IS_RELATED_TO, applicationTable); - storeRelations(rowKey, te.getRelatesToEntities(), - ApplicationColumnPrefix.RELATES_TO, applicationTable); - } else { - storeRelations(rowKey, te.getIsRelatedToEntities(), - EntityColumnPrefix.IS_RELATED_TO, entityTable); - storeRelations(rowKey, te.getRelatesToEntities(), - EntityColumnPrefix.RELATES_TO, entityTable); - } - } - /** * Stores the Relations from the {@linkplain TimelineEntity} object. */ private void storeRelations(byte[] rowKey, - Map> connectedEntities, - ColumnPrefix columnPrefix, TypedBufferedMutator table) - throws IOException { - for (Map.Entry> connectedEntity : connectedEntities - .entrySet()) { - // id3?id4?id5 - String compoundValue = - Separator.VALUES.joinEncoded(connectedEntity.getValue()); - columnPrefix.store(rowKey, table, - stringKeyConverter.encode(connectedEntity.getKey()), null, - compoundValue); + Map> connectedEntities, ColumnPrefix columnPrefix, + TypedBufferedMutator table) throws IOException { + if (connectedEntities != null) { + for (Map.Entry> connectedEntity : connectedEntities + .entrySet()) { + // id3?id4?id5 + String compoundValue = + Separator.VALUES.joinEncoded(connectedEntity.getValue()); + columnPrefix.store(rowKey, table, + stringKeyConverter.encode(connectedEntity.getKey()), null, + compoundValue); + } } } /** * Stores information from the {@linkplain TimelineEntity} object. */ - private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion, - boolean isApplication) throws IOException { - - if (isApplication) { + private void store(byte[] rowKey, TimelineEntity te, + String flowVersion, + Tables table) throws IOException { + switch (table) { + case APPLICATION_TABLE: ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId()); ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null, te.getCreatedTime()); ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null, flowVersion); - Map info = te.getInfo(); - if (info != null) { - for (Map.Entry entry : info.entrySet()) { - ApplicationColumnPrefix.INFO.store(rowKey, applicationTable, - stringKeyConverter.encode(entry.getKey()), null, - entry.getValue()); - } - } - } else { + storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO, + applicationTable); + storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC, + applicationTable); + storeEvents(rowKey, te.getEvents(), ApplicationColumnPrefix.EVENT, + applicationTable); + storeConfig(rowKey, te.getConfigs(), ApplicationColumnPrefix.CONFIG, + applicationTable); + storeRelations(rowKey, te.getIsRelatedToEntities(), + ApplicationColumnPrefix.IS_RELATED_TO, applicationTable); + storeRelations(rowKey, te.getRelatesToEntities(), + ApplicationColumnPrefix.RELATES_TO, applicationTable); + break; + case ENTITY_TABLE: EntityColumn.ID.store(rowKey, entityTable, null, te.getId()); EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType()); EntityColumn.CREATED_TIME.store(rowKey, entityTable, null, te.getCreatedTime()); EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); - Map info = te.getInfo(); - if (info != null) { - for (Map.Entry entry : info.entrySet()) { - EntityColumnPrefix.INFO.store(rowKey, entityTable, - stringKeyConverter.encode(entry.getKey()), null, - entry.getValue()); - } + storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO, + entityTable); + storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC, + entityTable); + storeEvents(rowKey, te.getEvents(), EntityColumnPrefix.EVENT, + entityTable); + storeConfig(rowKey, te.getConfigs(), EntityColumnPrefix.CONFIG, + entityTable); + storeRelations(rowKey, te.getIsRelatedToEntities(), + EntityColumnPrefix.IS_RELATED_TO, entityTable); + storeRelations(rowKey, te.getRelatesToEntities(), + EntityColumnPrefix.RELATES_TO, entityTable); + break; + case SUBAPPLICATION_TABLE: + SubApplicationColumn.ID.store(rowKey, subApplicationTable, null, + te.getId()); + SubApplicationColumn.TYPE.store(rowKey, subApplicationTable, null, + te.getType()); + SubApplicationColumn.CREATED_TIME.store(rowKey, subApplicationTable, null, + te.getCreatedTime()); + SubApplicationColumn.FLOW_VERSION.store(rowKey, subApplicationTable, null, + flowVersion); + storeInfo(rowKey, te.getInfo(), flowVersion, + SubApplicationColumnPrefix.INFO, subApplicationTable); + storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC, + subApplicationTable); + storeEvents(rowKey, te.getEvents(), SubApplicationColumnPrefix.EVENT, + subApplicationTable); + storeConfig(rowKey, te.getConfigs(), SubApplicationColumnPrefix.CONFIG, + subApplicationTable); + storeRelations(rowKey, te.getIsRelatedToEntities(), + SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable); + storeRelations(rowKey, te.getRelatesToEntities(), + SubApplicationColumnPrefix.RELATES_TO, subApplicationTable); + break; + default: + LOG.info("Invalid table name provided."); + break; + } + } + + /** + * stores the info information from {@linkplain TimelineEntity}. + */ + private void storeInfo(byte[] rowKey, Map info, + String flowVersion, ColumnPrefix columnPrefix, + TypedBufferedMutator table) throws IOException { + if (info != null) { + for (Map.Entry entry : info.entrySet()) { + columnPrefix.store(rowKey, table, + stringKeyConverter.encode(entry.getKey()), null, entry.getValue()); } } } @@ -377,19 +439,13 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion, /** * stores the config information from {@linkplain TimelineEntity}. */ - private void storeConfig(byte[] rowKey, Map config, - boolean isApplication) throws IOException { - if (config == null) { - return; - } - for (Map.Entry entry : config.entrySet()) { - byte[] configKey = stringKeyConverter.encode(entry.getKey()); - if (isApplication) { - ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable, - configKey, null, entry.getValue()); - } else { - EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey, - null, entry.getValue()); + private void storeConfig(byte[] rowKey, Map config, + ColumnPrefix columnPrefix, TypedBufferedMutator table) + throws IOException { + if (config != null) { + for (Map.Entry entry : config.entrySet()) { + byte[] configKey = stringKeyConverter.encode(entry.getKey()); + columnPrefix.store(rowKey, table, configKey, null, entry.getValue()); } } } @@ -398,8 +454,9 @@ private void storeConfig(byte[] rowKey, Map config, * stores the {@linkplain TimelineMetric} information from the * {@linkplain TimelineEvent} object. */ - private void storeMetrics(byte[] rowKey, Set metrics, - boolean isApplication) throws IOException { + private void storeMetrics(byte[] rowKey, Set metrics, + ColumnPrefix columnPrefix, TypedBufferedMutator table) + throws IOException { if (metrics != null) { for (TimelineMetric metric : metrics) { byte[] metricColumnQualifier = @@ -407,13 +464,8 @@ private void storeMetrics(byte[] rowKey, Set metrics, Map timeseries = metric.getValues(); for (Map.Entry timeseriesEntry : timeseries.entrySet()) { Long timestamp = timeseriesEntry.getKey(); - if (isApplication) { - ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable, - metricColumnQualifier, timestamp, timeseriesEntry.getValue()); - } else { - EntityColumnPrefix.METRIC.store(rowKey, entityTable, - metricColumnQualifier, timestamp, timeseriesEntry.getValue()); - } + columnPrefix.store(rowKey, table, metricColumnQualifier, timestamp, + timeseriesEntry.getValue()); } } } @@ -422,8 +474,9 @@ private void storeMetrics(byte[] rowKey, Set metrics, /** * Stores the events from the {@linkplain TimelineEvent} object. */ - private void storeEvents(byte[] rowKey, Set events, - boolean isApplication) throws IOException { + private void storeEvents(byte[] rowKey, Set events, + ColumnPrefix columnPrefix, TypedBufferedMutator table) + throws IOException { if (events != null) { for (TimelineEvent event : events) { if (event != null) { @@ -441,26 +494,16 @@ private void storeEvents(byte[] rowKey, Set events, byte[] columnQualifierBytes = new EventColumnName(eventId, eventTimestamp, null) .getColumnQualifier(); - if (isApplication) { - ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, - columnQualifierBytes, null, Separator.EMPTY_BYTES); - } else { - EntityColumnPrefix.EVENT.store(rowKey, entityTable, - columnQualifierBytes, null, Separator.EMPTY_BYTES); - } + columnPrefix.store(rowKey, table, columnQualifierBytes, null, + Separator.EMPTY_BYTES); } else { for (Map.Entry info : eventInfo.entrySet()) { // eventId=infoKey byte[] columnQualifierBytes = new EventColumnName(eventId, eventTimestamp, info.getKey()) .getColumnQualifier(); - if (isApplication) { - ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, - columnQualifierBytes, null, info.getValue()); - } else { - EntityColumnPrefix.EVENT.store(rowKey, entityTable, - columnQualifierBytes, null, info.getValue()); - } + columnPrefix.store(rowKey, table, columnQualifierBytes, null, + info.getValue()); } // for info: eventInfo } } @@ -500,6 +543,7 @@ public void flush() throws IOException { applicationTable.flush(); flowRunTable.flush(); flowActivityTable.flush(); + subApplicationTable.flush(); } /** @@ -532,11 +576,13 @@ protected void serviceStop() throws Exception { // The close API performs flushing and releases any resources held flowActivityTable.close(); } + if (subApplicationTable != null) { + subApplicationTable.close(); + } if (conn != null) { LOG.info("closing the hbase Connection"); conn.close(); } super.serviceStop(); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java index e42c6cda6c..0c04959658 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java @@ -56,26 +56,6 @@ public SubApplicationRowKeyPrefix(String subAppUserId, String clusterId, userId); } - /** - * Creates a prefix which generates the following rowKeyPrefixes for the sub - * application table: - * {@code subAppUserId!clusterId!entityType!entityPrefix!entityId!userId}. - * - * subAppUserId is usually the doAsUser. - * userId is the yarn user that the AM runs as. - * - * @param clusterId - * identifying the cluster - * @param subAppUserId - * identifying the sub app user - * @param userId - * identifying the user who runs the AM - */ - public SubApplicationRowKeyPrefix(String clusterId, String subAppUserId, - String userId) { - this(subAppUserId, clusterId, null, null, null, userId); - } - /* * (non-Javadoc) * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 37387f1f76..306806fe0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -139,7 +139,7 @@ public TimelineWriteResponse putEntities(TimelineEntities entities, // flush the writer buffer concurrently and swallow any exception // caused by the timeline enitites that are being put here. synchronized (writer) { - response = writeTimelineEntities(entities); + response = writeTimelineEntities(entities, callerUgi); flushBufferedTimelineEntities(); } @@ -147,15 +147,14 @@ public TimelineWriteResponse putEntities(TimelineEntities entities, } private TimelineWriteResponse writeTimelineEntities( - TimelineEntities entities) throws IOException { + TimelineEntities entities, UserGroupInformation callerUgi) + throws IOException { // Update application metrics for aggregation updateAggregateStatus(entities, aggregationGroups, getEntityTypesSkipAggregation()); final TimelineCollectorContext context = getTimelineEntityContext(); - return writer.write(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowVersion(), - context.getFlowRunId(), context.getAppId(), entities); + return writer.write(context, entities, callerUgi); } /** @@ -187,7 +186,7 @@ public void putEntitiesAsync(TimelineEntities entities, callerUgi + ")"); } - writeTimelineEntities(entities); + writeTimelineEntities(entities, callerUgi); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 1f527f2450..ee4197000b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -28,12 +28,14 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -68,10 +70,17 @@ public class FileSystemTimelineWriterImpl extends AbstractService } @Override - public TimelineWriteResponse write(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntities entities) throws IOException { + public TimelineWriteResponse write(TimelineCollectorContext context, + TimelineEntities entities, UserGroupInformation callerUgi) + throws IOException { TimelineWriteResponse response = new TimelineWriteResponse(); + String clusterId = context.getClusterId(); + String userId = context.getUserId(); + String flowName = context.getFlowName(); + String flowVersion = context.getFlowVersion(); + long flowRunId = context.getFlowRunId(); + String appId = context.getAppId(); + for (TimelineEntity entity : entities.getEntities()) { write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity, response); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 663a18a495..12bc1cb3f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -21,10 +21,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; /** * This interface is for storing application timeline information. @@ -34,25 +36,19 @@ public interface TimelineWriter extends Service { /** - * Stores the entire information in {@link TimelineEntities} to the - * timeline store. Any errors occurring for individual write request objects - * will be reported in the response. + * Stores the entire information in {@link TimelineEntities} to the timeline + * store. Any errors occurring for individual write request objects will be + * reported in the response. * - * @param clusterId context cluster ID - * @param userId context user ID - * @param flowName context flow name - * @param flowVersion context flow version - * @param flowRunId run id for the flow. - * @param appId context app ID. - * @param data - * a {@link TimelineEntities} object. + * @param context a {@link TimelineCollectorContext} + * @param data a {@link TimelineEntities} object. + * @param callerUgi {@link UserGroupInformation}. * @return a {@link TimelineWriteResponse} object. - * @throws IOException if there is any exception encountered while storing - * or writing entities to the backend storage. + * @throws IOException if there is any exception encountered while storing or + * writing entities to the back end storage. */ - TimelineWriteResponse write(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntities data) throws IOException; + TimelineWriteResponse write(TimelineCollectorContext context, + TimelineEntities data, UserGroupInformation callerUgi) throws IOException; /** * Aggregates the entity information to the timeline store based on which diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java index 0f175537b2..ec454284da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -41,8 +41,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -156,9 +154,8 @@ public void testPutEntity() throws IOException { collector.putEntities( entities, UserGroupInformation.createRemoteUser("test-user")); - verify(writer, times(1)).write( - anyString(), anyString(), anyString(), anyString(), anyLong(), - anyString(), any(TimelineEntities.class)); + verify(writer, times(1)).write(any(TimelineCollectorContext.class), + any(TimelineEntities.class), any(UserGroupInformation.class)); verify(writer, times(1)).flush(); } @@ -175,9 +172,8 @@ public void testPutEntityAsync() throws IOException { collector.putEntitiesAsync( entities, UserGroupInformation.createRemoteUser("test-user")); - verify(writer, times(1)).write( - anyString(), anyString(), anyString(), anyString(), anyLong(), - anyString(), any(TimelineEntities.class)); + verify(writer, times(1)).write(any(TimelineCollectorContext.class), + any(TimelineEntities.class), any(UserGroupInformation.class)); verify(writer, never()).flush(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index 4f12c57f7b..bb9f54f937 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -30,11 +30,13 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.Rule; import org.junit.Test; @@ -89,8 +91,10 @@ public void testWriteEntityToFile() throws Exception { outputRoot); fsi.init(conf); fsi.start(); - fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L, - "app_id", te); + fsi.write( + new TimelineCollectorContext("cluster_id", "user_id", "flow_name", + "flow_version", 12345678L, "app_id"), + te, UserGroupInformation.createRemoteUser("user_id")); String fileName = fsi.getOutputRoot() + File.separator + "entities" + File.separator + "cluster_id" + File.separator + "user_id" +