YARN-4445. Unify the term flowId and flowName in timeline v2 codebase.
Contributed by Zhan Zhang.
This commit is contained in:
parent
c4d7bbda5c
commit
8ef546c1ee
@ -245,7 +245,7 @@ private void checkNewTimelineEvent(ApplicationId appId) throws IOException {
|
||||
Assert.assertTrue(tmpRootFolder.isDirectory());
|
||||
String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
|
||||
UserGroupInformation.getCurrentUser().getShortUserName() +
|
||||
"/" + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
|
||||
"/" + TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId) +
|
||||
"/1/1/" + appId.toString();
|
||||
// for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs
|
||||
String outputDirJob = basePath + "/MAPREDUCE_JOB/";
|
||||
|
@ -493,7 +493,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
|
||||
YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
|
||||
UserGroupInformation.getCurrentUser().getShortUserName() +
|
||||
(defaultFlow ? "/" +
|
||||
TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
|
||||
TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId) +
|
||||
"/1/1/" : "/test_flow_name/test_flow_version/12345678/") +
|
||||
appId.toString();
|
||||
// for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
|
||||
|
@ -160,7 +160,7 @@ public static Text buildTimelineTokenService(Configuration conf) {
|
||||
return SecurityUtil.buildTokenService(timelineServiceAddr);
|
||||
}
|
||||
|
||||
public static String generateDefaultFlowIdBasedOnAppId(ApplicationId appId) {
|
||||
public static String generateDefaultFlowNameBasedOnAppId(ApplicationId appId) {
|
||||
return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
|
||||
}
|
||||
|
||||
|
@ -307,7 +307,7 @@ private String getTimelineEntityDir(RMApp app) {
|
||||
+ "/"
|
||||
+ app.getUser()
|
||||
+ "/"
|
||||
+ TimelineUtils.generateDefaultFlowIdBasedOnAppId(app
|
||||
+ TimelineUtils.generateDefaultFlowNameBasedOnAppId(app
|
||||
.getApplicationId()) + "/" + DEFAULT_FLOW_VERSION + "/"
|
||||
+ DEFAULT_FLOW_RUN + "/" + app.getApplicationId();
|
||||
return outputDirApp;
|
||||
|
@ -55,7 +55,7 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
// Current user usually is not the app user, but keep this field non-null
|
||||
context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName());
|
||||
// Use app ID to generate a default flow name for orphan app
|
||||
context.setFlowName(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId));
|
||||
context.setFlowName(TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId));
|
||||
// Set the flow version to string 1 if it's an orphan app
|
||||
context.setFlowVersion("1");
|
||||
// Set the flow run ID to 1 if it's an orphan app
|
||||
|
@ -66,7 +66,7 @@ private static String getClusterID(String clusterId, Configuration conf) {
|
||||
* @see TimelineReader#getEntities
|
||||
*/
|
||||
Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
@ -74,7 +74,7 @@ Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
EnumSet<Field> fieldsToRetrieve) throws IOException {
|
||||
String cluster = getClusterID(clusterId, getConfig());
|
||||
return reader.getEntities(userId, cluster, flowId, flowRunId, appId,
|
||||
return reader.getEntities(userId, cluster, flowName, flowRunId, appId,
|
||||
entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
|
||||
metricFilters, eventFilters, null, null, fieldsToRetrieve);
|
||||
@ -87,10 +87,10 @@ Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
* @see TimelineReader#getEntity
|
||||
*/
|
||||
public TimelineEntity getEntity(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
String entityId, EnumSet<Field> fields) throws IOException {
|
||||
String cluster = getClusterID(clusterId, getConfig());
|
||||
return reader.getEntity(userId, cluster, flowId, flowRunId, appId,
|
||||
return reader.getEntity(userId, cluster, flowName, flowRunId, appId,
|
||||
entityType, entityId, null, null, fields);
|
||||
}
|
||||
}
|
||||
|
@ -326,7 +326,7 @@ public Set<TimelineEntity> getEntities(
|
||||
@PathParam("appid") String appId,
|
||||
@PathParam("entitytype") String entityType,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("flowid") String flowId,
|
||||
@QueryParam("flowname") String flowName,
|
||||
@QueryParam("flowrunid") String flowRunId,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@ -340,7 +340,7 @@ public Set<TimelineEntity> getEntities(
|
||||
@QueryParam("metricfilters") String metricfilters,
|
||||
@QueryParam("eventfilters") String eventfilters,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, null, appId, entityType, userId, flowId,
|
||||
return getEntities(req, res, null, appId, entityType, userId, flowName,
|
||||
flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
@ -359,7 +359,7 @@ public Set<TimelineEntity> getEntities(
|
||||
@PathParam("appid") String appId,
|
||||
@PathParam("entitytype") String entityType,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("flowid") String flowId,
|
||||
@QueryParam("flowname") String flowName,
|
||||
@QueryParam("flowrunid") String flowRunId,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@ -384,7 +384,7 @@ public Set<TimelineEntity> getEntities(
|
||||
Set<TimelineEntity> entities = null;
|
||||
try {
|
||||
entities = timelineReaderManager.getEntities(
|
||||
parseStr(userId), parseStr(clusterId), parseStr(flowId),
|
||||
parseStr(userId), parseStr(clusterId), parseStr(flowName),
|
||||
parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
|
||||
parseLongStr(limit), parseLongStr(createdTimeStart),
|
||||
parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart),
|
||||
@ -423,11 +423,11 @@ public TimelineEntity getEntity(
|
||||
@PathParam("entitytype") String entityType,
|
||||
@PathParam("entityid") String entityId,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("flowid") String flowId,
|
||||
@QueryParam("flowname") String flowName,
|
||||
@QueryParam("flowrunid") String flowRunId,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntity(req, res, null, appId, entityType, entityId, userId,
|
||||
flowId, flowRunId, fields);
|
||||
flowName, flowRunId, fields);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -444,7 +444,7 @@ public TimelineEntity getEntity(
|
||||
@PathParam("entitytype") String entityType,
|
||||
@PathParam("entityid") String entityId,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("flowid") String flowId,
|
||||
@QueryParam("flowname") String flowName,
|
||||
@QueryParam("flowrunid") String flowRunId,
|
||||
@QueryParam("fields") String fields) {
|
||||
String url = req.getRequestURI() +
|
||||
@ -458,7 +458,7 @@ public TimelineEntity getEntity(
|
||||
TimelineEntity entity = null;
|
||||
try {
|
||||
entity = timelineReaderManager.getEntity(
|
||||
parseStr(userId), parseStr(clusterId), parseStr(flowId),
|
||||
parseStr(userId), parseStr(clusterId), parseStr(flowName),
|
||||
parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
|
||||
parseStr(entityId), parseFieldsStr(fields, COMMA_DELIMITER));
|
||||
} catch (Exception e) {
|
||||
@ -481,30 +481,30 @@ public TimelineEntity getEntity(
|
||||
* Cluster ID is not provided by client so default cluster ID has to be taken.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowrun/{userid}/{flowid}/{flowrunid}/")
|
||||
@Path("/flowrun/{userid}/{flowname}/{flowrunid}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public TimelineEntity getFlowRun(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("userid") String userId,
|
||||
@PathParam("flowid") String flowId,
|
||||
@PathParam("flowname") String flowName,
|
||||
@PathParam("flowrunid") String flowRunId,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getFlowRun(req, res, userId, null, flowId, flowRunId, fields);
|
||||
return getFlowRun(req, res, userId, null, flowName, flowRunId, fields);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a single flow run for the given user, cluster, flow id and run id.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowrun/{userid}/{clusterid}/{flowid}/{flowrunid}/")
|
||||
@Path("/flowrun/{userid}/{clusterid}/{flowname}/{flowrunid}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public TimelineEntity getFlowRun(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("userid") String userId,
|
||||
@PathParam("clusterid") String clusterId,
|
||||
@PathParam("flowid") String flowId,
|
||||
@PathParam("flowname") String flowName,
|
||||
@PathParam("flowrunid") String flowRunId,
|
||||
@QueryParam("fields") String fields) {
|
||||
String url = req.getRequestURI() +
|
||||
@ -518,7 +518,7 @@ public TimelineEntity getFlowRun(
|
||||
TimelineEntity entity = null;
|
||||
try {
|
||||
entity = timelineReaderManager.getEntity(parseStr(userId),
|
||||
parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId), null,
|
||||
parseStr(clusterId), parseStr(flowName), parseLongStr(flowRunId), null,
|
||||
TimelineEntityType.YARN_FLOW_RUN.toString(), null,
|
||||
parseFieldsStr(fields, COMMA_DELIMITER));
|
||||
} catch (Exception e) {
|
||||
@ -528,7 +528,7 @@ public TimelineEntity getFlowRun(
|
||||
if (entity == null) {
|
||||
LOG.info("Processed URL " + url + " but flowrun not found (Took " +
|
||||
(endTime - startTime) + " ms.)");
|
||||
throw new NotFoundException("Flow run {flow id: " + parseStr(flowId) +
|
||||
throw new NotFoundException("Flow run {flow id: " + parseStr(flowName) +
|
||||
", run id: " + parseLongStr(flowRunId) + " } is not found");
|
||||
}
|
||||
LOG.info("Processed URL " + url +
|
||||
@ -541,18 +541,18 @@ public TimelineEntity getFlowRun(
|
||||
* Cluster ID is not provided by client so default cluster ID has to be taken.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowruns/{userid}/{flowid}/")
|
||||
@Path("/flowruns/{userid}/{flowname}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Set<TimelineEntity> getFlowRuns(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("userid") String userId,
|
||||
@PathParam("flowid") String flowId,
|
||||
@PathParam("flowname") String flowName,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getFlowRuns(req, res, userId, null, flowId, limit, createdTimeStart,
|
||||
return getFlowRuns(req, res, userId, null, flowName, limit, createdTimeStart,
|
||||
createdTimeEnd, fields);
|
||||
}
|
||||
|
||||
@ -560,14 +560,14 @@ public Set<TimelineEntity> getFlowRuns(
|
||||
* Return a set of flow runs for the given user, cluster and flow id.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowruns/{userid}/{clusterid}/{flowid}/")
|
||||
@Path("/flowruns/{userid}/{clusterid}/{flowname}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Set<TimelineEntity> getFlowRuns(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("userid") String userId,
|
||||
@PathParam("clusterid") String clusterId,
|
||||
@PathParam("flowid") String flowId,
|
||||
@PathParam("flowname") String flowName,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@ -583,7 +583,7 @@ public Set<TimelineEntity> getFlowRuns(
|
||||
Set<TimelineEntity> entities = null;
|
||||
try {
|
||||
entities = timelineReaderManager.getEntities(
|
||||
parseStr(userId), parseStr(clusterId), parseStr(flowId), null, null,
|
||||
parseStr(userId), parseStr(clusterId), parseStr(flowName), null, null,
|
||||
TimelineEntityType.YARN_FLOW_RUN.toString(), parseLongStr(limit),
|
||||
parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd), null,
|
||||
null, null, null, null, null, null, null,
|
||||
@ -692,11 +692,11 @@ public TimelineEntity getApp(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("appid") String appId,
|
||||
@QueryParam("flowid") String flowId,
|
||||
@QueryParam("flowname") String flowName,
|
||||
@QueryParam("flowrunid") String flowRunId,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getApp(req, res, null, appId, flowId, flowRunId, userId, fields);
|
||||
return getApp(req, res, null, appId, flowName, flowRunId, userId, fields);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -710,7 +710,7 @@ public TimelineEntity getApp(
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("clusterid") String clusterId,
|
||||
@PathParam("appid") String appId,
|
||||
@QueryParam("flowid") String flowId,
|
||||
@QueryParam("flowname") String flowName,
|
||||
@QueryParam("flowrunid") String flowRunId,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("fields") String fields) {
|
||||
@ -725,7 +725,7 @@ public TimelineEntity getApp(
|
||||
TimelineEntity entity = null;
|
||||
try {
|
||||
entity = timelineReaderManager.getEntity(parseStr(userId),
|
||||
parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId),
|
||||
parseStr(clusterId), parseStr(flowName), parseLongStr(flowRunId),
|
||||
parseStr(appId), TimelineEntityType.YARN_APPLICATION.toString(), null,
|
||||
parseFieldsStr(fields, COMMA_DELIMITER));
|
||||
} catch (Exception e) {
|
||||
@ -749,13 +749,13 @@ public TimelineEntity getApp(
|
||||
* is reached, will be returned.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowrunapps/{userid}/{flowid}/{flowrunid}/")
|
||||
@Path("/flowrunapps/{userid}/{flowname}/{flowrunid}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Set<TimelineEntity> getFlowRunApps(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("userid") String userId,
|
||||
@PathParam("flowid") String flowId,
|
||||
@PathParam("flowname") String flowName,
|
||||
@PathParam("flowrunid") String flowRunId,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@ -770,7 +770,7 @@ public Set<TimelineEntity> getFlowRunApps(
|
||||
@QueryParam("eventfilters") String eventfilters,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, null, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
|
||||
flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
@ -782,14 +782,14 @@ public Set<TimelineEntity> getFlowRunApps(
|
||||
* till the limit is reached, will be returned.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowrunapps/{userid}/{clusterid}/{flowid}/{flowrunid}/")
|
||||
@Path("/flowrunapps/{userid}/{clusterid}/{flowname}/{flowrunid}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Set<TimelineEntity> getFlowRunApps(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("userid") String userId,
|
||||
@PathParam("clusterid") String clusterId,
|
||||
@PathParam("flowid") String flowId,
|
||||
@PathParam("flowname") String flowName,
|
||||
@PathParam("flowrunid") String flowRunId,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@ -804,7 +804,7 @@ public Set<TimelineEntity> getFlowRunApps(
|
||||
@QueryParam("eventfilters") String eventfilters,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, clusterId, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
|
||||
flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
@ -817,13 +817,13 @@ public Set<TimelineEntity> getFlowRunApps(
|
||||
* reached, will be returned.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowapps/{userid}/{flowid}/")
|
||||
@Path("/flowapps/{userid}/{flowname}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Set<TimelineEntity> getFlowApps(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("userid") String userId,
|
||||
@PathParam("flowid") String flowId,
|
||||
@PathParam("flowname") String flowName,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@ -837,7 +837,7 @@ public Set<TimelineEntity> getFlowApps(
|
||||
@QueryParam("eventfilters") String eventfilters,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, null, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
|
||||
null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
@ -849,14 +849,14 @@ public Set<TimelineEntity> getFlowApps(
|
||||
* is reached, will be returned.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowapps/{userid}/{clusterid}/{flowid}/")
|
||||
@Path("/flowapps/{userid}/{clusterid}/{flowname}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Set<TimelineEntity> getFlowApps(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("userid") String userId,
|
||||
@PathParam("clusterid") String clusterId,
|
||||
@PathParam("flowid") String flowId,
|
||||
@PathParam("flowname") String flowName,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@ -870,7 +870,7 @@ public Set<TimelineEntity> getFlowApps(
|
||||
@QueryParam("eventfilters") String eventfilters,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, clusterId, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
|
||||
null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
|
@ -60,7 +60,7 @@ class ApplicationEntityReader extends GenericEntityReader {
|
||||
new ApplicationTable();
|
||||
|
||||
public ApplicationEntityReader(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
@ -68,7 +68,7 @@ public ApplicationEntityReader(String userId, String clusterId,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
|
||||
EnumSet<Field> fieldsToRetrieve) {
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
|
||||
@ -76,10 +76,10 @@ public ApplicationEntityReader(String userId, String clusterId,
|
||||
}
|
||||
|
||||
public ApplicationEntityReader(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
String entityId, TimelineFilterList confsToRetrieve,
|
||||
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
|
||||
confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
|
||||
}
|
||||
|
||||
@ -173,7 +173,7 @@ protected FilterList constructFilterListBasedOnFields() {
|
||||
protected Result getResult(Configuration hbaseConf, Connection conn,
|
||||
FilterList filterList) throws IOException {
|
||||
byte[] rowKey =
|
||||
ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
|
||||
ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
|
||||
appId);
|
||||
Get get = new Get(rowKey);
|
||||
get.setMaxVersions(Integer.MAX_VALUE);
|
||||
@ -191,7 +191,7 @@ protected void validateParams() {
|
||||
Preconditions.checkNotNull(appId, "appId shouldn't be null");
|
||||
} else {
|
||||
Preconditions.checkNotNull(userId, "userId shouldn't be null");
|
||||
Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
|
||||
Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
|
||||
}
|
||||
}
|
||||
|
||||
@ -199,10 +199,10 @@ protected void validateParams() {
|
||||
protected void augmentParams(Configuration hbaseConf, Connection conn)
|
||||
throws IOException {
|
||||
if (singleEntityRead) {
|
||||
if (flowId == null || flowRunId == null || userId == null) {
|
||||
if (flowName == null || flowRunId == null || userId == null) {
|
||||
FlowContext context =
|
||||
lookupFlowContext(clusterId, appId, hbaseConf, conn);
|
||||
flowId = context.flowId;
|
||||
flowName = context.flowName;
|
||||
flowRunId = context.flowRunId;
|
||||
userId = context.userId;
|
||||
}
|
||||
@ -244,10 +244,10 @@ protected ResultScanner getResults(Configuration hbaseConf,
|
||||
Scan scan = new Scan();
|
||||
if (flowRunId != null) {
|
||||
scan.setRowPrefixFilter(ApplicationRowKey.
|
||||
getRowKeyPrefix(clusterId, userId, flowId, flowRunId));
|
||||
getRowKeyPrefix(clusterId, userId, flowName, flowRunId));
|
||||
} else {
|
||||
scan.setRowPrefixFilter(ApplicationRowKey.
|
||||
getRowKeyPrefix(clusterId, userId, flowId));
|
||||
getRowKeyPrefix(clusterId, userId, flowName));
|
||||
}
|
||||
FilterList newList = new FilterList();
|
||||
newList.addFilter(new PageFilter(limit));
|
||||
|
@ -150,11 +150,11 @@ private static void fillFields(TimelineEntity finalEntity,
|
||||
}
|
||||
}
|
||||
|
||||
private String getFlowRunPath(String userId, String clusterId, String flowId,
|
||||
private String getFlowRunPath(String userId, String clusterId, String flowName,
|
||||
Long flowRunId, String appId)
|
||||
throws IOException {
|
||||
if (userId != null && flowId != null && flowRunId != null) {
|
||||
return userId + "/" + flowId + "/" + flowRunId;
|
||||
if (userId != null && flowName != null && flowRunId != null) {
|
||||
return userId + "/" + flowName + "/" + flowRunId;
|
||||
}
|
||||
if (clusterId == null || appId == null) {
|
||||
throw new IOException("Unable to get flow info");
|
||||
@ -387,11 +387,11 @@ public void serviceInit(Configuration conf) throws Exception {
|
||||
|
||||
@Override
|
||||
public TimelineEntity getEntity(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
String entityId, TimelineFilterList confsToRetrieve,
|
||||
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve)
|
||||
throws IOException {
|
||||
String flowRunPath = getFlowRunPath(userId, clusterId, flowId,
|
||||
String flowRunPath = getFlowRunPath(userId, clusterId, flowName,
|
||||
flowRunId, appId);
|
||||
File dir = new File(new File(rootPath, ENTITIES_DIR),
|
||||
clusterId + "/" + flowRunPath + "/" + appId + "/" + entityType);
|
||||
@ -411,7 +411,7 @@ public TimelineEntity getEntity(String userId, String clusterId,
|
||||
|
||||
@Override
|
||||
public Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
@ -420,7 +420,7 @@ public Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
|
||||
EnumSet<Field> fieldsToRetrieve) throws IOException {
|
||||
String flowRunPath =
|
||||
getFlowRunPath(userId, clusterId, flowId, flowRunId, appId);
|
||||
getFlowRunPath(userId, clusterId, flowName, flowRunId, appId);
|
||||
File dir =
|
||||
new File(new File(rootPath, ENTITIES_DIR),
|
||||
clusterId + "/" + flowRunPath + "/" + appId + "/" + entityType);
|
||||
|
@ -49,23 +49,23 @@ class FlowActivityEntityReader extends TimelineEntityReader {
|
||||
new FlowActivityTable();
|
||||
|
||||
public FlowActivityEntityReader(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
EnumSet<Field> fieldsToRetrieve) {
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, null, null, fieldsToRetrieve, true);
|
||||
}
|
||||
|
||||
public FlowActivityEntityReader(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
String entityId, EnumSet<Field> fieldsToRetrieve) {
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
|
||||
null, null, fieldsToRetrieve);
|
||||
}
|
||||
|
||||
@ -135,7 +135,7 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
|
||||
|
||||
long time = rowKey.getDayTimestamp();
|
||||
String user = rowKey.getUserId();
|
||||
String flowName = rowKey.getFlowId();
|
||||
String flowName = rowKey.getFlowName();
|
||||
|
||||
FlowActivityEntity flowActivity =
|
||||
new FlowActivityEntity(clusterId, time, user, flowName);
|
||||
|
@ -58,7 +58,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
|
||||
private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();
|
||||
|
||||
public FlowRunEntityReader(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
@ -66,17 +66,17 @@ public FlowRunEntityReader(String userId, String clusterId,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
|
||||
EnumSet<Field> fieldsToRetrieve) {
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true);
|
||||
}
|
||||
|
||||
public FlowRunEntityReader(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
String entityId, TimelineFilterList confsToRetrieve,
|
||||
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
|
||||
null, metricsToRetrieve, fieldsToRetrieve);
|
||||
}
|
||||
|
||||
@ -92,7 +92,7 @@ protected BaseTable<?> getTable() {
|
||||
protected void validateParams() {
|
||||
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
|
||||
Preconditions.checkNotNull(userId, "userId shouldn't be null");
|
||||
Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
|
||||
Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
|
||||
if (singleEntityRead) {
|
||||
Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
|
||||
}
|
||||
@ -155,7 +155,7 @@ protected FilterList constructFilterListBasedOnFields() {
|
||||
protected Result getResult(Configuration hbaseConf, Connection conn,
|
||||
FilterList filterList) throws IOException {
|
||||
byte[] rowKey =
|
||||
FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId);
|
||||
FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
|
||||
Get get = new Get(rowKey);
|
||||
get.setMaxVersions(Integer.MAX_VALUE);
|
||||
if (filterList != null && !filterList.getFilters().isEmpty()) {
|
||||
@ -169,7 +169,7 @@ protected ResultScanner getResults(Configuration hbaseConf,
|
||||
Connection conn, FilterList filterList) throws IOException {
|
||||
Scan scan = new Scan();
|
||||
scan.setRowPrefixFilter(
|
||||
FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowId));
|
||||
FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowName));
|
||||
FilterList newList = new FilterList();
|
||||
newList.addFilter(new PageFilter(limit));
|
||||
if (filterList != null && !filterList.getFilters().isEmpty()) {
|
||||
@ -183,7 +183,7 @@ protected ResultScanner getResults(Configuration hbaseConf,
|
||||
protected TimelineEntity parseEntity(Result result) throws IOException {
|
||||
FlowRunEntity flowRun = new FlowRunEntity();
|
||||
flowRun.setUser(userId);
|
||||
flowRun.setName(flowId);
|
||||
flowRun.setName(flowName);
|
||||
if (singleEntityRead) {
|
||||
flowRun.setRunId(flowRunId);
|
||||
} else {
|
||||
|
@ -76,7 +76,7 @@ class GenericEntityReader extends TimelineEntityReader {
|
||||
private final AppToFlowTable appToFlowTable = new AppToFlowTable();
|
||||
|
||||
public GenericEntityReader(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
@ -84,7 +84,7 @@ public GenericEntityReader(String userId, String clusterId,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
|
||||
EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
|
||||
@ -92,10 +92,10 @@ public GenericEntityReader(String userId, String clusterId,
|
||||
}
|
||||
|
||||
public GenericEntityReader(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
String entityId, TimelineFilterList confsToRetrieve,
|
||||
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
|
||||
super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
|
||||
confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
|
||||
}
|
||||
|
||||
@ -204,11 +204,11 @@ protected FlowContext lookupFlowContext(String clusterId, String appId,
|
||||
|
||||
protected static class FlowContext {
|
||||
protected final String userId;
|
||||
protected final String flowId;
|
||||
protected final String flowName;
|
||||
protected final Long flowRunId;
|
||||
public FlowContext(String user, String flowId, Long flowRunId) {
|
||||
public FlowContext(String user, String flowName, Long flowRunId) {
|
||||
this.userId = user;
|
||||
this.flowId = flowId;
|
||||
this.flowName = flowName;
|
||||
this.flowRunId = flowRunId;
|
||||
}
|
||||
}
|
||||
@ -227,10 +227,10 @@ protected void validateParams() {
|
||||
protected void augmentParams(Configuration hbaseConf, Connection conn)
|
||||
throws IOException {
|
||||
// In reality all three should be null or neither should be null
|
||||
if (flowId == null || flowRunId == null || userId == null) {
|
||||
if (flowName == null || flowRunId == null || userId == null) {
|
||||
FlowContext context =
|
||||
lookupFlowContext(clusterId, appId, hbaseConf, conn);
|
||||
flowId = context.flowId;
|
||||
flowName = context.flowName;
|
||||
flowRunId = context.flowRunId;
|
||||
userId = context.userId;
|
||||
}
|
||||
@ -269,7 +269,7 @@ protected void augmentParams(Configuration hbaseConf, Connection conn)
|
||||
protected Result getResult(Configuration hbaseConf, Connection conn,
|
||||
FilterList filterList) throws IOException {
|
||||
byte[] rowKey =
|
||||
EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
|
||||
EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
|
||||
entityType, entityId);
|
||||
Get get = new Get(rowKey);
|
||||
get.setMaxVersions(Integer.MAX_VALUE);
|
||||
@ -286,7 +286,7 @@ protected ResultScanner getResults(Configuration hbaseConf,
|
||||
// and one type
|
||||
Scan scan = new Scan();
|
||||
scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
|
||||
clusterId, userId, flowId, flowRunId, appId, entityType));
|
||||
clusterId, userId, flowName, flowRunId, appId, entityType));
|
||||
scan.setMaxVersions(Integer.MAX_VALUE);
|
||||
if (filterList != null && !filterList.getFilters().isEmpty()) {
|
||||
scan.setFilter(filterList);
|
||||
|
@ -64,20 +64,20 @@ protected void serviceStop() throws Exception {
|
||||
|
||||
@Override
|
||||
public TimelineEntity getEntity(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
String entityId, TimelineFilterList confsToRetrieve,
|
||||
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve)
|
||||
throws IOException {
|
||||
TimelineEntityReader reader =
|
||||
TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId,
|
||||
flowId, flowRunId, appId, entityType, entityId, confsToRetrieve,
|
||||
flowName, flowRunId, appId, entityType, entityId, confsToRetrieve,
|
||||
metricsToRetrieve, fieldsToRetrieve);
|
||||
return reader.readEntity(hbaseConf, conn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
@ -87,7 +87,7 @@ public Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
EnumSet<Field> fieldsToRetrieve) throws IOException {
|
||||
TimelineEntityReader reader =
|
||||
TimelineEntityReaderFactory.createMultipleEntitiesReader(userId,
|
||||
clusterId, flowId, flowRunId, appId, entityType, limit,
|
||||
clusterId, flowName, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
|
||||
metricFilters, eventFilters, confsToRetrieve, metricsToRetrieve,
|
||||
|
@ -53,7 +53,7 @@ abstract class TimelineEntityReader {
|
||||
|
||||
protected String userId;
|
||||
protected String clusterId;
|
||||
protected String flowId;
|
||||
protected String flowName;
|
||||
protected Long flowRunId;
|
||||
protected String appId;
|
||||
protected String entityType;
|
||||
@ -92,7 +92,7 @@ abstract class TimelineEntityReader {
|
||||
* Instantiates a reader for multiple-entity reads.
|
||||
*/
|
||||
protected TimelineEntityReader(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
@ -104,7 +104,7 @@ protected TimelineEntityReader(String userId, String clusterId,
|
||||
this.sortedKeys = sortedKeys;
|
||||
this.userId = userId;
|
||||
this.clusterId = clusterId;
|
||||
this.flowId = flowId;
|
||||
this.flowName = flowName;
|
||||
this.flowRunId = flowRunId;
|
||||
this.appId = appId;
|
||||
this.entityType = entityType;
|
||||
@ -130,13 +130,13 @@ protected TimelineEntityReader(String userId, String clusterId,
|
||||
* Instantiates a reader for single-entity reads.
|
||||
*/
|
||||
protected TimelineEntityReader(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
String entityId, TimelineFilterList confsToRetrieve,
|
||||
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
|
||||
this.singleEntityRead = true;
|
||||
this.userId = userId;
|
||||
this.clusterId = clusterId;
|
||||
this.flowId = flowId;
|
||||
this.flowName = flowName;
|
||||
this.flowRunId = flowRunId;
|
||||
this.appId = appId;
|
||||
this.entityType = entityType;
|
||||
|
@ -34,23 +34,23 @@ class TimelineEntityReaderFactory {
|
||||
* the specified input.
|
||||
*/
|
||||
public static TimelineEntityReader createSingleEntityReader(String userId,
|
||||
String clusterId, String flowId, Long flowRunId, String appId,
|
||||
String clusterId, String flowName, Long flowRunId, String appId,
|
||||
String entityType, String entityId, TimelineFilterList confs,
|
||||
TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) {
|
||||
// currently the types that are handled separate from the generic entity
|
||||
// table are application, flow run, and flow activity entities
|
||||
if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
|
||||
return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
|
||||
return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
|
||||
} else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
|
||||
return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
|
||||
return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
|
||||
} else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
|
||||
return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
|
||||
return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, entityId, fieldsToRetrieve);
|
||||
} else {
|
||||
// assume we're dealing with a generic entity read
|
||||
return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
|
||||
return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
|
||||
}
|
||||
}
|
||||
@ -60,7 +60,7 @@ public static TimelineEntityReader createSingleEntityReader(String userId,
|
||||
* the specified input and predicates.
|
||||
*/
|
||||
public static TimelineEntityReader createMultipleEntitiesReader(String userId,
|
||||
String clusterId, String flowId, Long flowRunId, String appId,
|
||||
String clusterId, String flowName, Long flowRunId, String appId,
|
||||
String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
@ -71,26 +71,26 @@ public static TimelineEntityReader createMultipleEntitiesReader(String userId,
|
||||
// currently the types that are handled separate from the generic entity
|
||||
// table are application, flow run, and flow activity entities
|
||||
if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
|
||||
return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
|
||||
return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
|
||||
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
|
||||
infoFilters, configFilters, metricFilters, eventFilters, confs,
|
||||
metrics, fieldsToRetrieve);
|
||||
} else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
|
||||
return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
|
||||
return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
|
||||
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
|
||||
infoFilters, configFilters, metricFilters, eventFilters,
|
||||
fieldsToRetrieve);
|
||||
} else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
|
||||
return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
|
||||
return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
|
||||
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
|
||||
infoFilters, configFilters, metricFilters, eventFilters, confs,
|
||||
metrics, fieldsToRetrieve);
|
||||
} else {
|
||||
// assume we're dealing with a generic entity read
|
||||
return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
|
||||
return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
|
||||
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
|
||||
infoFilters, configFilters, metricFilters, eventFilters, confs,
|
||||
|
@ -62,7 +62,7 @@ public enum Field {
|
||||
* Context user Id(optional).
|
||||
* @param clusterId
|
||||
* Context cluster Id(mandatory).
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* Context flow Id (optional).
|
||||
* @param flowRunId
|
||||
* Context flow run Id (optional).
|
||||
@ -93,7 +93,7 @@ public enum Field {
|
||||
* contain the metadata plus the given fields to retrieve.
|
||||
* @throws IOException
|
||||
*/
|
||||
TimelineEntity getEntity(String userId, String clusterId, String flowId,
|
||||
TimelineEntity getEntity(String userId, String clusterId, String flowName,
|
||||
Long flowRunId, String appId, String entityType, String entityId,
|
||||
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
|
||||
EnumSet<Field> fieldsToRetrieve) throws IOException;
|
||||
@ -113,7 +113,7 @@ TimelineEntity getEntity(String userId, String clusterId, String flowId,
|
||||
* Context user Id(optional).
|
||||
* @param clusterId
|
||||
* Context cluster Id(mandatory).
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* Context flow Id (optional).
|
||||
* @param flowRunId
|
||||
* Context flow run Id (optional).
|
||||
@ -183,7 +183,7 @@ TimelineEntity getEntity(String userId, String clusterId, String flowId,
|
||||
* @throws IOException
|
||||
*/
|
||||
Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
|
@ -27,15 +27,15 @@
|
||||
public class ApplicationRowKey {
|
||||
private final String clusterId;
|
||||
private final String userId;
|
||||
private final String flowId;
|
||||
private final String flowName;
|
||||
private final long flowRunId;
|
||||
private final String appId;
|
||||
|
||||
public ApplicationRowKey(String clusterId, String userId, String flowId,
|
||||
public ApplicationRowKey(String clusterId, String userId, String flowName,
|
||||
long flowRunId, String appId) {
|
||||
this.clusterId = clusterId;
|
||||
this.userId = userId;
|
||||
this.flowId = flowId;
|
||||
this.flowName = flowName;
|
||||
this.flowRunId = flowRunId;
|
||||
this.appId = appId;
|
||||
}
|
||||
@ -48,8 +48,8 @@ public String getUserId() {
|
||||
return userId;
|
||||
}
|
||||
|
||||
public String getFlowId() {
|
||||
return flowId;
|
||||
public String getFlowName() {
|
||||
return flowName;
|
||||
}
|
||||
|
||||
public long getFlowRunId() {
|
||||
@ -62,54 +62,54 @@ public String getAppId() {
|
||||
|
||||
/**
|
||||
* Constructs a row key prefix for the application table as follows:
|
||||
* {@code clusterId!userName!flowId!}
|
||||
* {@code clusterId!userName!flowName!}
|
||||
*
|
||||
* @param clusterId
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* @return byte array with the row key prefix
|
||||
*/
|
||||
public static byte[] getRowKeyPrefix(String clusterId, String userId,
|
||||
String flowId) {
|
||||
String flowName) {
|
||||
byte[] first = Bytes.toBytes(
|
||||
Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
|
||||
Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowName));
|
||||
return Separator.QUALIFIERS.join(first, new byte[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a row key prefix for the application table as follows:
|
||||
* {@code clusterId!userName!flowId!flowRunId!}
|
||||
* {@code clusterId!userName!flowName!flowRunId!}
|
||||
*
|
||||
* @param clusterId
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* @param flowRunId
|
||||
* @return byte array with the row key prefix
|
||||
*/
|
||||
public static byte[] getRowKeyPrefix(String clusterId, String userId,
|
||||
String flowId, Long flowRunId) {
|
||||
String flowName, Long flowRunId) {
|
||||
byte[] first = Bytes.toBytes(
|
||||
Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
|
||||
Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowName));
|
||||
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
|
||||
return Separator.QUALIFIERS.join(first, second, new byte[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a row key for the application table as follows:
|
||||
* {@code clusterId!userName!flowId!flowRunId!AppId}
|
||||
* {@code clusterId!userName!flowName!flowRunId!AppId}
|
||||
*
|
||||
* @param clusterId
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* @param flowRunId
|
||||
* @param appId
|
||||
* @return byte array with the row key
|
||||
*/
|
||||
public static byte[] getRowKey(String clusterId, String userId,
|
||||
String flowId, Long flowRunId, String appId) {
|
||||
String flowName, Long flowRunId, String appId) {
|
||||
byte[] first =
|
||||
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId,
|
||||
flowId));
|
||||
flowName));
|
||||
// Note that flowRunId is a long, so we can't encode them all at the same
|
||||
// time.
|
||||
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
|
||||
@ -132,11 +132,11 @@ public static ApplicationRowKey parseRowKey(byte[] rowKey) {
|
||||
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
|
||||
String userId =
|
||||
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
|
||||
String flowId =
|
||||
String flowName =
|
||||
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
|
||||
long flowRunId =
|
||||
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
|
||||
String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
|
||||
return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId);
|
||||
return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId);
|
||||
}
|
||||
}
|
||||
|
@ -47,7 +47,7 @@
|
||||
* |-------------------------------------------------------------------------|
|
||||
* | clusterId! | id:appId | metricId1: | configKey1: |
|
||||
* | userName! | | metricValue1 | configValue1 |
|
||||
* | flowId! | created_time: | @timestamp1 | |
|
||||
* | flowName! | created_time: | @timestamp1 | |
|
||||
* | flowRunId! | 1392993084018 | | configKey2: |
|
||||
* | AppId | | metriciD1: | configValue2 |
|
||||
* | | modified_time: | metricValue2 | |
|
||||
|
@ -26,7 +26,7 @@
|
||||
*/
|
||||
public enum AppToFlowColumnFamily implements ColumnFamily<AppToFlowTable> {
|
||||
/**
|
||||
* Mapping column family houses known columns such as flowId and flowRunId
|
||||
* Mapping column family houses known columns such as flowName and flowRunId
|
||||
*/
|
||||
MAPPING("m");
|
||||
|
||||
|
@ -34,7 +34,7 @@
|
||||
|
||||
/**
|
||||
* The app_flow table as column families mapping. Mapping stores
|
||||
* appId to flowId and flowRunId mapping information
|
||||
* appId to flowName and flowRunId mapping information
|
||||
*
|
||||
* Example app_flow table record:
|
||||
*
|
||||
@ -43,7 +43,7 @@
|
||||
* | Row | Column Family |
|
||||
* | key | info |
|
||||
* |--------------------------------------|
|
||||
* | clusterId! | flowId: |
|
||||
* | clusterId! | flowName: |
|
||||
* | AppId | foo@daily_hive_report |
|
||||
* | | |
|
||||
* | | flowRunId: |
|
||||
|
@ -27,17 +27,17 @@
|
||||
public class EntityRowKey {
|
||||
private final String clusterId;
|
||||
private final String userId;
|
||||
private final String flowId;
|
||||
private final String flowName;
|
||||
private final long flowRunId;
|
||||
private final String appId;
|
||||
private final String entityType;
|
||||
private final String entityId;
|
||||
|
||||
public EntityRowKey(String clusterId, String userId, String flowId,
|
||||
public EntityRowKey(String clusterId, String userId, String flowName,
|
||||
long flowRunId, String appId, String entityType, String entityId) {
|
||||
this.clusterId = clusterId;
|
||||
this.userId = userId;
|
||||
this.flowId = flowId;
|
||||
this.flowName = flowName;
|
||||
this.flowRunId = flowRunId;
|
||||
this.appId = appId;
|
||||
this.entityType = entityType;
|
||||
@ -52,8 +52,8 @@ public String getUserId() {
|
||||
return userId;
|
||||
}
|
||||
|
||||
public String getFlowId() {
|
||||
return flowId;
|
||||
public String getFlowName() {
|
||||
return flowName;
|
||||
}
|
||||
|
||||
public long getFlowRunId() {
|
||||
@ -74,20 +74,20 @@ public String getEntityId() {
|
||||
|
||||
/**
|
||||
* Constructs a row key prefix for the entity table as follows:
|
||||
* {@code userName!clusterId!flowId!flowRunId!AppId}
|
||||
* {@code userName!clusterId!flowName!flowRunId!AppId}
|
||||
*
|
||||
* @param clusterId
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* @param flowRunId
|
||||
* @param appId
|
||||
* @return byte array with the row key prefix
|
||||
*/
|
||||
public static byte[] getRowKeyPrefix(String clusterId, String userId,
|
||||
String flowId, Long flowRunId, String appId) {
|
||||
String flowName, Long flowRunId, String appId) {
|
||||
byte[] first =
|
||||
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
|
||||
flowId));
|
||||
flowName));
|
||||
// Note that flowRunId is a long, so we can't encode them all at the same
|
||||
// time.
|
||||
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
|
||||
@ -97,21 +97,21 @@ public static byte[] getRowKeyPrefix(String clusterId, String userId,
|
||||
|
||||
/**
|
||||
* Constructs a row key prefix for the entity table as follows:
|
||||
* {@code userName!clusterId!flowId!flowRunId!AppId!entityType!}
|
||||
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}
|
||||
*
|
||||
* @param clusterId
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* @param flowRunId
|
||||
* @param appId
|
||||
* @param entityType
|
||||
* @return byte array with the row key prefix
|
||||
*/
|
||||
public static byte[] getRowKeyPrefix(String clusterId, String userId,
|
||||
String flowId, Long flowRunId, String appId, String entityType) {
|
||||
String flowName, Long flowRunId, String appId, String entityType) {
|
||||
byte[] first =
|
||||
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
|
||||
flowId));
|
||||
flowName));
|
||||
// Note that flowRunId is a long, so we can't encode them all at the same
|
||||
// time.
|
||||
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
|
||||
@ -123,11 +123,11 @@ public static byte[] getRowKeyPrefix(String clusterId, String userId,
|
||||
|
||||
/**
|
||||
* Constructs a row key for the entity table as follows:
|
||||
* {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId}
|
||||
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}
|
||||
*
|
||||
* @param clusterId
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* @param flowRunId
|
||||
* @param appId
|
||||
* @param entityType
|
||||
@ -135,11 +135,11 @@ public static byte[] getRowKeyPrefix(String clusterId, String userId,
|
||||
* @return byte array with the row key
|
||||
*/
|
||||
public static byte[] getRowKey(String clusterId, String userId,
|
||||
String flowId, Long flowRunId, String appId, String entityType,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
String entityId) {
|
||||
byte[] first =
|
||||
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
|
||||
flowId));
|
||||
flowName));
|
||||
// Note that flowRunId is a long, so we can't encode them all at the same
|
||||
// time.
|
||||
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
|
||||
@ -164,7 +164,7 @@ public static EntityRowKey parseRowKey(byte[] rowKey) {
|
||||
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
|
||||
String clusterId =
|
||||
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
|
||||
String flowId =
|
||||
String flowName =
|
||||
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
|
||||
long flowRunId =
|
||||
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
|
||||
@ -173,7 +173,7 @@ public static EntityRowKey parseRowKey(byte[] rowKey) {
|
||||
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
|
||||
String entityId =
|
||||
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[6]));
|
||||
return new EntityRowKey(clusterId, userId, flowId, flowRunId, appId,
|
||||
return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
|
||||
entityType, entityId);
|
||||
}
|
||||
}
|
||||
|
@ -46,7 +46,7 @@
|
||||
* |-------------------------------------------------------------------------|
|
||||
* | userName! | id:entityId | metricId1: | configKey1: |
|
||||
* | clusterId! | | metricValue1 | configValue1 |
|
||||
* | flowId! | type:entityType | @timestamp1 | |
|
||||
* | flowName! | type:entityType | @timestamp1 | |
|
||||
* | flowRunId! | | | configKey2: |
|
||||
* | AppId! | created_time: | metriciD1: | configValue2 |
|
||||
* | entityType!| 1392993084018 | metricValue2 | |
|
||||
|
@ -29,14 +29,14 @@ public class FlowActivityRowKey {
|
||||
private final String clusterId;
|
||||
private final long dayTs;
|
||||
private final String userId;
|
||||
private final String flowId;
|
||||
private final String flowName;
|
||||
|
||||
public FlowActivityRowKey(String clusterId, long dayTs, String userId,
|
||||
String flowId) {
|
||||
String flowName) {
|
||||
this.clusterId = clusterId;
|
||||
this.dayTs = dayTs;
|
||||
this.userId = userId;
|
||||
this.flowId = flowId;
|
||||
this.flowName = flowName;
|
||||
}
|
||||
|
||||
public String getClusterId() {
|
||||
@ -51,8 +51,8 @@ public String getUserId() {
|
||||
return userId;
|
||||
}
|
||||
|
||||
public String getFlowId() {
|
||||
return flowId;
|
||||
public String getFlowName() {
|
||||
return flowName;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -82,38 +82,38 @@ public static byte[] getRowKeyPrefix(String clusterId, long dayTs) {
|
||||
|
||||
/**
|
||||
* Constructs a row key for the flow activity table as follows:
|
||||
* {@code clusterId!dayTimestamp!user!flowId}
|
||||
* {@code clusterId!dayTimestamp!user!flowName}
|
||||
*
|
||||
* Will insert into current day's record in the table
|
||||
* @param clusterId
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* @return byte array with the row key prefix
|
||||
*/
|
||||
public static byte[] getRowKey(String clusterId, String userId,
|
||||
String flowId) {
|
||||
String flowName) {
|
||||
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
|
||||
.currentTimeMillis());
|
||||
return getRowKey(clusterId, dayTs, userId, flowId);
|
||||
return getRowKey(clusterId, dayTs, userId, flowName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a row key for the flow activity table as follows:
|
||||
* {@code clusterId!dayTimestamp!user!flowId}
|
||||
* {@code clusterId!dayTimestamp!user!flowName}
|
||||
*
|
||||
* @param clusterId
|
||||
* @param dayTs
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* @return byte array for the row key
|
||||
*/
|
||||
public static byte[] getRowKey(String clusterId, long dayTs, String userId,
|
||||
String flowId) {
|
||||
String flowName) {
|
||||
return Separator.QUALIFIERS.join(
|
||||
Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
|
||||
Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)),
|
||||
Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
|
||||
Bytes.toBytes(Separator.QUALIFIERS.encode(flowId)));
|
||||
Bytes.toBytes(Separator.QUALIFIERS.encode(flowName)));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -133,8 +133,8 @@ public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
|
||||
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
|
||||
String userId = Separator.QUALIFIERS.decode(Bytes
|
||||
.toString(rowKeyComponents[2]));
|
||||
String flowId = Separator.QUALIFIERS.decode(Bytes
|
||||
String flowName = Separator.QUALIFIERS.decode(Bytes
|
||||
.toString(rowKeyComponents[3]));
|
||||
return new FlowActivityRowKey(clusterId, dayTs, userId, flowId);
|
||||
return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
|
||||
}
|
||||
}
|
||||
|
@ -47,7 +47,7 @@
|
||||
* | inv Top of | |
|
||||
* | Day! | r!runid2:version7 |
|
||||
* | userName! | |
|
||||
* | flowId | |
|
||||
* | flowName | |
|
||||
* |-------------------------------------------|
|
||||
* </pre>
|
||||
*/
|
||||
|
@ -27,14 +27,14 @@
|
||||
public class FlowRunRowKey {
|
||||
private final String clusterId;
|
||||
private final String userId;
|
||||
private final String flowId;
|
||||
private final String flowName;
|
||||
private final long flowRunId;
|
||||
|
||||
public FlowRunRowKey(String clusterId, String userId, String flowId,
|
||||
public FlowRunRowKey(String clusterId, String userId, String flowName,
|
||||
long flowRunId) {
|
||||
this.clusterId = clusterId;
|
||||
this.userId = userId;
|
||||
this.flowId = flowId;
|
||||
this.flowName = flowName;
|
||||
this.flowRunId = flowRunId;
|
||||
}
|
||||
|
||||
@ -46,8 +46,8 @@ public String getUserId() {
|
||||
return userId;
|
||||
}
|
||||
|
||||
public String getFlowId() {
|
||||
return flowId;
|
||||
public String getFlowName() {
|
||||
return flowName;
|
||||
}
|
||||
|
||||
public long getFlowRunId() {
|
||||
@ -56,33 +56,33 @@ public long getFlowRunId() {
|
||||
|
||||
/**
|
||||
* Constructs a row key prefix for the flow run table as follows: {
|
||||
* clusterId!userI!flowId!}
|
||||
* clusterId!userI!flowName!}
|
||||
*
|
||||
* @param clusterId
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* @return byte array with the row key prefix
|
||||
*/
|
||||
public static byte[] getRowKeyPrefix(String clusterId, String userId,
|
||||
String flowId) {
|
||||
String flowName) {
|
||||
return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId,
|
||||
flowId, ""));
|
||||
flowName, ""));
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a row key for the entity table as follows: {
|
||||
* clusterId!userI!flowId!Inverted Flow Run Id}
|
||||
* clusterId!userI!flowName!Inverted Flow Run Id}
|
||||
*
|
||||
* @param clusterId
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @param flowName
|
||||
* @param flowRunId
|
||||
* @return byte array with the row key
|
||||
*/
|
||||
public static byte[] getRowKey(String clusterId, String userId,
|
||||
String flowId, Long flowRunId) {
|
||||
String flowName, Long flowRunId) {
|
||||
byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId,
|
||||
userId, flowId));
|
||||
userId, flowName));
|
||||
// Note that flowRunId is a long, so we can't encode them all at the same
|
||||
// time.
|
||||
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
|
||||
@ -104,10 +104,10 @@ public static FlowRunRowKey parseRowKey(byte[] rowKey) {
|
||||
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
|
||||
String userId =
|
||||
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
|
||||
String flowId =
|
||||
String flowName =
|
||||
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
|
||||
long flowRunId =
|
||||
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
|
||||
return new FlowRunRowKey(clusterId, userId, flowId, flowRunId);
|
||||
return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
|
||||
}
|
||||
}
|
||||
|
@ -47,7 +47,7 @@
|
||||
* |-------------------------------------------|
|
||||
* | clusterId! | flow_version:version7 |
|
||||
* | userName! | |
|
||||
* | flowId! | running_apps:1 |
|
||||
* | flowName! | running_apps:1 |
|
||||
* | flowRunId | |
|
||||
* | | min_start_time:1392995080000 |
|
||||
* | | #0:"" |
|
||||
|
@ -190,7 +190,7 @@ public void testGetEntityWithUserAndFlowInfo() throws Exception {
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entity/cluster1/app1/app/id_1?userid=user1&" +
|
||||
"flowid=flow1&flowrunid=1");
|
||||
"flowname=flow1&flowrunid=1");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -595,7 +595,7 @@ public void testGetApp() throws Exception {
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app/cluster1/application_1111111111_1111?" +
|
||||
"userid=user1&fields=ALL&flowid=flow_name&flowrunid=1002345678919");
|
||||
"userid=user1&fields=ALL&flowname=flow_name&flowrunid=1002345678919");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(entity);
|
||||
@ -613,7 +613,7 @@ public void testGetApp() throws Exception {
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app/application_1111111111_2222?userid=user1" +
|
||||
"&fields=metrics&flowid=flow_name&flowrunid=1002345678919");
|
||||
"&fields=metrics&flowname=flow_name&flowrunid=1002345678919");
|
||||
resp = getResponse(client, uri);
|
||||
entity = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(entity);
|
||||
|
@ -841,7 +841,7 @@ private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
|
||||
|
||||
assertEquals(user, key.getUserId());
|
||||
assertEquals(cluster, key.getClusterId());
|
||||
assertEquals(flow, key.getFlowId());
|
||||
assertEquals(flow, key.getFlowName());
|
||||
assertEquals(runid, key.getFlowRunId());
|
||||
assertEquals(appName, key.getAppId());
|
||||
assertEquals(te.getType(), key.getEntityType());
|
||||
@ -856,7 +856,7 @@ private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
|
||||
|
||||
assertEquals(cluster, key.getClusterId());
|
||||
assertEquals(user, key.getUserId());
|
||||
assertEquals(flow, key.getFlowId());
|
||||
assertEquals(flow, key.getFlowName());
|
||||
assertEquals(runid, key.getFlowRunId());
|
||||
assertEquals(appName, key.getAppId());
|
||||
return true;
|
||||
|
@ -165,7 +165,7 @@ public void testWriteFlowRunMinMax() throws Exception {
|
||||
assertNotNull(flowActivityRowKey);
|
||||
assertEquals(cluster, flowActivityRowKey.getClusterId());
|
||||
assertEquals(user, flowActivityRowKey.getUserId());
|
||||
assertEquals(flow, flowActivityRowKey.getFlowId());
|
||||
assertEquals(flow, flowActivityRowKey.getFlowName());
|
||||
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
|
||||
.currentTimeMillis());
|
||||
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
|
||||
@ -280,7 +280,7 @@ private void checkFlowActivityTable(String cluster, String user, String flow,
|
||||
assertNotNull(flowActivityRowKey);
|
||||
assertEquals(cluster, flowActivityRowKey.getClusterId());
|
||||
assertEquals(user, flowActivityRowKey.getUserId());
|
||||
assertEquals(flow, flowActivityRowKey.getFlowId());
|
||||
assertEquals(flow, flowActivityRowKey.getFlowName());
|
||||
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
|
||||
.currentTimeMillis());
|
||||
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
|
||||
@ -409,7 +409,7 @@ private void checkFlowActivityTableSeveralRuns(String cluster, String user,
|
||||
assertNotNull(flowActivityRowKey);
|
||||
assertEquals(cluster, flowActivityRowKey.getClusterId());
|
||||
assertEquals(user, flowActivityRowKey.getUserId());
|
||||
assertEquals(flow, flowActivityRowKey.getFlowId());
|
||||
assertEquals(flow, flowActivityRowKey.getFlowName());
|
||||
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
|
||||
.currentTimeMillis());
|
||||
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
|
||||
|
Loading…
Reference in New Issue
Block a user