YARN-4238. createdTime and modifiedTime is not reported while publishing entities to ATSv2. (Varun Saxena via Naganarasimha G R)
This commit is contained in:
parent
f5380d850e
commit
6934b05c71
@ -1135,10 +1135,10 @@ public void run() {
|
||||
// jobId, timestamp and entityType.
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
createJobEntity(HistoryEvent event, long timestamp, JobId jobId,
|
||||
String entityType) {
|
||||
String entityType, boolean setCreatedTime) {
|
||||
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
|
||||
createBaseEntity(event, timestamp, entityType);
|
||||
createBaseEntity(event, timestamp, entityType, setCreatedTime);
|
||||
entity.setId(jobId.toString());
|
||||
return entity;
|
||||
}
|
||||
@ -1146,7 +1146,8 @@ public void run() {
|
||||
// create BaseEntity from HistoryEvent with adding other info, like:
|
||||
// timestamp and entityType.
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
createBaseEntity(HistoryEvent event, long timestamp, String entityType) {
|
||||
createBaseEntity(HistoryEvent event, long timestamp, String entityType,
|
||||
boolean setCreatedTime) {
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent =
|
||||
event.toTimelineEvent();
|
||||
tEvent.setTimestamp(timestamp);
|
||||
@ -1155,6 +1156,9 @@ public void run() {
|
||||
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
|
||||
entity.addEvent(tEvent);
|
||||
entity.setType(entityType);
|
||||
if (setCreatedTime) {
|
||||
entity.setCreatedTime(timestamp);
|
||||
}
|
||||
return entity;
|
||||
}
|
||||
|
||||
@ -1162,9 +1166,10 @@ public void run() {
|
||||
// taskId, jobId, timestamp, entityType and relatedJobEntity.
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
createTaskEntity(HistoryEvent event, long timestamp, String taskId,
|
||||
String entityType, String relatedJobEntity, JobId jobId) {
|
||||
String entityType, String relatedJobEntity, JobId jobId,
|
||||
boolean setCreatedTime) {
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
|
||||
createBaseEntity(event, timestamp, entityType);
|
||||
createBaseEntity(event, timestamp, entityType, setCreatedTime);
|
||||
entity.setId(taskId);
|
||||
entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
|
||||
return entity;
|
||||
@ -1175,9 +1180,9 @@ public void run() {
|
||||
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
createTaskAttemptEntity(HistoryEvent event, long timestamp,
|
||||
String taskAttemptId, String entityType, String relatedTaskEntity,
|
||||
String taskId) {
|
||||
String taskId, boolean setCreatedTime) {
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
|
||||
createBaseEntity(event, timestamp, entityType);
|
||||
createBaseEntity(event, timestamp, entityType, setCreatedTime);
|
||||
entity.setId(taskAttemptId);
|
||||
entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
|
||||
return entity;
|
||||
@ -1188,10 +1193,13 @@ private void processEventForNewTimelineService(HistoryEvent event, JobId jobId,
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity = null;
|
||||
String taskId = null;
|
||||
String taskAttemptId = null;
|
||||
boolean setCreatedTime = false;
|
||||
|
||||
switch (event.getEventType()) {
|
||||
// Handle job events
|
||||
case JOB_SUBMITTED:
|
||||
setCreatedTime = true;
|
||||
break;
|
||||
case JOB_STATUS_CHANGED:
|
||||
case JOB_INFO_CHANGED:
|
||||
case JOB_INITED:
|
||||
@ -1206,6 +1214,7 @@ private void processEventForNewTimelineService(HistoryEvent event, JobId jobId,
|
||||
break;
|
||||
// Handle task events
|
||||
case TASK_STARTED:
|
||||
setCreatedTime = true;
|
||||
taskId = ((TaskStartedEvent)event).getTaskId().toString();
|
||||
break;
|
||||
case TASK_FAILED:
|
||||
@ -1218,8 +1227,13 @@ private void processEventForNewTimelineService(HistoryEvent event, JobId jobId,
|
||||
taskId = ((TaskFinishedEvent)event).getTaskId().toString();
|
||||
break;
|
||||
case MAP_ATTEMPT_STARTED:
|
||||
case CLEANUP_ATTEMPT_STARTED:
|
||||
case REDUCE_ATTEMPT_STARTED:
|
||||
setCreatedTime = true;
|
||||
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
|
||||
taskAttemptId = ((TaskAttemptStartedEvent)event).
|
||||
getTaskAttemptId().toString();
|
||||
break;
|
||||
case CLEANUP_ATTEMPT_STARTED:
|
||||
case SETUP_ATTEMPT_STARTED:
|
||||
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
|
||||
taskAttemptId = ((TaskAttemptStartedEvent)event).
|
||||
@ -1258,17 +1272,18 @@ private void processEventForNewTimelineService(HistoryEvent event, JobId jobId,
|
||||
if (taskId == null) {
|
||||
// JobEntity
|
||||
tEntity = createJobEntity(event, timestamp, jobId,
|
||||
MAPREDUCE_JOB_ENTITY_TYPE);
|
||||
MAPREDUCE_JOB_ENTITY_TYPE, setCreatedTime);
|
||||
} else {
|
||||
if (taskAttemptId == null) {
|
||||
// TaskEntity
|
||||
tEntity = createTaskEntity(event, timestamp, taskId,
|
||||
MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, jobId);
|
||||
MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE,
|
||||
jobId, setCreatedTime);
|
||||
} else {
|
||||
// TaskAttemptEntity
|
||||
tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
|
||||
MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
|
||||
taskId);
|
||||
taskId, setCreatedTime);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,7 +140,6 @@ public boolean equals(Object obj) {
|
||||
private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
|
||||
private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
|
||||
private long createdTime;
|
||||
private long modifiedTime;
|
||||
|
||||
public TimelineEntity() {
|
||||
identifier = new Identifier();
|
||||
@ -505,24 +504,6 @@ public void setCreatedTime(long createdTime) {
|
||||
}
|
||||
}
|
||||
|
||||
@XmlElement(name = "modifiedtime")
|
||||
public long getModifiedTime() {
|
||||
if (real == null) {
|
||||
return modifiedTime;
|
||||
} else {
|
||||
return real.getModifiedTime();
|
||||
}
|
||||
}
|
||||
|
||||
@JsonSetter("modifiedtime")
|
||||
public void setModifiedTime(long modifiedTime) {
|
||||
if (real == null) {
|
||||
this.modifiedTime = modifiedTime;
|
||||
} else {
|
||||
real.setModifiedTime(modifiedTime);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isValid() {
|
||||
return (getId() != null && getType() != null);
|
||||
}
|
||||
|
@ -139,7 +139,6 @@ public void testTimelineEntities() throws Exception {
|
||||
event1, event2);
|
||||
|
||||
entity.setCreatedTime(0L);
|
||||
entity.setModifiedTime(1L);
|
||||
entity.addRelatesToEntity("test type 2", "test id 2");
|
||||
entity.addRelatesToEntity("test type 3", "test id 3");
|
||||
entity.addIsRelatedToEntity("test type 4", "test id 4");
|
||||
|
@ -463,7 +463,6 @@ public void run() {
|
||||
pTree.updateProcessTree(); // update process-tree
|
||||
long currentVmemUsage = pTree.getVirtualMemorySize();
|
||||
long currentPmemUsage = pTree.getRssMemorySize();
|
||||
long currentTime = System.currentTimeMillis();
|
||||
|
||||
// if machine has 6 cores and 3 are used,
|
||||
// cpuUsagePercentPerCore should be 300% and
|
||||
@ -577,9 +576,8 @@ && isProcessTreeOverLimit(containerId.toString(),
|
||||
NMTimelinePublisher nmMetricsPublisher =
|
||||
container.getNMTimelinePublisher();
|
||||
if (nmMetricsPublisher != null) {
|
||||
nmMetricsPublisher.reportContainerResourceUsage(
|
||||
container, currentTime, pId, currentPmemUsage,
|
||||
cpuUsageTotalCoresPercentage);
|
||||
nmMetricsPublisher.reportContainerResourceUsage(container, pId,
|
||||
currentPmemUsage, cpuUsageTotalCoresPercentage);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// Log the exception and proceed to the next container.
|
||||
|
@ -113,9 +113,8 @@ protected void handleNMTimelineEvent(NMTimelineEvent event) {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void reportContainerResourceUsage(Container container,
|
||||
long createdTime, String pId, Long pmemUsage,
|
||||
Float cpuUsageTotalCoresPercentage) {
|
||||
public void reportContainerResourceUsage(Container container, String pId,
|
||||
Long pmemUsage, Float cpuUsageTotalCoresPercentage) {
|
||||
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
|
||||
cpuUsageTotalCoresPercentage !=
|
||||
ResourceCalculatorProcessTree.UNAVAILABLE) {
|
||||
@ -164,6 +163,7 @@ private void publishContainerCreatedEvent(ContainerEntity entity,
|
||||
tEvent.setTimestamp(timestamp);
|
||||
|
||||
entity.addEvent(tEvent);
|
||||
entity.setCreatedTime(timestamp);
|
||||
putEntity(entity, containerId.getApplicationAttemptId().getApplicationId());
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
@ -44,10 +45,15 @@
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
@ -58,6 +64,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
import org.junit.AfterClass;
|
||||
@ -201,8 +208,7 @@ public void testPublishApplicationMetrics() throws Exception {
|
||||
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||
File appFile = new File(outputDirApp, timelineServiceFileName);
|
||||
Assert.assertTrue(appFile.exists());
|
||||
Assert.assertEquals("Expected 3 events to be published", 3,
|
||||
getNumOfNonEmptyLines(appFile));
|
||||
verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
@ -236,8 +242,7 @@ public void testPublishAppAttemptMetrics() throws Exception {
|
||||
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||
File appFile = new File(outputDirApp, timelineServiceFileName);
|
||||
Assert.assertTrue(appFile.exists());
|
||||
Assert.assertEquals("Expected 2 events to be published", 2,
|
||||
getNumOfNonEmptyLines(appFile));
|
||||
verifyEntity(appFile,2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
@ -268,8 +273,8 @@ public void testPublishContainerMetrics() throws Exception {
|
||||
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||
File appFile = new File(outputDirApp, timelineServiceFileName);
|
||||
Assert.assertTrue(appFile.exists());
|
||||
Assert.assertEquals("Expected 2 events to be published", 2,
|
||||
getNumOfNonEmptyLines(appFile));
|
||||
verifyEntity(appFile, 2,
|
||||
ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
|
||||
}
|
||||
|
||||
private RMApp createAppAndRegister(ApplicationId appId) {
|
||||
@ -282,20 +287,31 @@ private RMApp createAppAndRegister(ApplicationId appId) {
|
||||
return app;
|
||||
}
|
||||
|
||||
private long getNumOfNonEmptyLines(File entityFile) throws IOException {
|
||||
private static void verifyEntity(File entityFile, long expectedEvents,
|
||||
String eventForCreatedTime) throws IOException {
|
||||
BufferedReader reader = null;
|
||||
String strLine;
|
||||
long count = 0;
|
||||
try {
|
||||
reader = new BufferedReader(new FileReader(entityFile));
|
||||
while ((strLine = reader.readLine()) != null) {
|
||||
if (strLine.trim().length() > 0)
|
||||
if (strLine.trim().length() > 0) {
|
||||
TimelineEntity entity = FileSystemTimelineReaderImpl.
|
||||
getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class);
|
||||
for (TimelineEvent event : entity.getEvents()) {
|
||||
if (event.getId().equals(eventForCreatedTime)) {
|
||||
assertTrue(entity.getCreatedTime() > 0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
count++;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
return count;
|
||||
assertEquals("Expected " + expectedEvents + " events to be published",
|
||||
count, expectedEvents);
|
||||
}
|
||||
|
||||
private String getTimelineEntityDir(RMApp app) {
|
||||
|
@ -68,16 +68,15 @@ private static String getClusterID(String clusterId, Configuration conf) {
|
||||
Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
EnumSet<Field> fieldsToRetrieve) throws IOException {
|
||||
String cluster = getClusterID(clusterId, getConfig());
|
||||
return reader.getEntities(userId, cluster, flowName, flowRunId, appId,
|
||||
entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
|
||||
metricFilters, eventFilters, null, null, fieldsToRetrieve);
|
||||
entityType, limit, createdTimeBegin, createdTimeEnd, relatesTo,
|
||||
isRelatedTo, infoFilters, configFilters, metricFilters, eventFilters,
|
||||
null, null, fieldsToRetrieve);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -331,8 +331,6 @@ public Set<TimelineEntity> getEntities(
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@QueryParam("modifiedtimestart") String modifiedTimeStart,
|
||||
@QueryParam("modifiedtimeend") String modifiedTimeEnd,
|
||||
@QueryParam("relatesto") String relatesTo,
|
||||
@QueryParam("isrelatedto") String isRelatedTo,
|
||||
@QueryParam("infofilters") String infofilters,
|
||||
@ -341,9 +339,9 @@ public Set<TimelineEntity> getEntities(
|
||||
@QueryParam("eventfilters") String eventfilters,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, null, appId, entityType, userId, flowName,
|
||||
flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
|
||||
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
|
||||
fields);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -364,8 +362,6 @@ public Set<TimelineEntity> getEntities(
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@QueryParam("modifiedtimestart") String modifiedTimeStart,
|
||||
@QueryParam("modifiedtimeend") String modifiedTimeEnd,
|
||||
@QueryParam("relatesto") String relatesTo,
|
||||
@QueryParam("isrelatedto") String isRelatedTo,
|
||||
@QueryParam("infofilters") String infofilters,
|
||||
@ -387,8 +383,7 @@ public Set<TimelineEntity> getEntities(
|
||||
parseStr(userId), parseStr(clusterId), parseStr(flowName),
|
||||
parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
|
||||
parseLongStr(limit), parseLongStr(createdTimeStart),
|
||||
parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart),
|
||||
parseLongStr(modifiedTimeEnd),
|
||||
parseLongStr(createdTimeEnd),
|
||||
parseKeyStrValuesStr(relatesTo, COMMA_DELIMITER, COLON_DELIMITER),
|
||||
parseKeyStrValuesStr(isRelatedTo, COMMA_DELIMITER, COLON_DELIMITER),
|
||||
parseKeyStrValueObj(infofilters, COMMA_DELIMITER, COLON_DELIMITER),
|
||||
@ -398,7 +393,7 @@ public Set<TimelineEntity> getEntities(
|
||||
parseFieldsStr(fields, COMMA_DELIMITER));
|
||||
} catch (Exception e) {
|
||||
handleException(e, url, startTime,
|
||||
"createdTime or modifiedTime start/end or limit or flowrunid");
|
||||
"createdTime start/end or limit or flowrunid");
|
||||
}
|
||||
long endTime = Time.monotonicNow();
|
||||
if (entities == null) {
|
||||
@ -585,8 +580,8 @@ public Set<TimelineEntity> getFlowRuns(
|
||||
entities = timelineReaderManager.getEntities(
|
||||
parseStr(userId), parseStr(clusterId), parseStr(flowName), null, null,
|
||||
TimelineEntityType.YARN_FLOW_RUN.toString(), parseLongStr(limit),
|
||||
parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd), null,
|
||||
null, null, null, null, null, null, null,
|
||||
parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd),
|
||||
null, null, null, null, null, null,
|
||||
parseFieldsStr(fields, COMMA_DELIMITER));
|
||||
} catch (Exception e) {
|
||||
handleException(e, url, startTime, "createdTime start/end or limit");
|
||||
@ -668,7 +663,7 @@ public Set<TimelineEntity> getFlows(
|
||||
null, parseStr(clusterId), null, null, null,
|
||||
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), parseLongStr(limit),
|
||||
range.dateStart, range.dateEnd, null, null, null, null, null, null,
|
||||
null, null, parseFieldsStr(fields, COMMA_DELIMITER));
|
||||
parseFieldsStr(fields, COMMA_DELIMITER));
|
||||
} catch (Exception e) {
|
||||
handleException(e, url, startTime, "limit");
|
||||
}
|
||||
@ -760,8 +755,6 @@ public Set<TimelineEntity> getFlowRunApps(
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@QueryParam("modifiedtimestart") String modifiedTimeStart,
|
||||
@QueryParam("modifiedtimeend") String modifiedTimeEnd,
|
||||
@QueryParam("relatesto") String relatesTo,
|
||||
@QueryParam("isrelatedto") String isRelatedTo,
|
||||
@QueryParam("infofilters") String infofilters,
|
||||
@ -771,9 +764,9 @@ public Set<TimelineEntity> getFlowRunApps(
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, null, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
|
||||
flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
|
||||
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
|
||||
fields);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -794,8 +787,6 @@ public Set<TimelineEntity> getFlowRunApps(
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@QueryParam("modifiedtimestart") String modifiedTimeStart,
|
||||
@QueryParam("modifiedtimeend") String modifiedTimeEnd,
|
||||
@QueryParam("relatesto") String relatesTo,
|
||||
@QueryParam("isrelatedto") String isRelatedTo,
|
||||
@QueryParam("infofilters") String infofilters,
|
||||
@ -805,9 +796,9 @@ public Set<TimelineEntity> getFlowRunApps(
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, clusterId, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
|
||||
flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
|
||||
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
|
||||
fields);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -827,8 +818,6 @@ public Set<TimelineEntity> getFlowApps(
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@QueryParam("modifiedtimestart") String modifiedTimeStart,
|
||||
@QueryParam("modifiedtimeend") String modifiedTimeEnd,
|
||||
@QueryParam("relatesto") String relatesTo,
|
||||
@QueryParam("isrelatedto") String isRelatedTo,
|
||||
@QueryParam("infofilters") String infofilters,
|
||||
@ -838,9 +827,8 @@ public Set<TimelineEntity> getFlowApps(
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, null, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
|
||||
null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
|
||||
infofilters, conffilters, metricfilters, eventfilters, fields);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -860,8 +848,6 @@ public Set<TimelineEntity> getFlowApps(
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@QueryParam("modifiedtimestart") String modifiedTimeStart,
|
||||
@QueryParam("modifiedtimeend") String modifiedTimeEnd,
|
||||
@QueryParam("relatesto") String relatesTo,
|
||||
@QueryParam("isrelatedto") String isRelatedTo,
|
||||
@QueryParam("infofilters") String infofilters,
|
||||
@ -871,8 +857,7 @@ public Set<TimelineEntity> getFlowApps(
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, clusterId, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
|
||||
null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
|
||||
infofilters, conffilters, metricfilters, eventfilters, fields);
|
||||
}
|
||||
}
|
@ -188,7 +188,6 @@ private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
|
||||
TimelineEntity entityToBeReturned = new TimelineEntity();
|
||||
entityToBeReturned.setIdentifier(entity.getIdentifier());
|
||||
entityToBeReturned.setCreatedTime(entity.getCreatedTime());
|
||||
entityToBeReturned.setModifiedTime(entity.getModifiedTime());
|
||||
if (fieldsToRetrieve != null) {
|
||||
fillFields(entityToBeReturned, entity, fieldsToRetrieve);
|
||||
}
|
||||
@ -206,9 +205,6 @@ private static void mergeEntities(TimelineEntity entity1,
|
||||
if (entity2.getCreatedTime() > 0) {
|
||||
entity1.setCreatedTime(entity2.getCreatedTime());
|
||||
}
|
||||
if (entity2.getModifiedTime() > 0) {
|
||||
entity1.setModifiedTime(entity2.getModifiedTime());
|
||||
}
|
||||
for (Entry<String, String> configEntry : entity2.getConfigs().entrySet()) {
|
||||
entity1.addConfig(configEntry.getKey(), configEntry.getValue());
|
||||
}
|
||||
@ -268,8 +264,7 @@ private static TimelineEntity readEntityFromFile(BufferedReader reader)
|
||||
}
|
||||
|
||||
private Set<TimelineEntity> getEntities(File dir, String entityType,
|
||||
Long limit, Long createdTimeBegin,
|
||||
Long createdTimeEnd, Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
@ -284,12 +279,6 @@ private Set<TimelineEntity> getEntities(File dir, String entityType,
|
||||
if (createdTimeEnd == null || createdTimeEnd <= 0) {
|
||||
createdTimeEnd = Long.MAX_VALUE;
|
||||
}
|
||||
if (modifiedTimeBegin == null || modifiedTimeBegin <= 0) {
|
||||
modifiedTimeBegin = 0L;
|
||||
}
|
||||
if (modifiedTimeEnd == null || modifiedTimeEnd <= 0) {
|
||||
modifiedTimeEnd = Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
// First sort the selected entities based on created/start time.
|
||||
Map<Long, Set<TimelineEntity>> sortedEntities =
|
||||
@ -318,10 +307,6 @@ public int compare(Long l1, Long l2) {
|
||||
createdTimeEnd)) {
|
||||
continue;
|
||||
}
|
||||
if (!isTimeInRange(entity.getModifiedTime(), modifiedTimeBegin,
|
||||
modifiedTimeEnd)) {
|
||||
continue;
|
||||
}
|
||||
if (relatesTo != null && !relatesTo.isEmpty() &&
|
||||
!TimelineStorageUtils
|
||||
.matchRelations(entity.getRelatesToEntities(), relatesTo)) {
|
||||
@ -413,7 +398,6 @@ public TimelineEntity getEntity(String userId, String clusterId,
|
||||
public Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
@ -424,8 +408,7 @@ public Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
File dir =
|
||||
new File(new File(rootPath, ENTITIES_DIR),
|
||||
clusterId + "/" + flowRunPath + "/" + appId + "/" + entityType);
|
||||
return getEntities(dir, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
return getEntities(dir, entityType, limit, createdTimeBegin, createdTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
|
||||
}
|
||||
|
@ -81,7 +81,6 @@ public TimelineEntity getEntity(String userId, String clusterId,
|
||||
public Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
@ -90,10 +89,9 @@ public Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
TimelineEntityReader reader =
|
||||
TimelineEntityReaderFactory.createMultipleEntitiesReader(userId,
|
||||
clusterId, flowName, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
|
||||
metricFilters, eventFilters, confsToRetrieve, metricsToRetrieve,
|
||||
fieldsToRetrieve);
|
||||
createdTimeBegin, createdTimeEnd, relatesTo, isRelatedTo,
|
||||
infoFilters, configFilters, metricFilters, eventFilters,
|
||||
confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
|
||||
return reader.readEntities(hbaseConf, conn);
|
||||
}
|
||||
}
|
||||
|
@ -315,8 +315,6 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
|
||||
ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
|
||||
ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
|
||||
te.getCreatedTime());
|
||||
ApplicationColumn.MODIFIED_TIME.store(rowKey, applicationTable, null,
|
||||
te.getModifiedTime());
|
||||
ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
|
||||
flowVersion);
|
||||
Map<String, Object> info = te.getInfo();
|
||||
@ -331,8 +329,6 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
|
||||
EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
|
||||
EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
|
||||
te.getCreatedTime());
|
||||
EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
|
||||
te.getModifiedTime());
|
||||
EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
|
||||
Map<String, Object> info = te.getInfo();
|
||||
if (info != null) {
|
||||
|
@ -143,10 +143,10 @@ public TimelineWriteResponse writeAggregatedEntity(
|
||||
TimelineWriteResponse response = new TimelineWriteResponse();
|
||||
String sql = "UPSERT INTO " + info.getTableName()
|
||||
+ " (" + StringUtils.join(info.getPrimaryKeyList(), ",")
|
||||
+ ", created_time, modified_time, metric_names) "
|
||||
+ ", created_time, metric_names) "
|
||||
+ "VALUES ("
|
||||
+ StringUtils.repeat("?,", info.getPrimaryKeyList().length)
|
||||
+ "?, ?, ?)";
|
||||
+ "?, ?)";
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("TimelineEntity write SQL: " + sql);
|
||||
}
|
||||
@ -162,7 +162,6 @@ public TimelineWriteResponse writeAggregatedEntity(
|
||||
}
|
||||
int idx = info.setStringsForPrimaryKey(ps, context, null, 1);
|
||||
ps.setLong(idx++, entity.getCreatedTime());
|
||||
ps.setLong(idx++, entity.getModifiedTime());
|
||||
ps.setString(idx++, StringUtils.join(formattedMetrics.keySet().toArray(),
|
||||
AGGREGATION_STORAGE_SEPARATOR));
|
||||
ps.execute();
|
||||
@ -197,7 +196,7 @@ public void createPhoenixTables() throws IOException {
|
||||
+ OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME
|
||||
+ "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
|
||||
+ "flow_name VARCHAR NOT NULL, "
|
||||
+ "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
|
||||
+ "created_time UNSIGNED_LONG, "
|
||||
+ METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
|
||||
+ "metric_names VARCHAR, info_keys VARCHAR "
|
||||
+ "CONSTRAINT pk PRIMARY KEY("
|
||||
@ -206,7 +205,7 @@ public void createPhoenixTables() throws IOException {
|
||||
sql = "CREATE TABLE IF NOT EXISTS "
|
||||
+ OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME
|
||||
+ "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
|
||||
+ "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
|
||||
+ "created_time UNSIGNED_LONG, "
|
||||
+ METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
|
||||
+ "metric_names VARCHAR, info_keys VARCHAR "
|
||||
+ "CONSTRAINT pk PRIMARY KEY(user, cluster))";
|
||||
|
@ -87,8 +87,8 @@ public enum Field {
|
||||
* @param fieldsToRetrieve
|
||||
* Specifies which fields of the entity object to retrieve(optional), see
|
||||
* {@link Field}. If null, retrieves 4 fields namely entity id,
|
||||
* entity type, entity created time and entity modified time. All
|
||||
* entities will be returned if {@link Field#ALL} is specified.
|
||||
* entity type and entity created time. All entities will be returned if
|
||||
* {@link Field#ALL} is specified.
|
||||
* @return a {@link TimelineEntity} instance or null. The entity will
|
||||
* contain the metadata plus the given fields to retrieve.
|
||||
* @throws IOException
|
||||
@ -101,13 +101,13 @@ TimelineEntity getEntity(String userId, String clusterId, String flowName,
|
||||
/**
|
||||
* <p>The API to search for a set of entities of the given the entity type in
|
||||
* the scope of the given context which matches the given predicates. The
|
||||
* predicates include the created/modified time window, limit to number of
|
||||
* entities to be returned, and the entities can be filtered by checking
|
||||
* whether they contain the given info/configs entries in the form of
|
||||
* key/value pairs, given metrics in the form of metricsIds and its relation
|
||||
* with metric values given events in the form of the Ids, and whether they
|
||||
* relate to/are related to other entities. For those parameters which have
|
||||
* multiple entries, the qualified entity needs to meet all or them.</p>
|
||||
* predicates include the created time window, limit to number of entities to
|
||||
* be returned, and the entities can be filtered by checking whether they
|
||||
* contain the given info/configs entries in the form of key/value pairs,
|
||||
* given metrics in the form of metricsIds and its relation with metric
|
||||
* values, given events in the form of the Ids, and whether they relate to/are
|
||||
* related to other entities. For those parameters which have multiple
|
||||
* entries, the qualified entity needs to meet all or them.</p>
|
||||
*
|
||||
* @param userId
|
||||
* Context user Id(optional).
|
||||
@ -130,12 +130,6 @@ TimelineEntity getEntity(String userId, String clusterId, String flowName,
|
||||
* @param createdTimeEnd
|
||||
* Matched entities should not be created after this timestamp (optional).
|
||||
* If null or <=0, defaults to {@link Long#MAX_VALUE}.
|
||||
* @param modifiedTimeBegin
|
||||
* Matched entities should not be modified before this timestamp
|
||||
* (optional). If null or <=0, defaults to 0.
|
||||
* @param modifiedTimeEnd
|
||||
* Matched entities should not be modified after this timestamp (optional).
|
||||
* If null or <=0, defaults to {@link Long#MAX_VALUE}.
|
||||
* @param relatesTo
|
||||
* Matched entities should relate to given entities (optional).
|
||||
* @param isRelatedTo
|
||||
@ -173,19 +167,17 @@ TimelineEntity getEntity(String userId, String clusterId, String flowName,
|
||||
* @param fieldsToRetrieve
|
||||
* Specifies which fields of the entity object to retrieve(optional), see
|
||||
* {@link Field}. If null, retrieves 4 fields namely entity id,
|
||||
* entity type, entity created time and entity modified time. All
|
||||
* entities will be returned if {@link Field#ALL} is specified.
|
||||
* entity type and entity created time. All entities will be returned if
|
||||
* {@link Field#ALL} is specified.
|
||||
* @return A set of {@link TimelineEntity} instances of the given entity type
|
||||
* in the given context scope which matches the given predicates
|
||||
* ordered by created time, descending. Each entity will only contain the
|
||||
* metadata(id, type, created and modified times) plus the given fields to
|
||||
* retrieve.
|
||||
* metadata(id, type and created time) plus the given fields to retrieve.
|
||||
* @throws IOException
|
||||
*/
|
||||
Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
|
@ -43,11 +43,6 @@ public enum ApplicationColumn implements Column<ApplicationTable> {
|
||||
*/
|
||||
CREATED_TIME(ApplicationColumnFamily.INFO, "created_time"),
|
||||
|
||||
/**
|
||||
* When it was modified.
|
||||
*/
|
||||
MODIFIED_TIME(ApplicationColumnFamily.INFO, "modified_time"),
|
||||
|
||||
/**
|
||||
* The version of the flow that this app belongs to.
|
||||
*/
|
||||
|
@ -50,15 +50,12 @@
|
||||
* | flowName! | created_time: | @timestamp1 | |
|
||||
* | flowRunId! | 1392993084018 | | configKey2: |
|
||||
* | AppId | | metriciD1: | configValue2 |
|
||||
* | | modified_time: | metricValue2 | |
|
||||
* | | 1392995081012 | @timestamp2 | |
|
||||
* | | i!infoKey: | metricValue2 | |
|
||||
* | | infoValue | @timestamp2 | |
|
||||
* | | | | |
|
||||
* | | i!infoKey: | metricId2: | |
|
||||
* | | infoValue | metricValue1 | |
|
||||
* | | r!relatesToKey: | metricId2: | |
|
||||
* | | id3=id4=id5 | metricValue1 | |
|
||||
* | | | @timestamp2 | |
|
||||
* | | r!relatesToKey: | | |
|
||||
* | | id3=id4=id5 | | |
|
||||
* | | | | |
|
||||
* | | s!isRelatedToKey: | | |
|
||||
* | | id7=id9=id6 | | |
|
||||
* | | | | |
|
||||
|
@ -48,11 +48,6 @@ public enum EntityColumn implements Column<EntityTable> {
|
||||
*/
|
||||
CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
|
||||
|
||||
/**
|
||||
* When it was modified.
|
||||
*/
|
||||
MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"),
|
||||
|
||||
/**
|
||||
* The version of the flow that this entity belongs to.
|
||||
*/
|
||||
|
@ -48,16 +48,13 @@
|
||||
* | clusterId! | | metricValue1 | configValue1 |
|
||||
* | flowName! | type:entityType | @timestamp1 | |
|
||||
* | flowRunId! | | | configKey2: |
|
||||
* | AppId! | created_time: | metriciD1: | configValue2 |
|
||||
* | AppId! | created_time: | metricId1: | configValue2 |
|
||||
* | entityType!| 1392993084018 | metricValue2 | |
|
||||
* | entityId | | @timestamp2 | |
|
||||
* | | modified_time: | | |
|
||||
* | | 1392995081012 | metricId2: | |
|
||||
* | | i!infoKey: | | |
|
||||
* | | infoValue | metricId1: | |
|
||||
* | | | metricValue1 | |
|
||||
* | | i!infoKey: | @timestamp2 | |
|
||||
* | | infoValue | | |
|
||||
* | | | | |
|
||||
* | | r!relatesToKey: | | |
|
||||
* | | r!relatesToKey: | @timestamp2 | |
|
||||
* | | id3=id4=id5 | | |
|
||||
* | | | | |
|
||||
* | | s!isRelatedToKey | | |
|
||||
|
@ -63,17 +63,15 @@ class ApplicationEntityReader extends GenericEntityReader {
|
||||
public ApplicationEntityReader(String userId, String clusterId,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
|
||||
EnumSet<Field> fieldsToRetrieve) {
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
|
||||
true);
|
||||
createdTimeBegin, createdTimeEnd, relatesTo, isRelatedTo, infoFilters,
|
||||
configFilters, metricFilters, eventFilters, confsToRetrieve,
|
||||
metricsToRetrieve, fieldsToRetrieve, true);
|
||||
}
|
||||
|
||||
public ApplicationEntityReader(String userId, String clusterId,
|
||||
@ -230,12 +228,6 @@ protected void augmentParams(Configuration hbaseConf, Connection conn)
|
||||
if (createdTimeEnd == null) {
|
||||
createdTimeEnd = DEFAULT_END_TIME;
|
||||
}
|
||||
if (modifiedTimeBegin == null) {
|
||||
modifiedTimeBegin = DEFAULT_BEGIN_TIME;
|
||||
}
|
||||
if (modifiedTimeEnd == null) {
|
||||
modifiedTimeEnd = DEFAULT_END_TIME;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -278,15 +270,6 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
// fetch modified time
|
||||
Number modifiedTime =
|
||||
(Number)ApplicationColumn.MODIFIED_TIME.readResult(result);
|
||||
entity.setModifiedTime(modifiedTime.longValue());
|
||||
if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
|
||||
entity.getModifiedTime() > modifiedTimeEnd)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// fetch is related to entities
|
||||
boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
|
||||
if (fieldsToRetrieve.contains(Field.ALL) ||
|
||||
|
@ -52,15 +52,14 @@ class FlowActivityEntityReader extends TimelineEntityReader {
|
||||
public FlowActivityEntityReader(String userId, String clusterId,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
EnumSet<Field> fieldsToRetrieve) {
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, null, null, fieldsToRetrieve, true);
|
||||
createdTimeBegin, createdTimeEnd, relatesTo, isRelatedTo, infoFilters,
|
||||
configFilters, metricFilters, eventFilters, null, null,
|
||||
fieldsToRetrieve, true);
|
||||
}
|
||||
|
||||
public FlowActivityEntityReader(String userId, String clusterId,
|
||||
|
@ -61,16 +61,15 @@ class FlowRunEntityReader extends TimelineEntityReader {
|
||||
public FlowRunEntityReader(String userId, String clusterId,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
|
||||
EnumSet<Field> fieldsToRetrieve) {
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true);
|
||||
createdTimeBegin, createdTimeEnd, relatesTo, isRelatedTo, infoFilters,
|
||||
configFilters, metricFilters, eventFilters, null, metricsToRetrieve,
|
||||
fieldsToRetrieve, true);
|
||||
}
|
||||
|
||||
public FlowRunEntityReader(String userId, String clusterId,
|
||||
|
@ -79,17 +79,15 @@ class GenericEntityReader extends TimelineEntityReader {
|
||||
public GenericEntityReader(String userId, String clusterId,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
|
||||
EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
|
||||
sortedKeys);
|
||||
createdTimeBegin, createdTimeEnd, relatesTo, isRelatedTo, infoFilters,
|
||||
configFilters, metricFilters, eventFilters, confsToRetrieve,
|
||||
metricsToRetrieve, fieldsToRetrieve, sortedKeys);
|
||||
}
|
||||
|
||||
public GenericEntityReader(String userId, String clusterId,
|
||||
@ -257,12 +255,6 @@ protected void augmentParams(Configuration hbaseConf, Connection conn)
|
||||
if (createdTimeEnd == null) {
|
||||
createdTimeEnd = DEFAULT_END_TIME;
|
||||
}
|
||||
if (modifiedTimeBegin == null) {
|
||||
modifiedTimeBegin = DEFAULT_BEGIN_TIME;
|
||||
}
|
||||
if (modifiedTimeEnd == null) {
|
||||
modifiedTimeEnd = DEFAULT_END_TIME;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -314,14 +306,6 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
// fetch modified time
|
||||
Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result);
|
||||
entity.setModifiedTime(modifiedTime.longValue());
|
||||
if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
|
||||
entity.getModifiedTime() > modifiedTimeEnd)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// fetch is related to entities
|
||||
boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
|
||||
if (fieldsToRetrieve.contains(Field.ALL) ||
|
||||
|
@ -64,8 +64,6 @@ public abstract class TimelineEntityReader {
|
||||
protected Long limit;
|
||||
protected Long createdTimeBegin;
|
||||
protected Long createdTimeEnd;
|
||||
protected Long modifiedTimeBegin;
|
||||
protected Long modifiedTimeEnd;
|
||||
protected Map<String, Set<String>> relatesTo;
|
||||
protected Map<String, Set<String>> isRelatedTo;
|
||||
protected Map<String, Object> infoFilters;
|
||||
@ -94,7 +92,6 @@ public abstract class TimelineEntityReader {
|
||||
protected TimelineEntityReader(String userId, String clusterId,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
@ -112,8 +109,6 @@ protected TimelineEntityReader(String userId, String clusterId,
|
||||
this.limit = limit;
|
||||
this.createdTimeBegin = createdTimeBegin;
|
||||
this.createdTimeEnd = createdTimeEnd;
|
||||
this.modifiedTimeBegin = modifiedTimeBegin;
|
||||
this.modifiedTimeEnd = modifiedTimeEnd;
|
||||
this.relatesTo = relatesTo;
|
||||
this.isRelatedTo = isRelatedTo;
|
||||
this.infoFilters = infoFilters;
|
||||
|
@ -62,7 +62,6 @@ public static TimelineEntityReader createSingleEntityReader(String userId,
|
||||
public static TimelineEntityReader createMultipleEntitiesReader(String userId,
|
||||
String clusterId, String flowName, Long flowRunId, String appId,
|
||||
String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
@ -72,29 +71,25 @@ public static TimelineEntityReader createMultipleEntitiesReader(String userId,
|
||||
// table are application, flow run, and flow activity entities
|
||||
if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
|
||||
return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
|
||||
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
|
||||
infoFilters, configFilters, metricFilters, eventFilters, confs,
|
||||
metrics, fieldsToRetrieve);
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd, relatesTo,
|
||||
isRelatedTo, infoFilters, configFilters, metricFilters, eventFilters,
|
||||
confs, metrics, fieldsToRetrieve);
|
||||
} else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
|
||||
return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
|
||||
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
|
||||
infoFilters, configFilters, metricFilters, eventFilters,
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd, relatesTo,
|
||||
isRelatedTo, infoFilters, configFilters, metricFilters, eventFilters,
|
||||
fieldsToRetrieve);
|
||||
} else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
|
||||
return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
|
||||
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
|
||||
infoFilters, configFilters, metricFilters, eventFilters, confs,
|
||||
metrics, fieldsToRetrieve);
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd, relatesTo,
|
||||
isRelatedTo, infoFilters, configFilters, metricFilters, eventFilters,
|
||||
confs, metrics, fieldsToRetrieve);
|
||||
} else {
|
||||
// assume we're dealing with a generic entity read
|
||||
return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
|
||||
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
|
||||
infoFilters, configFilters, metricFilters, eventFilters, confs,
|
||||
metrics, fieldsToRetrieve, false);
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd, relatesTo,
|
||||
isRelatedTo, infoFilters, configFilters, metricFilters, eventFilters,
|
||||
confs, metrics, fieldsToRetrieve, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -174,9 +174,8 @@ public void testGetEntityDefaultView() throws Exception {
|
||||
assertEquals("id_1", entity.getId());
|
||||
assertEquals("app", entity.getType());
|
||||
assertEquals(1425016502000L, entity.getCreatedTime());
|
||||
assertEquals(1425016503000L, entity.getModifiedTime());
|
||||
// Default view i.e. when no fields are specified, entity contains only
|
||||
// entity id, entity type, created and modified time.
|
||||
// entity id, entity type and created time.
|
||||
assertEquals(0, entity.getConfigs().size());
|
||||
assertEquals(0, entity.getMetrics().size());
|
||||
} finally {
|
||||
@ -198,7 +197,6 @@ public void testGetEntityWithUserAndFlowInfo() throws Exception {
|
||||
assertEquals("id_1", entity.getId());
|
||||
assertEquals("app", entity.getType());
|
||||
assertEquals(1425016502000L, entity.getCreatedTime());
|
||||
assertEquals(1425016503000L, entity.getModifiedTime());
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
@ -381,50 +379,6 @@ public void testGetEntitiesBasedOnCreatedTime() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetEntitiesBasedOnModifiedTime() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/app1/app?modifiedtimestart=1425016502090"
|
||||
+ "&modifiedtimeend=1425016503020");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
assertNotNull(entities);
|
||||
assertEquals(2, entities.size());
|
||||
assertTrue("Entities with id_1 and id_4 should have been" +
|
||||
" present in response.",
|
||||
entities.contains(newEntity("app", "id_1")) &&
|
||||
entities.contains(newEntity("app", "id_4")));
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"entities/cluster1/app1/app?modifiedtimeend=1425016502090");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
assertNotNull(entities);
|
||||
assertEquals(2, entities.size());
|
||||
assertTrue("Entities with id_2 and id_3 should have been " +
|
||||
"present in response.",
|
||||
entities.contains(newEntity("app", "id_2")) &&
|
||||
entities.contains(newEntity("app", "id_3")));
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"entities/cluster1/app1/app?modifiedtimestart=1425016503005");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
assertNotNull(entities);
|
||||
assertEquals(1, entities.size());
|
||||
assertTrue("Entity with id_4 should have been present in response.",
|
||||
entities.contains(newEntity("app", "id_4")));
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetEntitiesByRelations() throws Exception {
|
||||
Client client = createClient();
|
||||
|
@ -107,7 +107,6 @@ private static void loadEntityData() throws Exception {
|
||||
entity11.setId("id_1");
|
||||
entity11.setType("app");
|
||||
entity11.setCreatedTime(1425016502000L);
|
||||
entity11.setModifiedTime(1425016502050L);
|
||||
Map<String, Object> info1 = new HashMap<String, Object>();
|
||||
info1.put("info1", "val1");
|
||||
entity11.addInfo(info1);
|
||||
@ -136,7 +135,6 @@ private static void loadEntityData() throws Exception {
|
||||
TimelineEntity entity12 = new TimelineEntity();
|
||||
entity12.setId("id_1");
|
||||
entity12.setType("app");
|
||||
entity12.setModifiedTime(1425016503000L);
|
||||
configs.clear();
|
||||
configs.put("config_2", "23");
|
||||
configs.put("config_3", "abc");
|
||||
@ -166,7 +164,6 @@ private static void loadEntityData() throws Exception {
|
||||
entity2.setId("id_2");
|
||||
entity2.setType("app");
|
||||
entity2.setCreatedTime(1425016501050L);
|
||||
entity2.setModifiedTime(1425016502010L);
|
||||
Map<String, Object> info2 = new HashMap<String, Object>();
|
||||
info1.put("info2", 4);
|
||||
entity2.addInfo(info2);
|
||||
@ -203,7 +200,6 @@ private static void loadEntityData() throws Exception {
|
||||
entity3.setId("id_3");
|
||||
entity3.setType("app");
|
||||
entity3.setCreatedTime(1425016501050L);
|
||||
entity3.setModifiedTime(1425016502010L);
|
||||
Map<String, Object> info3 = new HashMap<String, Object>();
|
||||
info3.put("info2", 3.5);
|
||||
entity3.addInfo(info3);
|
||||
@ -239,7 +235,6 @@ private static void loadEntityData() throws Exception {
|
||||
entity4.setId("id_4");
|
||||
entity4.setType("app");
|
||||
entity4.setCreatedTime(1425016502050L);
|
||||
entity4.setModifiedTime(1425016503010L);
|
||||
TimelineEvent event44 = new TimelineEvent();
|
||||
event44.setId("event_4");
|
||||
event44.setTimestamp(1425016502003L);
|
||||
@ -252,7 +247,6 @@ private static void loadEntityData() throws Exception {
|
||||
entity5.setId("id_5");
|
||||
entity5.setType("app");
|
||||
entity5.setCreatedTime(1425016502050L);
|
||||
entity5.setModifiedTime(1425016503010L);
|
||||
writeEntityFile(entity5, appDir2);
|
||||
}
|
||||
|
||||
@ -263,7 +257,7 @@ public TimelineReader getTimelineReader() {
|
||||
@Test
|
||||
public void testGetEntityDefaultView() throws Exception {
|
||||
// If no fields are specified, entity is returned with default view i.e.
|
||||
// only the id, created and modified time
|
||||
// only the id, type and created time.
|
||||
TimelineEntity result =
|
||||
reader.getEntity("user1", "cluster1", "flow1", 1L, "app1",
|
||||
"app", "id_1", null, null, null);
|
||||
@ -271,7 +265,6 @@ public void testGetEntityDefaultView() throws Exception {
|
||||
(new TimelineEntity.Identifier("app", "id_1")).toString(),
|
||||
result.getIdentifier().toString());
|
||||
Assert.assertEquals(1425016502000L, result.getCreatedTime());
|
||||
Assert.assertEquals(1425016503000L, result.getModifiedTime());
|
||||
Assert.assertEquals(0, result.getConfigs().size());
|
||||
Assert.assertEquals(0, result.getMetrics().size());
|
||||
}
|
||||
@ -286,7 +279,6 @@ public void testGetEntityByClusterAndApp() throws Exception {
|
||||
(new TimelineEntity.Identifier("app", "id_1")).toString(),
|
||||
result.getIdentifier().toString());
|
||||
Assert.assertEquals(1425016502000L, result.getCreatedTime());
|
||||
Assert.assertEquals(1425016503000L, result.getModifiedTime());
|
||||
Assert.assertEquals(0, result.getConfigs().size());
|
||||
Assert.assertEquals(0, result.getMetrics().size());
|
||||
}
|
||||
@ -303,7 +295,6 @@ public void testAppFlowMappingCsv() throws Exception {
|
||||
(new TimelineEntity.Identifier("app", "id_5")).toString(),
|
||||
result.getIdentifier().toString());
|
||||
Assert.assertEquals(1425016502050L, result.getCreatedTime());
|
||||
Assert.assertEquals(1425016503010L, result.getModifiedTime());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -317,7 +308,6 @@ public void testGetEntityCustomFields() throws Exception {
|
||||
(new TimelineEntity.Identifier("app", "id_1")).toString(),
|
||||
result.getIdentifier().toString());
|
||||
Assert.assertEquals(1425016502000L, result.getCreatedTime());
|
||||
Assert.assertEquals(1425016503000L, result.getModifiedTime());
|
||||
Assert.assertEquals(3, result.getConfigs().size());
|
||||
Assert.assertEquals(3, result.getMetrics().size());
|
||||
Assert.assertEquals(1, result.getInfo().size());
|
||||
@ -335,7 +325,6 @@ public void testGetEntityAllFields() throws Exception {
|
||||
(new TimelineEntity.Identifier("app", "id_1")).toString(),
|
||||
result.getIdentifier().toString());
|
||||
Assert.assertEquals(1425016502000L, result.getCreatedTime());
|
||||
Assert.assertEquals(1425016503000L, result.getModifiedTime());
|
||||
Assert.assertEquals(3, result.getConfigs().size());
|
||||
Assert.assertEquals(3, result.getMetrics().size());
|
||||
// All fields including events will be returned.
|
||||
@ -346,8 +335,8 @@ public void testGetEntityAllFields() throws Exception {
|
||||
public void testGetAllEntities() throws Exception {
|
||||
Set<TimelineEntity> result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, null, null, null, null, null, null, null, null, null,
|
||||
null, null, null, null);
|
||||
null, null, null, null, null, null, null, null, null, null, null,
|
||||
null);
|
||||
// All 3 entities will be returned
|
||||
Assert.assertEquals(4, result.size());
|
||||
}
|
||||
@ -356,8 +345,8 @@ public void testGetAllEntities() throws Exception {
|
||||
public void testGetEntitiesWithLimit() throws Exception {
|
||||
Set<TimelineEntity> result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
2L, null, null, null, null, null, null, null, null, null,
|
||||
null, null, null, null);
|
||||
2L, null, null, null, null, null, null, null, null, null, null,
|
||||
null);
|
||||
Assert.assertEquals(2, result.size());
|
||||
// Needs to be rewritten once hashcode and equals for
|
||||
// TimelineEntity is implemented
|
||||
@ -370,8 +359,8 @@ public void testGetEntitiesWithLimit() throws Exception {
|
||||
}
|
||||
result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
3L, null, null, null, null, null, null, null, null, null,
|
||||
null, null, null, null);
|
||||
3L, null, null, null, null, null, null, null, null, null, null,
|
||||
null);
|
||||
// Even though 2 entities out of 4 have same created time, one entity
|
||||
// is left out due to limit
|
||||
Assert.assertEquals(3, result.size());
|
||||
@ -383,7 +372,7 @@ public void testGetEntitiesByTimeWindows() throws Exception {
|
||||
Set<TimelineEntity> result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, 1425016502030L, 1425016502060L, null, null, null, null, null,
|
||||
null, null, null, null, null, null);
|
||||
null, null, null, null);
|
||||
Assert.assertEquals(1, result.size());
|
||||
// Only one entity with ID id_4 should be returned.
|
||||
for (TimelineEntity entity : result) {
|
||||
@ -396,7 +385,7 @@ public void testGetEntitiesByTimeWindows() throws Exception {
|
||||
result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, null, 1425016502010L, null, null, null, null, null, null,
|
||||
null, null, null, null, null);
|
||||
null, null, null);
|
||||
Assert.assertEquals(3, result.size());
|
||||
for (TimelineEntity entity : result) {
|
||||
if (entity.getId().equals("id_4")) {
|
||||
@ -408,50 +397,13 @@ public void testGetEntitiesByTimeWindows() throws Exception {
|
||||
result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, 1425016502010L, null, null, null, null, null, null, null,
|
||||
null, null, null, null, null);
|
||||
null, null, null);
|
||||
Assert.assertEquals(1, result.size());
|
||||
for (TimelineEntity entity : result) {
|
||||
if (!entity.getId().equals("id_4")) {
|
||||
Assert.fail("Incorrect filtering based on created time range");
|
||||
}
|
||||
}
|
||||
|
||||
// Get entities based on modified time start and end time range.
|
||||
result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, null, null, 1425016502090L, 1425016503020L, null, null, null,
|
||||
null, null, null, null, null, null);
|
||||
Assert.assertEquals(2, result.size());
|
||||
// Two entities with IDs' id_1 and id_4 should be returned.
|
||||
for (TimelineEntity entity : result) {
|
||||
if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) {
|
||||
Assert.fail("Incorrect filtering based on modified time range");
|
||||
}
|
||||
}
|
||||
|
||||
// Get entities if only modified time end is specified.
|
||||
result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, null, null, null, 1425016502090L, null, null, null, null,
|
||||
null, null, null, null, null);
|
||||
Assert.assertEquals(2, result.size());
|
||||
for (TimelineEntity entity : result) {
|
||||
if (entity.getId().equals("id_1") || entity.getId().equals("id_4")) {
|
||||
Assert.fail("Incorrect filtering based on modified time range");
|
||||
}
|
||||
}
|
||||
|
||||
// Get entities if only modified time start is specified.
|
||||
result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, null, null, 1425016503005L, null, null, null, null, null,
|
||||
null, null, null, null, null);
|
||||
Assert.assertEquals(1, result.size());
|
||||
for (TimelineEntity entity : result) {
|
||||
if (!entity.getId().equals("id_4")) {
|
||||
Assert.fail("Incorrect filtering based on modified time range");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -461,8 +413,8 @@ public void testGetFilteredEntities() throws Exception {
|
||||
infoFilters.put("info2", 3.5);
|
||||
Set<TimelineEntity> result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, null, null, null, null, null, null, infoFilters, null, null,
|
||||
null, null, null, null);
|
||||
null, null, null, null, null, infoFilters, null, null, null, null,
|
||||
null, null);
|
||||
Assert.assertEquals(1, result.size());
|
||||
// Only one entity with ID id_3 should be returned.
|
||||
for (TimelineEntity entity : result) {
|
||||
@ -477,8 +429,8 @@ public void testGetFilteredEntities() throws Exception {
|
||||
configFilters.put("config_3", "abc");
|
||||
result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, null, null, null, null, null, null, null, configFilters, null,
|
||||
null, null, null, null);
|
||||
null, null, null, null, null, null, configFilters, null, null, null,
|
||||
null, null);
|
||||
Assert.assertEquals(2, result.size());
|
||||
for (TimelineEntity entity : result) {
|
||||
if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
|
||||
@ -492,8 +444,8 @@ public void testGetFilteredEntities() throws Exception {
|
||||
eventFilters.add("event_4");
|
||||
result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, null, null, null, null, null, null, null, null, null,
|
||||
eventFilters, null, null, null);
|
||||
null, null, null, null, null, null, null, null, eventFilters, null,
|
||||
null, null);
|
||||
Assert.assertEquals(1, result.size());
|
||||
for (TimelineEntity entity : result) {
|
||||
if (!entity.getId().equals("id_3")) {
|
||||
@ -506,8 +458,8 @@ public void testGetFilteredEntities() throws Exception {
|
||||
metricFilters.add("metric3");
|
||||
result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, null, null, null, null, null, null, null, null, metricFilters,
|
||||
null, null, null, null);
|
||||
null, null, null, null, null, null, null, metricFilters, null, null,
|
||||
null, null);
|
||||
Assert.assertEquals(2, result.size());
|
||||
// Two entities with IDs' id_1 and id_2 should be returned.
|
||||
for (TimelineEntity entity : result) {
|
||||
@ -526,8 +478,8 @@ public void testGetEntitiesByRelations() throws Exception {
|
||||
relatesTo.put("flow", relatesToIds);
|
||||
Set<TimelineEntity> result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, null, null, null, null, relatesTo, null, null, null, null,
|
||||
null, null, null, null);
|
||||
null, null, null, relatesTo, null, null, null, null, null, null,
|
||||
null, null);
|
||||
Assert.assertEquals(1, result.size());
|
||||
// Only one entity with ID id_1 should be returned.
|
||||
for (TimelineEntity entity : result) {
|
||||
@ -543,8 +495,8 @@ public void testGetEntitiesByRelations() throws Exception {
|
||||
isRelatedTo.put("type1", isRelatedToIds);
|
||||
result =
|
||||
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
|
||||
null, null, null, null, null, null, isRelatedTo, null, null, null,
|
||||
null, null, null, null);
|
||||
null, null, null, null, isRelatedTo, null, null, null, null, null,
|
||||
null, null);
|
||||
Assert.assertEquals(2, result.size());
|
||||
// Two entities with IDs' id_1 and id_3 should be returned.
|
||||
for (TimelineEntity entity : result) {
|
||||
|
@ -49,7 +49,6 @@ public void testWriteEntityToFile() throws Exception {
|
||||
entity.setId(id);
|
||||
entity.setType(type);
|
||||
entity.setCreatedTime(1425016501000L);
|
||||
entity.setModifiedTime(1425016502000L);
|
||||
te.addEntity(entity);
|
||||
|
||||
FileSystemTimelineWriterImpl fsi = null;
|
||||
@ -80,7 +79,7 @@ public void testWriteEntityToFile() throws Exception {
|
||||
assertTrue(!(f.exists()));
|
||||
} finally {
|
||||
if (fsi != null) {
|
||||
fsi.stop();
|
||||
fsi.close();
|
||||
FileUtils.deleteDirectory(new File(fsi.getOutputRoot()));
|
||||
}
|
||||
}
|
||||
|
@ -74,7 +74,6 @@
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* Various tests to test writing entities to HBase and reading them back from
|
||||
@ -112,9 +111,7 @@ private static void loadApps() throws IOException {
|
||||
entity.setId(id);
|
||||
entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
|
||||
Long cTime = 1425016501000L;
|
||||
Long mTime = 1425026901000L;
|
||||
entity.setCreatedTime(cTime);
|
||||
entity.setModifiedTime(mTime);
|
||||
// add the info map in Timeline Entity
|
||||
Map<String, Object> infoMap = new HashMap<String, Object>();
|
||||
infoMap.put("infoMapKey1", "infoMapValue1");
|
||||
@ -177,7 +174,6 @@ private static void loadApps() throws IOException {
|
||||
entity1.setId(id1);
|
||||
entity1.setType(TimelineEntityType.YARN_APPLICATION.toString());
|
||||
entity1.setCreatedTime(cTime);
|
||||
entity1.setModifiedTime(mTime);
|
||||
|
||||
// add the info map in Timeline Entity
|
||||
Map<String, Object> infoMap1 = new HashMap<String, Object>();
|
||||
@ -235,7 +231,6 @@ private static void loadApps() throws IOException {
|
||||
entity2.setId(id2);
|
||||
entity2.setType(TimelineEntityType.YARN_APPLICATION.toString());
|
||||
entity2.setCreatedTime(cTime);
|
||||
entity2.setModifiedTime(mTime);
|
||||
te2.addEntity(entity2);
|
||||
HBaseTimelineWriterImpl hbi = null;
|
||||
try {
|
||||
@ -270,9 +265,7 @@ private static void loadEntities() throws IOException {
|
||||
entity.setId(id);
|
||||
entity.setType(type);
|
||||
Long cTime = 1425016501000L;
|
||||
Long mTime = 1425026901000L;
|
||||
entity.setCreatedTime(cTime);
|
||||
entity.setModifiedTime(mTime);
|
||||
// add the info map in Timeline Entity
|
||||
Map<String, Object> infoMap = new HashMap<String, Object>();
|
||||
infoMap.put("infoMapKey1", "infoMapValue1");
|
||||
@ -333,7 +326,6 @@ private static void loadEntities() throws IOException {
|
||||
entity1.setId(id1);
|
||||
entity1.setType(type);
|
||||
entity1.setCreatedTime(cTime);
|
||||
entity1.setModifiedTime(mTime);
|
||||
|
||||
// add the info map in Timeline Entity
|
||||
Map<String, Object> infoMap1 = new HashMap<String, Object>();
|
||||
@ -390,7 +382,6 @@ private static void loadEntities() throws IOException {
|
||||
entity2.setId(id2);
|
||||
entity2.setType(type);
|
||||
entity2.setCreatedTime(cTime);
|
||||
entity2.setModifiedTime(mTime);
|
||||
te.addEntity(entity2);
|
||||
HBaseTimelineWriterImpl hbi = null;
|
||||
try {
|
||||
@ -444,9 +435,7 @@ public void testWriteApplicationToHBase() throws Exception {
|
||||
String appId = "application_1000178881110_2002";
|
||||
entity.setId(appId);
|
||||
long cTime = 1425016501000L;
|
||||
long mTime = 1425026901000L;
|
||||
entity.setCreatedTime(cTime);
|
||||
entity.setModifiedTime(mTime);
|
||||
|
||||
// add the info map in Timeline Entity
|
||||
Map<String, Object> infoMap = new HashMap<String, Object>();
|
||||
@ -522,7 +511,7 @@ public void testWriteApplicationToHBase() throws Exception {
|
||||
Result result = new ApplicationTable().getResult(c1, conn, get);
|
||||
|
||||
assertTrue(result != null);
|
||||
assertEquals(16, result.size());
|
||||
assertEquals(15, result.size());
|
||||
|
||||
// check the row key
|
||||
byte[] row1 = result.getRow();
|
||||
@ -538,10 +527,6 @@ public void testWriteApplicationToHBase() throws Exception {
|
||||
long cTime1 = val.longValue();
|
||||
assertEquals(cTime1, cTime);
|
||||
|
||||
val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result);
|
||||
long mTime1 = val.longValue();
|
||||
assertEquals(mTime1, mTime);
|
||||
|
||||
Map<String, Object> infoColumns =
|
||||
ApplicationColumnPrefix.INFO.readResults(result);
|
||||
assertEquals(infoMap, infoColumns);
|
||||
@ -599,7 +584,6 @@ public void testWriteApplicationToHBase() throws Exception {
|
||||
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
|
||||
e1.getType());
|
||||
assertEquals(cTime, e1.getCreatedTime());
|
||||
assertEquals(mTime, e1.getModifiedTime());
|
||||
Map<String, Object> infoMap2 = e1.getInfo();
|
||||
assertEquals(infoMap, infoMap2);
|
||||
|
||||
@ -635,9 +619,7 @@ public void testWriteEntityToHBase() throws Exception {
|
||||
entity.setId(id);
|
||||
entity.setType(type);
|
||||
long cTime = 1425016501000L;
|
||||
long mTime = 1425026901000L;
|
||||
entity.setCreatedTime(cTime);
|
||||
entity.setModifiedTime(mTime);
|
||||
|
||||
// add the info map in Timeline Entity
|
||||
Map<String, Object> infoMap = new HashMap<String, Object>();
|
||||
@ -736,10 +718,6 @@ public void testWriteEntityToHBase() throws Exception {
|
||||
long cTime1 = val.longValue();
|
||||
assertEquals(cTime1, cTime);
|
||||
|
||||
val = (Number) EntityColumn.MODIFIED_TIME.readResult(result);
|
||||
long mTime1 = val.longValue();
|
||||
assertEquals(mTime1, mTime);
|
||||
|
||||
Map<String, Object> infoColumns =
|
||||
EntityColumnPrefix.INFO.readResults(result);
|
||||
assertEquals(infoMap, infoColumns);
|
||||
@ -790,7 +768,7 @@ public void testWriteEntityToHBase() throws Exception {
|
||||
}
|
||||
}
|
||||
assertEquals(1, rowCount);
|
||||
assertEquals(17, colCount);
|
||||
assertEquals(16, colCount);
|
||||
|
||||
// read the timeline entity using the reader this time
|
||||
TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
|
||||
@ -798,8 +776,7 @@ public void testWriteEntityToHBase() throws Exception {
|
||||
EnumSet.of(TimelineReader.Field.ALL));
|
||||
Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid,
|
||||
appName, entity.getType(), null, null, null, null, null, null, null,
|
||||
null, null, null, null, null, null,
|
||||
EnumSet.of(TimelineReader.Field.ALL));
|
||||
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
|
||||
assertNotNull(e1);
|
||||
assertEquals(1, es1.size());
|
||||
|
||||
@ -807,7 +784,6 @@ public void testWriteEntityToHBase() throws Exception {
|
||||
assertEquals(id, e1.getId());
|
||||
assertEquals(type, e1.getType());
|
||||
assertEquals(cTime, e1.getCreatedTime());
|
||||
assertEquals(mTime, e1.getModifiedTime());
|
||||
Map<String, Object> infoMap2 = e1.getInfo();
|
||||
assertEquals(infoMap, infoMap2);
|
||||
|
||||
@ -1041,8 +1017,7 @@ public void testEventsWithEmptyInfo() throws IOException {
|
||||
EnumSet.of(TimelineReader.Field.ALL));
|
||||
Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid,
|
||||
appName, entity.getType(), null, null, null, null, null, null, null,
|
||||
null, null, null, null, null, null,
|
||||
EnumSet.of(TimelineReader.Field.ALL));
|
||||
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
|
||||
assertNotNull(e1);
|
||||
assertEquals(1, es1.size());
|
||||
|
||||
@ -1069,7 +1044,6 @@ public void testNonIntegralMetricValues() throws IOException {
|
||||
String appId = "application_1000178881110_2002";
|
||||
entityApp.setId(appId);
|
||||
entityApp.setCreatedTime(1425016501000L);
|
||||
entityApp.setModifiedTime(1425026901000L);
|
||||
// add metrics with floating point values
|
||||
Set<TimelineMetric> metricsApp = new HashSet<>();
|
||||
TimelineMetric mApp = new TimelineMetric();
|
||||
@ -1089,7 +1063,6 @@ public void testNonIntegralMetricValues() throws IOException {
|
||||
entity.setId("hello");
|
||||
entity.setType("world");
|
||||
entity.setCreatedTime(1425016501000L);
|
||||
entity.setModifiedTime(1425026901000L);
|
||||
// add metrics with floating point values
|
||||
Set<TimelineMetric> metricsEntity = new HashSet<>();
|
||||
TimelineMetric mEntity = new TimelineMetric();
|
||||
@ -1135,8 +1108,8 @@ public void testReadEntities() throws Exception {
|
||||
assertEquals(1, e1.getIsRelatedToEntities().size());
|
||||
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
|
||||
null, null, null, null, null, null, null, null, null, null, null, null,
|
||||
null, EnumSet.of(Field.ALL));
|
||||
null, null, null, null, null, null, null, null, null, null, null,
|
||||
EnumSet.of(Field.ALL));
|
||||
assertEquals(3, es1.size());
|
||||
}
|
||||
|
||||
@ -1151,8 +1124,7 @@ public void testReadEntitiesDefaultView() throws Exception {
|
||||
e1.getRelatesToEntities().isEmpty());
|
||||
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
|
||||
null, null, null, null, null, null, null, null, null, null, null, null,
|
||||
null, null);
|
||||
null, null, null, null, null, null, null, null, null, null, null, null);
|
||||
assertEquals(3, es1.size());
|
||||
for (TimelineEntity e : es1) {
|
||||
assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
|
||||
@ -1172,8 +1144,8 @@ public void testReadEntitiesByFields() throws Exception {
|
||||
assertEquals(0, e1.getIsRelatedToEntities().size());
|
||||
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
|
||||
null, null, null, null, null, null, null, null, null, null, null, null,
|
||||
null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS));
|
||||
null, null, null, null, null, null, null, null, null, null, null,
|
||||
EnumSet.of(Field.IS_RELATED_TO, Field.METRICS));
|
||||
assertEquals(3, es1.size());
|
||||
int metricsCnt = 0;
|
||||
int isRelatedToCnt = 0;
|
||||
@ -1200,8 +1172,7 @@ public void testReadEntitiesConfigPrefix() throws Exception {
|
||||
assertEquals(1, e1.getConfigs().size());
|
||||
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
|
||||
null, null, null, null, null, null, null, null, null, null, null,
|
||||
list, null, null);
|
||||
null, null, null, null, null, null, null, null, null, list, null, null);
|
||||
int cfgCnt = 0;
|
||||
for (TimelineEntity entity : es1) {
|
||||
cfgCnt += entity.getConfigs().size();
|
||||
@ -1217,8 +1188,8 @@ public void testReadEntitiesConfigFilterPrefix() throws Exception {
|
||||
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
|
||||
Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
|
||||
null, null, null, null, null, null, null, null, confFilters, null, null,
|
||||
list, null, null);
|
||||
null, null, null, null, null, null, confFilters, null, null, list, null,
|
||||
null);
|
||||
assertEquals(1, entities.size());
|
||||
int cfgCnt = 0;
|
||||
for (TimelineEntity entity : entities) {
|
||||
@ -1239,8 +1210,7 @@ public void testReadEntitiesMetricPrefix() throws Exception {
|
||||
assertEquals(1, e1.getMetrics().size());
|
||||
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
|
||||
null, null, null, null, null, null, null, null, null, null, null, null,
|
||||
list, null);
|
||||
null, null, null, null, null, null, null, null, null, null, list, null);
|
||||
int metricCnt = 0;
|
||||
for (TimelineEntity entity : es1) {
|
||||
metricCnt += entity.getMetrics().size();
|
||||
@ -1256,8 +1226,8 @@ public void testReadEntitiesMetricFilterPrefix() throws Exception {
|
||||
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
|
||||
Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
|
||||
null, null, null, null, null, null, null, null, null, metricFilters,
|
||||
null, null, list, null);
|
||||
null, null, null, null, null, null, null, metricFilters, null, null,
|
||||
list, null);
|
||||
assertEquals(1, entities.size());
|
||||
int metricCnt = 0;
|
||||
for (TimelineEntity entity : entities) {
|
||||
@ -1278,8 +1248,7 @@ public void testReadApps() throws Exception {
|
||||
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
|
||||
null, null, null, null, null, null, null, null, null,
|
||||
EnumSet.of(Field.ALL));
|
||||
null, null, null, null, null, null, null, EnumSet.of(Field.ALL));
|
||||
assertEquals(3, es1.size());
|
||||
}
|
||||
|
||||
@ -1295,7 +1264,7 @@ public void testReadAppsDefaultView() throws Exception {
|
||||
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
|
||||
null, null, null, null, null, null, null, null, null, null);
|
||||
null, null, null, null, null, null, null, null);
|
||||
assertEquals(3, es1.size());
|
||||
for (TimelineEntity e : es1) {
|
||||
assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
|
||||
@ -1313,10 +1282,10 @@ public void testReadAppsByFields() throws Exception {
|
||||
assertNotNull(e1);
|
||||
assertEquals(3, e1.getConfigs().size());
|
||||
assertEquals(0, e1.getIsRelatedToEntities().size());
|
||||
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, null,
|
||||
Set<TimelineEntity> es1 = reader.getEntities(
|
||||
"user1", "cluster1", "some_flow_name", 1002345678919L, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
|
||||
null, null, null, null, null, null, null, null, null,
|
||||
null, null, null, null, null, null, null,
|
||||
EnumSet.of(Field.IS_RELATED_TO, Field.METRICS));
|
||||
assertEquals(3, es1.size());
|
||||
int metricsCnt = 0;
|
||||
@ -1342,10 +1311,10 @@ public void testReadAppsConfigPrefix() throws Exception {
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), null, list, null, null);
|
||||
assertNotNull(e1);
|
||||
assertEquals(1, e1.getConfigs().size());
|
||||
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, null,
|
||||
Set<TimelineEntity> es1 = reader.getEntities(
|
||||
"user1", "cluster1", "some_flow_name", 1002345678919L, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
|
||||
null, null, null, null, null, null, null, list, null, null);
|
||||
null, null, null, null, null, list, null, null);
|
||||
int cfgCnt = 0;
|
||||
for (TimelineEntity entity : es1) {
|
||||
cfgCnt += entity.getConfigs().size();
|
||||
@ -1359,10 +1328,10 @@ public void testReadAppsConfigFilterPrefix() throws Exception {
|
||||
TimelineFilterList list =
|
||||
new TimelineFilterList(Operator.OR,
|
||||
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
|
||||
Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, null,
|
||||
Set<TimelineEntity> entities = reader.getEntities(
|
||||
"user1", "cluster1", "some_flow_name", 1002345678919L, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
|
||||
null, null, null, null, confFilters, null, null, list, null, null);
|
||||
null, null, confFilters, null, null, list, null, null);
|
||||
assertEquals(1, entities.size());
|
||||
int cfgCnt = 0;
|
||||
for (TimelineEntity entity : entities) {
|
||||
@ -1381,10 +1350,10 @@ public void testReadAppsMetricPrefix() throws Exception {
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), null, null, list, null);
|
||||
assertNotNull(e1);
|
||||
assertEquals(1, e1.getMetrics().size());
|
||||
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, null,
|
||||
Set<TimelineEntity> es1 = reader.getEntities(
|
||||
"user1", "cluster1", "some_flow_name", 1002345678919L, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
|
||||
null, null, null, null, null, null, null, null, list, null);
|
||||
null, null, null, null, null, null, list, null);
|
||||
int metricCnt = 0;
|
||||
for (TimelineEntity entity : es1) {
|
||||
metricCnt += entity.getMetrics().size();
|
||||
@ -1398,10 +1367,10 @@ public void testReadAppsMetricFilterPrefix() throws Exception {
|
||||
new TimelineFilterList(Operator.OR,
|
||||
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
|
||||
Set<String> metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS");
|
||||
Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
|
||||
"some_flow_name", 1002345678919L, null,
|
||||
Set<TimelineEntity> entities = reader.getEntities(
|
||||
"user1", "cluster1", "some_flow_name", 1002345678919L, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
|
||||
null, null, null, null, null, metricFilters, null, null, list, null);
|
||||
null, null, null, metricFilters, null, null, list, null);
|
||||
int metricCnt = 0;
|
||||
assertEquals(1, entities.size());
|
||||
for (TimelineEntity entity : entities) {
|
||||
|
@ -111,7 +111,6 @@ private static TimelineEntity getTestAggregationTimelineEntity() {
|
||||
entity.setId(id);
|
||||
entity.setType(type);
|
||||
entity.setCreatedTime(1425016501000L);
|
||||
entity.setModifiedTime(1425016502000L);
|
||||
|
||||
TimelineMetric metric = new TimelineMetric();
|
||||
metric.setId("HDFS_BYTES_READ");
|
||||
|
@ -106,7 +106,6 @@ static TimelineEntity getEntity1() {
|
||||
long cTime = 20000000000000L;
|
||||
long mTime = 1425026901000L;
|
||||
entity.setCreatedTime(cTime);
|
||||
entity.setModifiedTime(mTime);
|
||||
// add metrics
|
||||
Set<TimelineMetric> metrics = new HashSet<>();
|
||||
TimelineMetric m1 = new TimelineMetric();
|
||||
|
@ -182,7 +182,7 @@ public void testWriteFlowRunMinMax() throws Exception {
|
||||
Set<TimelineEntity> entities =
|
||||
hbr.getEntities(null, cluster, null, null, null,
|
||||
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, null, null, null, null, null, null, null, null);
|
||||
assertEquals(1, entities.size());
|
||||
for (TimelineEntity e : entities) {
|
||||
FlowActivityEntity flowActivity = (FlowActivityEntity)e;
|
||||
@ -238,7 +238,7 @@ public void testWriteFlowActivityOneFlow() throws Exception {
|
||||
Set<TimelineEntity> entities =
|
||||
hbr.getEntities(user, cluster, flow, null, null,
|
||||
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, null, null, null, null, null, null, null, null);
|
||||
assertEquals(1, entities.size());
|
||||
for (TimelineEntity e : entities) {
|
||||
FlowActivityEntity entity = (FlowActivityEntity)e;
|
||||
@ -353,7 +353,7 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
|
||||
Set<TimelineEntity> entities =
|
||||
hbr.getEntities(null, cluster, null, null, null,
|
||||
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, null, null, null, null, null, null, null, null);
|
||||
assertEquals(1, entities.size());
|
||||
for (TimelineEntity e : entities) {
|
||||
FlowActivityEntity flowActivity = (FlowActivityEntity)e;
|
||||
|
@ -375,8 +375,7 @@ public void testWriteFlowRunMetricsPrefix() throws Exception {
|
||||
|
||||
Set<TimelineEntity> entities = hbr.getEntities(user, cluster, flow, runid,
|
||||
null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
|
||||
null, null, null, null, null, null, null, null, null,
|
||||
metricsToRetrieve, null);
|
||||
null, null, null, null, null, null, null, metricsToRetrieve, null);
|
||||
assertEquals(1, entities.size());
|
||||
for (TimelineEntity timelineEntity : entities) {
|
||||
Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
|
||||
@ -444,7 +443,7 @@ public void testWriteFlowRunsMetricFields() throws Exception {
|
||||
hbr.start();
|
||||
Set<TimelineEntity> entities = hbr.getEntities(user, cluster, flow, runid,
|
||||
null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, null, null, null, null, null, null, null, null);
|
||||
assertEquals(1, entities.size());
|
||||
for (TimelineEntity timelineEntity : entities) {
|
||||
assertEquals(0, timelineEntity.getMetrics().size());
|
||||
@ -452,8 +451,8 @@ public void testWriteFlowRunsMetricFields() throws Exception {
|
||||
|
||||
entities = hbr.getEntities(user, cluster, flow, runid,
|
||||
null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
|
||||
null, null, null, null, null, null, null, null, null,
|
||||
null, EnumSet.of(Field.METRICS));
|
||||
null, null, null, null, null, null, null, null,
|
||||
EnumSet.of(Field.METRICS));
|
||||
assertEquals(1, entities.size());
|
||||
for (TimelineEntity timelineEntity : entities) {
|
||||
Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
|
||||
|
Loading…
Reference in New Issue
Block a user