YARN-4409. Fix javadoc and checkstyle issues in timelineservice code (Varun Saxena via sjlee)

This commit is contained in:
Sangjin Lee 2016-02-08 12:17:43 -08:00
parent 9cb1287e9b
commit 960af7d471
117 changed files with 1418 additions and 819 deletions

View File

@ -147,7 +147,8 @@ public class JobHistoryEventHandler extends AbstractService
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
private static String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = "MAPREDUCE_TASK_ATTEMPT";
private static final String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE =
"MAPREDUCE_TASK_ATTEMPT";
public JobHistoryEventHandler(AppContext context, int startCount) {
super("JobHistoryEventHandler");
@ -479,8 +480,9 @@ private void shutdownAndAwaitTermination() {
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
LOG.error("ThreadPool did not terminate");
}
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
@ -1073,7 +1075,7 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
default:
break;
}
try {
TimelinePutResponse response = timelineClient.putEntities(tEntity);
List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
@ -1116,12 +1118,12 @@ public JsonNode countersToJSON(Counters counters) {
return nodes;
}
private void putEntityWithoutBlocking(final TimelineClient timelineClient,
private void putEntityWithoutBlocking(final TimelineClient client,
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) {
Runnable publishWrapper = new Runnable() {
public void run() {
try {
timelineClient.putEntities(entity);
client.putEntities(entity);
} catch (IOException|YarnException e) {
LOG.error("putEntityNonBlocking get failed: " + e);
throw new RuntimeException(e.toString());
@ -1187,87 +1189,92 @@ public void run() {
entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
return entity;
}
private void processEventForNewTimelineService(HistoryEvent event, JobId jobId,
long timestamp) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity = null;
private void processEventForNewTimelineService(HistoryEvent event,
JobId jobId, long timestamp) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity =
null;
String taskId = null;
String taskAttemptId = null;
boolean setCreatedTime = false;
switch (event.getEventType()) {
// Handle job events
case JOB_SUBMITTED:
setCreatedTime = true;
break;
case JOB_STATUS_CHANGED:
case JOB_INFO_CHANGED:
case JOB_INITED:
case JOB_PRIORITY_CHANGED:
case JOB_QUEUE_CHANGED:
case JOB_FAILED:
case JOB_KILLED:
case JOB_ERROR:
case JOB_FINISHED:
case AM_STARTED:
case NORMALIZED_RESOURCE:
break;
// Handle task events
case TASK_STARTED:
setCreatedTime = true;
taskId = ((TaskStartedEvent)event).getTaskId().toString();
break;
case TASK_FAILED:
taskId = ((TaskFailedEvent)event).getTaskId().toString();
break;
case TASK_UPDATED:
taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
break;
case TASK_FINISHED:
taskId = ((TaskFinishedEvent)event).getTaskId().toString();
break;
case MAP_ATTEMPT_STARTED:
case REDUCE_ATTEMPT_STARTED:
setCreatedTime = true;
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
break;
case CLEANUP_ATTEMPT_STARTED:
case SETUP_ATTEMPT_STARTED:
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
break;
case MAP_ATTEMPT_FAILED:
case CLEANUP_ATTEMPT_FAILED:
case REDUCE_ATTEMPT_FAILED:
case SETUP_ATTEMPT_FAILED:
case MAP_ATTEMPT_KILLED:
case CLEANUP_ATTEMPT_KILLED:
case REDUCE_ATTEMPT_KILLED:
case SETUP_ATTEMPT_KILLED:
taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskAttemptId().toString();
break;
case MAP_ATTEMPT_FINISHED:
taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((MapAttemptFinishedEvent)event).getAttemptId().toString();
break;
case REDUCE_ATTEMPT_FINISHED:
taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((ReduceAttemptFinishedEvent)event).getAttemptId().toString();
break;
case SETUP_ATTEMPT_FINISHED:
case CLEANUP_ATTEMPT_FINISHED:
taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptFinishedEvent)event).getAttemptId().toString();
break;
default:
LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" +
" and handled by timeline service.");
return;
// Handle job events
case JOB_SUBMITTED:
setCreatedTime = true;
break;
case JOB_STATUS_CHANGED:
case JOB_INFO_CHANGED:
case JOB_INITED:
case JOB_PRIORITY_CHANGED:
case JOB_QUEUE_CHANGED:
case JOB_FAILED:
case JOB_KILLED:
case JOB_ERROR:
case JOB_FINISHED:
case AM_STARTED:
case NORMALIZED_RESOURCE:
break;
// Handle task events
case TASK_STARTED:
setCreatedTime = true;
taskId = ((TaskStartedEvent)event).getTaskId().toString();
break;
case TASK_FAILED:
taskId = ((TaskFailedEvent)event).getTaskId().toString();
break;
case TASK_UPDATED:
taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
break;
case TASK_FINISHED:
taskId = ((TaskFinishedEvent)event).getTaskId().toString();
break;
case MAP_ATTEMPT_STARTED:
case REDUCE_ATTEMPT_STARTED:
setCreatedTime = true;
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
break;
case CLEANUP_ATTEMPT_STARTED:
case SETUP_ATTEMPT_STARTED:
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
break;
case MAP_ATTEMPT_FAILED:
case CLEANUP_ATTEMPT_FAILED:
case REDUCE_ATTEMPT_FAILED:
case SETUP_ATTEMPT_FAILED:
case MAP_ATTEMPT_KILLED:
case CLEANUP_ATTEMPT_KILLED:
case REDUCE_ATTEMPT_KILLED:
case SETUP_ATTEMPT_KILLED:
taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskId().toString();
taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskAttemptId().toString();
break;
case MAP_ATTEMPT_FINISHED:
taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((MapAttemptFinishedEvent)event).
getAttemptId().toString();
break;
case REDUCE_ATTEMPT_FINISHED:
taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((ReduceAttemptFinishedEvent)event).
getAttemptId().toString();
break;
case SETUP_ATTEMPT_FINISHED:
case CLEANUP_ATTEMPT_FINISHED:
taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptFinishedEvent)event).
getAttemptId().toString();
break;
default:
LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" +
" and handled by timeline service.");
return;
}
if (taskId == null) {
// JobEntity
@ -1286,7 +1293,6 @@ private void processEventForNewTimelineService(HistoryEvent event, JobId jobId,
taskId, setCreatedTime);
}
}
putEntityWithoutBlocking(timelineClient, tEntity);
}

View File

@ -405,7 +405,7 @@ public Object getDatum() {
public void setDatum(Object datum) {
this.datum = datum;
}
@Override
public TimelineEvent toTimelineEvent() {
return null;

View File

@ -26,7 +26,12 @@
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.ObjectNode;
public class JobHistoryEventUtils {
/**
* Class containing utility methods to be used by JobHistoryEventHandler.
*/
public final class JobHistoryEventUtils {
private JobHistoryEventUtils() {
}
public static JsonNode countersToJSON(Counters counters) {
ObjectMapper mapper = new ObjectMapper();

View File

@ -125,7 +125,8 @@ private void addMetrics(TimelineEntity entity, Counters counters) {
}
}
private Set<TimelineEntity> createTaskAndTaskAttemptEntities(JobInfo jobInfo) {
private Set<TimelineEntity> createTaskAndTaskAttemptEntities(
JobInfo jobInfo) {
Set<TimelineEntity> entities = new HashSet<>();
Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
@ -175,7 +176,8 @@ private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) {
return taskAttempts;
}
private TimelineEntity createTaskAttemptEntity(TaskAttemptInfo taskAttemptInfo) {
private TimelineEntity createTaskAttemptEntity(
TaskAttemptInfo taskAttemptInfo) {
TimelineEntity taskAttempt = new TimelineEntity();
taskAttempt.setType(TASK_ATTEMPT);
taskAttempt.setId(taskAttemptInfo.getAttemptId().toString());

View File

@ -28,6 +28,9 @@
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
/**
* Used to parse job history and configuration files.
*/
class JobHistoryFileParser {
private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class);

View File

@ -20,6 +20,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This entity represents an application attempt.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ApplicationAttemptEntity extends HierarchicalTimelineEntity {
@ -29,8 +32,10 @@ public ApplicationAttemptEntity() {
public ApplicationAttemptEntity(TimelineEntity entity) {
super(entity);
if (!entity.getType().equals(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString())) {
throw new IllegalArgumentException("Incompatible entity type: " + getId());
if (!entity.getType().equals(
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString())) {
throw new IllegalArgumentException("Incompatible entity type: "
+ getId());
}
}
}

View File

@ -20,6 +20,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This entity represents an application.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ApplicationEntity extends HierarchicalTimelineEntity {
@ -32,8 +35,10 @@ public ApplicationEntity() {
public ApplicationEntity(TimelineEntity entity) {
super(entity);
if (!entity.getType().equals(TimelineEntityType.YARN_APPLICATION.toString())) {
throw new IllegalArgumentException("Incompatible entity type: " + getId());
if (!entity.getType().equals(
TimelineEntityType.YARN_APPLICATION.toString())) {
throw new IllegalArgumentException("Incompatible entity type: "
+ getId());
}
}

View File

@ -20,6 +20,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This entity represents a YARN cluster.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ClusterEntity extends HierarchicalTimelineEntity {
@ -30,7 +33,8 @@ public ClusterEntity() {
public ClusterEntity(TimelineEntity entity) {
super(entity);
if (!entity.getType().equals(TimelineEntityType.YARN_CLUSTER.toString())) {
throw new IllegalArgumentException("Incompatible entity type: " + getId());
throw new IllegalArgumentException("Incompatible entity type: "
+ getId());
}
}
}

View File

@ -20,6 +20,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This entity represents a container belonging to an application attempt.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ContainerEntity extends HierarchicalTimelineEntity {
@ -29,8 +32,10 @@ public ContainerEntity() {
public ContainerEntity(TimelineEntity entity) {
super(entity);
if (!entity.getType().equals(TimelineEntityType.YARN_CONTAINER.toString())) {
throw new IllegalArgumentException("Incompatible entity type: " + getId());
if (!entity.getType().equals(
TimelineEntityType.YARN_CONTAINER.toString())) {
throw new IllegalArgumentException("Incompatible entity type: "
+ getId());
}
}
}

View File

@ -22,6 +22,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This entity represents a flow run.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class FlowRunEntity extends HierarchicalTimelineEntity {
@ -44,8 +47,10 @@ public FlowRunEntity() {
public FlowRunEntity(TimelineEntity entity) {
super(entity);
if (!entity.getType().equals(TimelineEntityType.YARN_FLOW_RUN.toString())) {
throw new IllegalArgumentException("Incompatible entity type: " + getId());
if (!entity.getType().equals(
TimelineEntityType.YARN_FLOW_RUN.toString())) {
throw new IllegalArgumentException("Incompatible entity type: "
+ getId());
}
// set config to null
setConfigs(null);

View File

@ -17,17 +17,18 @@
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
import com.google.common.base.Joiner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* This class extends timeline entity and defines parent-child relationships
* with other entities.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract class HierarchicalTimelineEntity extends TimelineEntity {
@ -66,6 +67,7 @@ public void setParent(String type, String id) {
setParent(new Identifier(type, id));
}
@SuppressWarnings("unchecked")
public Set<Identifier> getChildren() {
Object identifiers = getInfo().get(CHILDREN_INFO_KEY);
if (identifiers == null) {

View File

@ -20,6 +20,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This entity represents a queue.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class QueueEntity extends HierarchicalTimelineEntity {
@ -30,7 +33,8 @@ public QueueEntity() {
public QueueEntity(TimelineEntity entity) {
super(entity);
if (!entity.getType().equals(TimelineEntityType.YARN_QUEUE.toString())) {
throw new IllegalArgumentException("Incompatible entity type: " + getId());
throw new IllegalArgumentException("Incompatible entity type: "
+ getId());
}
}
}

View File

@ -27,6 +27,9 @@
import java.util.HashSet;
import java.util.Set;
/**
* This class hosts a set of timeline entities.
*/
@XmlRootElement(name = "entities")
@XmlAccessorType(XmlAccessType.NONE)
@InterfaceAudience.Public
@ -44,12 +47,12 @@ public Set<TimelineEntity> getEntities() {
return entities;
}
public void setEntities(Set<TimelineEntity> entities) {
this.entities = entities;
public void setEntities(Set<TimelineEntity> timelineEntities) {
this.entities = timelineEntities;
}
public void addEntities(Set<TimelineEntity> entities) {
this.entities.addAll(entities);
public void addEntities(Set<TimelineEntity> timelineEntities) {
this.entities.addAll(timelineEntities);
}
public void addEntity(TimelineEntity entity) {

View File

@ -54,6 +54,9 @@
public class TimelineEntity implements Comparable<TimelineEntity> {
protected final static String SYSTEM_INFO_KEY_PREFIX = "SYSTEM_INFO_";
/**
* Identifier of timeline entity(entity id + entity type).
*/
@XmlRootElement(name = "identifier")
@XmlAccessorType(XmlAccessType.NONE)
public static class Identifier {
@ -74,8 +77,8 @@ public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
public void setType(String entityType) {
this.type = entityType;
}
@XmlElement(name = "id")
@ -83,8 +86,8 @@ public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
public void setId(String entityId) {
this.id = entityId;
}
@Override
@ -106,8 +109,9 @@ public int hashCode() {
@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
}
if (!(obj instanceof Identifier)) {
return false;
}
@ -208,11 +212,11 @@ public Identifier getIdentifier() {
}
}
public void setIdentifier(Identifier identifier) {
public void setIdentifier(Identifier entityIdentifier) {
if (real == null) {
this.identifier = identifier;
this.identifier = entityIdentifier;
} else {
real.setIdentifier(identifier);
real.setIdentifier(entityIdentifier);
}
}
@ -235,19 +239,19 @@ public Map<String, Object> getInfo() {
}
}
public void setInfo(Map<String, Object> info) {
public void setInfo(Map<String, Object> entityInfos) {
if (real == null) {
this.info = TimelineServiceHelper.mapCastToHashMap(info);
this.info = TimelineServiceHelper.mapCastToHashMap(entityInfos);
} else {
real.setInfo(info);
real.setInfo(entityInfos);
}
}
public void addInfo(Map<String, Object> info) {
public void addInfo(Map<String, Object> entityInfos) {
if (real == null) {
this.info.putAll(info);
this.info.putAll(entityInfos);
} else {
real.addInfo(info);
real.addInfo(entityInfos);
}
}
@ -278,19 +282,19 @@ public Map<String, String> getConfigs() {
}
}
public void setConfigs(Map<String, String> configs) {
public void setConfigs(Map<String, String> entityConfigs) {
if (real == null) {
this.configs = TimelineServiceHelper.mapCastToHashMap(configs);
this.configs = TimelineServiceHelper.mapCastToHashMap(entityConfigs);
} else {
real.setConfigs(configs);
real.setConfigs(entityConfigs);
}
}
public void addConfigs(Map<String, String> configs) {
public void addConfigs(Map<String, String> entityConfigs) {
if (real == null) {
this.configs.putAll(configs);
this.configs.putAll(entityConfigs);
} else {
real.addConfigs(configs);
real.addConfigs(entityConfigs);
}
}
@ -311,19 +315,19 @@ public Set<TimelineMetric> getMetrics() {
}
}
public void setMetrics(Set<TimelineMetric> metrics) {
public void setMetrics(Set<TimelineMetric> entityMetrics) {
if (real == null) {
this.metrics = metrics;
this.metrics = entityMetrics;
} else {
real.setMetrics(metrics);
real.setMetrics(entityMetrics);
}
}
public void addMetrics(Set<TimelineMetric> metrics) {
public void addMetrics(Set<TimelineMetric> entityMetrics) {
if (real == null) {
this.metrics.addAll(metrics);
this.metrics.addAll(entityMetrics);
} else {
real.addMetrics(metrics);
real.addMetrics(entityMetrics);
}
}
@ -344,19 +348,19 @@ public NavigableSet<TimelineEvent> getEvents() {
}
}
public void setEvents(NavigableSet<TimelineEvent> events) {
public void setEvents(NavigableSet<TimelineEvent> entityEvents) {
if (real == null) {
this.events = events;
this.events = entityEvents;
} else {
real.setEvents(events);
real.setEvents(entityEvents);
}
}
public void addEvents(Set<TimelineEvent> events) {
public void addEvents(Set<TimelineEvent> entityEvents) {
if (real == null) {
this.events.addAll(events);
this.events.addAll(entityEvents);
} else {
real.addEvents(events);
real.addEvents(entityEvents);
}
}
@ -389,20 +393,19 @@ public HashMap<String, Set<String>> getIsRelatedToEntitiesJAXB() {
@JsonSetter("isrelatedto")
public void setIsRelatedToEntities(
Map<String, Set<String>> isRelatedToEntities) {
Map<String, Set<String>> isRelatedTo) {
if (real == null) {
this.isRelatedToEntities =
TimelineServiceHelper.mapCastToHashMap(isRelatedToEntities);
TimelineServiceHelper.mapCastToHashMap(isRelatedTo);
} else {
real.setIsRelatedToEntities(isRelatedToEntities);
real.setIsRelatedToEntities(isRelatedTo);
}
}
public void addIsRelatedToEntities(
Map<String, Set<String>> isRelatedToEntities) {
Map<String, Set<String>> isRelatedTo) {
if (real == null) {
for (Map.Entry<String, Set<String>> entry : isRelatedToEntities
.entrySet()) {
for (Map.Entry<String, Set<String>> entry : isRelatedTo.entrySet()) {
Set<String> ids = this.isRelatedToEntities.get(entry.getKey());
if (ids == null) {
ids = new HashSet<>();
@ -411,7 +414,7 @@ public void addIsRelatedToEntities(
ids.addAll(entry.getValue());
}
} else {
real.addIsRelatedToEntities(isRelatedToEntities);
real.addIsRelatedToEntities(isRelatedTo);
}
}
@ -447,10 +450,9 @@ public Map<String, Set<String>> getRelatesToEntities() {
}
}
public void addRelatesToEntities(Map<String, Set<String>> relatesToEntities) {
public void addRelatesToEntities(Map<String, Set<String>> relatesTo) {
if (real == null) {
for (Map.Entry<String, Set<String>> entry : relatesToEntities
.entrySet()) {
for (Map.Entry<String, Set<String>> entry : relatesTo.entrySet()) {
Set<String> ids = this.relatesToEntities.get(entry.getKey());
if (ids == null) {
ids = new HashSet<>();
@ -459,7 +461,7 @@ public void addRelatesToEntities(Map<String, Set<String>> relatesToEntities) {
ids.addAll(entry.getValue());
}
} else {
real.addRelatesToEntities(relatesToEntities);
real.addRelatesToEntities(relatesTo);
}
}
@ -477,12 +479,12 @@ public void addRelatesToEntity(String type, String id) {
}
@JsonSetter("relatesto")
public void setRelatesToEntities(Map<String, Set<String>> relatesToEntities) {
public void setRelatesToEntities(Map<String, Set<String>> relatesTo) {
if (real == null) {
this.relatesToEntities =
TimelineServiceHelper.mapCastToHashMap(relatesToEntities);
TimelineServiceHelper.mapCastToHashMap(relatesTo);
} else {
real.setRelatesToEntities(relatesToEntities);
real.setRelatesToEntities(relatesTo);
}
}
@ -496,11 +498,11 @@ public long getCreatedTime() {
}
@JsonSetter("createdtime")
public void setCreatedTime(long createdTime) {
public void setCreatedTime(long createdTs) {
if (real == null) {
this.createdTime = createdTime;
this.createdTime = createdTs;
} else {
real.setCreatedTime(createdTime);
real.setCreatedTime(createdTs);
}
}
@ -530,10 +532,12 @@ public int hashCode() {
@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (!(obj instanceof TimelineEntity))
}
if (!(obj instanceof TimelineEntity)) {
return false;
}
TimelineEntity other = (TimelineEntity) obj;
return getIdentifier().equals(other.getIdentifier());
}

View File

@ -20,6 +20,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Defines type of entity.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public enum TimelineEntityType {
@ -34,51 +37,63 @@ public enum TimelineEntityType {
/**
* Whether the input type can be a parent of this entity.
*
* @param type entity type.
* @return true, if this entity type is parent of passed entity type, false
* otherwise.
*/
public boolean isParent(TimelineEntityType type) {
switch (this) {
case YARN_CLUSTER:
return false;
case YARN_FLOW_RUN:
return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
case YARN_APPLICATION:
return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
case YARN_APPLICATION_ATTEMPT:
return YARN_APPLICATION == type;
case YARN_CONTAINER:
return YARN_APPLICATION_ATTEMPT == type;
case YARN_QUEUE:
return YARN_QUEUE == type;
default:
return false;
case YARN_CLUSTER:
return false;
case YARN_FLOW_RUN:
return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
case YARN_APPLICATION:
return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
case YARN_APPLICATION_ATTEMPT:
return YARN_APPLICATION == type;
case YARN_CONTAINER:
return YARN_APPLICATION_ATTEMPT == type;
case YARN_QUEUE:
return YARN_QUEUE == type;
default:
return false;
}
}
/**
* Whether the input type can be a child of this entity.
*
* @param type entity type.
* @return true, if this entity type is child of passed entity type, false
* otherwise.
*/
public boolean isChild(TimelineEntityType type) {
switch (this) {
case YARN_CLUSTER:
return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
case YARN_FLOW_RUN:
return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
case YARN_APPLICATION:
return YARN_APPLICATION_ATTEMPT == type;
case YARN_APPLICATION_ATTEMPT:
return YARN_CONTAINER == type;
case YARN_CONTAINER:
return false;
case YARN_QUEUE:
return YARN_QUEUE == type;
default:
return false;
case YARN_CLUSTER:
return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
case YARN_FLOW_RUN:
return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
case YARN_APPLICATION:
return YARN_APPLICATION_ATTEMPT == type;
case YARN_APPLICATION_ATTEMPT:
return YARN_CONTAINER == type;
case YARN_CONTAINER:
return false;
case YARN_QUEUE:
return YARN_QUEUE == type;
default:
return false;
}
}
/**
* Whether the type of this entity matches the type indicated by the input
* argument.
*
* @param typeString entity type represented as a string.
* @return true, if string representation of this entity type matches the
* entity type passed.
*/
public boolean matches(String typeString) {
return toString().equals(typeString);

View File

@ -28,6 +28,11 @@
import java.util.HashMap;
import java.util.Map;
/**
* This class contains the information of an event that belongs to an entity.
* Users are free to define what the event means, such as starting an
* application, container being allocated, etc.
*/
@XmlRootElement(name = "event")
@XmlAccessorType(XmlAccessType.NONE)
@InterfaceAudience.Public
@ -48,8 +53,8 @@ public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
public void setId(String eventId) {
this.id = eventId;
}
// required by JAXB
@ -63,12 +68,12 @@ public Map<String, Object> getInfo() {
return info;
}
public void setInfo(Map<String, Object> info) {
this.info = TimelineServiceHelper.mapCastToHashMap(info);
public void setInfo(Map<String, Object> infos) {
this.info = TimelineServiceHelper.mapCastToHashMap(infos);
}
public void addInfo(Map<String, Object> info) {
this.info.putAll(info);
public void addInfo(Map<String, Object> infos) {
this.info.putAll(infos);
}
public void addInfo(String key, Object value) {
@ -80,8 +85,8 @@ public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
public void setTimestamp(long ts) {
this.timestamp = ts;
}
public boolean isValid() {
@ -97,15 +102,18 @@ public int hashCode() {
@Override
public boolean equals(Object o) {
if (this == o)
if (this == o) {
return true;
if (!(o instanceof TimelineEvent))
}
if (!(o instanceof TimelineEvent)) {
return false;
}
TimelineEvent event = (TimelineEvent) o;
if (timestamp != event.timestamp)
if (timestamp != event.timestamp) {
return false;
}
if (!id.equals(event.id)) {
return false;
}

View File

@ -28,12 +28,19 @@
import java.util.Map;
import java.util.TreeMap;
/**
* This class contains the information of a metric that is related to some
* entity. Metric can either be a time series or single value.
*/
@XmlRootElement(name = "metric")
@XmlAccessorType(XmlAccessType.NONE)
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class TimelineMetric {
/**
* Type of metric.
*/
public static enum Type {
SINGLE_VALUE,
TIME_SERIES
@ -63,8 +70,8 @@ public Type getType() {
return type;
}
public void setType(Type type) {
this.type = type;
public void setType(Type metricType) {
this.type = metricType;
}
@XmlElement(name = "id")
@ -72,8 +79,8 @@ public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
public void setId(String metricId) {
this.id = metricId;
}
// required by JAXB
@ -87,24 +94,24 @@ public Map<Long, Number> getValues() {
return values;
}
public void setValues(Map<Long, Number> values) {
public void setValues(Map<Long, Number> vals) {
if (type == Type.SINGLE_VALUE) {
overwrite(values);
overwrite(vals);
} else {
if (values != null) {
this.values = new TreeMap<Long, Number>(reverseComparator);
this.values.putAll(values);
this.values.putAll(vals);
} else {
this.values = null;
}
}
}
public void addValues(Map<Long, Number> values) {
public void addValues(Map<Long, Number> vals) {
if (type == Type.SINGLE_VALUE) {
overwrite(values);
overwrite(vals);
} else {
this.values.putAll(values);
this.values.putAll(vals);
}
}
@ -115,14 +122,14 @@ public void addValue(long timestamp, Number value) {
values.put(timestamp, value);
}
private void overwrite(Map<Long, Number> values) {
if (values.size() > 1) {
private void overwrite(Map<Long, Number> vals) {
if (vals.size() > 1) {
throw new IllegalArgumentException(
"Values cannot contain more than one point in " +
Type.SINGLE_VALUE + " mode");
}
this.values.clear();
this.values.putAll(values);
this.values.putAll(vals);
}
public boolean isValid() {
@ -139,10 +146,12 @@ public int hashCode() {
// Only check if type and id are equal
@Override
public boolean equals(Object o) {
if (this == o)
if (this == o) {
return true;
if (!(o instanceof TimelineMetric))
}
if (!(o instanceof TimelineMetric)) {
return false;
}
TimelineMetric m = (TimelineMetric) o;

View File

@ -29,9 +29,9 @@
/**
* A class that holds a list of put errors. This is the response returned when a
* list of {@link TimelineEntity} objects is added to the timeline. If there are errors
* in storing individual entity objects, they will be indicated in the list of
* errors.
* list of {@link TimelineEntity} objects is added to the timeline. If there are
* errors in storing individual entity objects, they will be indicated in the
* list of errors.
*/
@XmlRootElement(name = "response")
@XmlAccessorType(XmlAccessType.NONE)
@ -46,7 +46,7 @@ public TimelineWriteResponse() {
}
/**
* Get a list of {@link TimelineWriteError} instances
* Get a list of {@link TimelineWriteError} instances.
*
* @return a list of {@link TimelineWriteError} instances
*/
@ -56,7 +56,7 @@ public List<TimelineWriteError> getErrors() {
}
/**
* Add a single {@link TimelineWriteError} instance into the existing list
* Add a single {@link TimelineWriteError} instance into the existing list.
*
* @param error
* a single {@link TimelineWriteError} instance
@ -66,24 +66,24 @@ public void addError(TimelineWriteError error) {
}
/**
* Add a list of {@link TimelineWriteError} instances into the existing list
* Add a list of {@link TimelineWriteError} instances into the existing list.
*
* @param errors
* @param writeErrors
* a list of {@link TimelineWriteError} instances
*/
public void addErrors(List<TimelineWriteError> errors) {
this.errors.addAll(errors);
public void addErrors(List<TimelineWriteError> writeErrors) {
this.errors.addAll(writeErrors);
}
/**
* Set the list to the given list of {@link TimelineWriteError} instances
* Set the list to the given list of {@link TimelineWriteError} instances.
*
* @param errors
* @param writeErrors
* a list of {@link TimelineWriteError} instances
*/
public void setErrors(List<TimelineWriteError> errors) {
public void setErrors(List<TimelineWriteError> writeErrors) {
this.errors.clear();
this.errors.addAll(errors);
this.errors.addAll(writeErrors);
}
/**
@ -106,7 +106,7 @@ public static class TimelineWriteError {
private int errorCode;
/**
* Get the entity Id
* Get the entity Id.
*
* @return the entity Id
*/
@ -116,17 +116,16 @@ public String getEntityId() {
}
/**
* Set the entity Id
* Set the entity Id.
*
* @param entityId
* the entity Id
* @param id the entity Id.
*/
public void setEntityId(String entityId) {
this.entityId = entityId;
public void setEntityId(String id) {
this.entityId = id;
}
/**
* Get the entity type
* Get the entity type.
*
* @return the entity type
*/
@ -136,17 +135,16 @@ public String getEntityType() {
}
/**
* Set the entity type
* Set the entity type.
*
* @param entityType
* the entity type
* @param type the entity type.
*/
public void setEntityType(String entityType) {
this.entityType = entityType;
public void setEntityType(String type) {
this.entityType = type;
}
/**
* Get the error code
* Get the error code.
*
* @return an error code
*/
@ -156,13 +154,12 @@ public int getErrorCode() {
}
/**
* Set the error code to the given error code
* Set the error code to the given error code.
*
* @param errorCode
* an error code
* @param code an error code.
*/
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
public void setErrorCode(int code) {
this.errorCode = code;
}
}

View File

@ -20,6 +20,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This entity represents a user.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class UserEntity extends TimelineEntity {
@ -30,7 +33,8 @@ public UserEntity() {
public UserEntity(TimelineEntity entity) {
super(entity);
if (!entity.getType().equals(TimelineEntityType.YARN_USER.toString())) {
throw new IllegalArgumentException("Incompatible entity type: " + getId());
throw new IllegalArgumentException("Incompatible entity type: "
+ getId());
}
}
}

View File

@ -15,7 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Public package org.apache.hadoop.yarn.api.records.timelineservice;
/**
* Package org.apache.hadoop.yarn.api.records.timelineservice contains classes
* which define the data model for ATSv2.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.api.records.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -495,7 +495,8 @@ public static boolean isAclEnabled(Configuration conf) {
*/
public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX
+ "system-metrics-publisher.enabled";
public static final boolean DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
public static final boolean DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED =
false;
/**
* The setting that controls whether yarn system metrics is published on the
@ -517,8 +518,8 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
10;
public static final int
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10;
//RM delegation token related keys
public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY =
@ -1827,7 +1828,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String TIMELINE_SERVICE_UI_WEB_PATH_PREFIX =
TIMELINE_SERVICE_PREFIX + "ui-web-path.";
/** Timeline client settings */
/** Timeline client settings. */
public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
TIMELINE_SERVICE_PREFIX + "client.";

View File

@ -55,7 +55,7 @@ public abstract class TimelineClient extends AbstractService implements
* construct and initialize a timeline client if the following operations are
* supposed to be conducted by that user.
*/
protected ApplicationId contextAppId;
private ApplicationId contextAppId;
/**
* Creates an instance of the timeline v.1.x client.
@ -78,7 +78,7 @@ public static TimelineClient createTimelineClient(ApplicationId appId) {
@Private
protected TimelineClient(String name, ApplicationId appId) {
super(name);
contextAppId = appId;
setContextAppId(appId);
}
/**
@ -242,11 +242,18 @@ public abstract void putEntitiesAsync(
/**
* <p>
* Update the timeline service address where the request will be sent to
* Update the timeline service address where the request will be sent to.
* </p>
* @param address
* the timeline service address
*/
public abstract void setTimelineServiceAddress(String address);
protected ApplicationId getContextAppId() {
return contextAppId;
}
protected void setContextAppId(ApplicationId appId) {
this.contextAppId = appId;
}
}

View File

@ -400,8 +400,8 @@ private void putEntities(boolean async,
entitiesContainer.addEntity(entity);
}
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
if (contextAppId != null) {
params.add("appid", contextAppId.toString());
if (getContextAppId() != null) {
params.add("appid", getContextAppId().toString());
}
if (async) {
params.add("async", Boolean.TRUE.toString());
@ -439,8 +439,7 @@ public void putObjects(String path, MultivaluedMap<String, String> params,
URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
putObjects(uri, path, params, obj);
needRetry = false;
}
catch (Exception e) {
} catch (Exception e) {
// TODO only handle exception for timelineServiceAddress being updated.
// skip retry for other exceptions.
checkRetryWithSleep(retries, e);
@ -463,10 +462,9 @@ private void checkRetryWithSleep(int retries, Exception e) throws
Thread.currentThread().interrupt();
}
} else {
LOG.error(
"TimelineClient has reached to max retry times :" +
this.maxServiceRetries + " for service address: " +
timelineServiceAddress);
LOG.error("TimelineClient has reached to max retry times :" +
this.maxServiceRetries + " for service address: " +
timelineServiceAddress);
if (e instanceof YarnException) {
throw (YarnException)e;
} else if (e instanceof IOException) {
@ -632,7 +630,7 @@ private Object operateDelegationToken(
}
/**
* Poll TimelineServiceAddress for maximum of retries times if it is null
* Poll TimelineServiceAddress for maximum of retries times if it is null.
* @param retries
* @return the left retry times
*/

View File

@ -45,8 +45,10 @@
public class TimelineUtils {
public static final String FLOW_NAME_TAG_PREFIX = "TIMELINE_FLOW_NAME_TAG";
public static final String FLOW_VERSION_TAG_PREFIX = "TIMELINE_FLOW_VERSION_TAG";
public static final String FLOW_RUN_ID_TAG_PREFIX = "TIMELINE_FLOW_RUN_ID_TAG";
public static final String FLOW_VERSION_TAG_PREFIX =
"TIMELINE_FLOW_VERSION_TAG";
public static final String FLOW_RUN_ID_TAG_PREFIX =
"TIMELINE_FLOW_RUN_ID_TAG";
private static ObjectMapper mapper;
@ -160,38 +162,39 @@ public static Text buildTimelineTokenService(Configuration conf) {
return SecurityUtil.buildTokenService(timelineServiceAddr);
}
public static String generateDefaultFlowNameBasedOnAppId(ApplicationId appId) {
public static String generateDefaultFlowNameBasedOnAppId(
ApplicationId appId) {
return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
}
/**
* Generate flow name tag
* Generate flow name tag.
*
* @param flowName flow name that identifies a distinct flow application which
* can be run repeatedly over time
* @return
* @return flow name tag.
*/
public static String generateFlowNameTag(String flowName) {
return FLOW_NAME_TAG_PREFIX + ":" + flowName;
}
/**
* Generate flow version tag
* Generate flow version tag.
*
* @param flowVersion flow version that keeps track of the changes made to the
* flow
* @return
* @return flow version tag.
*/
public static String generateFlowVersionTag(String flowVersion) {
return FLOW_VERSION_TAG_PREFIX + ":" + flowVersion;
}
/**
* Generate flow run ID tag
* Generate flow run ID tag.
*
* @param flowRunId flow run ID that identifies one instance (or specific
* execution) of that flow
* @return
* @return flow run id tag.
*/
public static String generateFlowRunIdTag(long flowRunId) {
return FLOW_RUN_ID_TAG_PREFIX + ":" + flowRunId;

View File

@ -483,7 +483,7 @@ public static class NMContext implements Context {
protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
protected Map<ApplicationId, String> registeredCollectors;
private Map<ApplicationId, String> registeredCollectors;
protected final ConcurrentMap<ContainerId,
org.apache.hadoop.yarn.api.records.Container> increasedContainers =

View File

@ -52,12 +52,11 @@ public class NMCollectorService extends CompositeService implements
private static final Log LOG = LogFactory.getLog(NMCollectorService.class);
final Context context;
private final Context context;
private Server server;
public NMCollectorService(Context context) {
super(NMCollectorService.class.getName());
this.context = context;
}
@ -123,7 +122,8 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
client.setTimelineServiceAddress(collectorAddr);
}
}
((NodeManager.NMContext)context).addRegisteredCollectors(newCollectorsMap);
((NodeManager.NMContext)context).addRegisteredCollectors(
newCollectorsMap);
}
return ReportNewCollectorInfoResponse.newInstance();
@ -139,6 +139,7 @@ public GetTimelineCollectorContextResponse getTimelineCollectorContext(
" doesn't exist on NM.");
}
return GetTimelineCollectorContextResponse.newInstance(
app.getUser(), app.getFlowName(), app.getFlowVersion(), app.getFlowRunId());
app.getUser(), app.getFlowName(), app.getFlowVersion(),
app.getFlowRunId());
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.nodemanager.collectormanager contains
* classes for handling timeline collector information.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.nodemanager.collectormanager;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -20,6 +20,10 @@
import org.apache.hadoop.yarn.event.AbstractEvent;
/**
* Event posted to NMTimelinePublisher which in turn publishes it to
* timelineservice v2.
*/
public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> {
public NMTimelineEvent(NMTimelineEventType type) {
super(type);

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
/**
* Type of {@link NMTimelineEvent}.
*/
public enum NMTimelineEventType {
// Publish the NM Timeline entity
TIMELINE_ENTITY_PUBLISH,

View File

@ -127,7 +127,8 @@ public void reportContainerResourceUsage(Container container, String pId,
memoryMetric.addValue(currentTimeMillis, pmemUsage);
entity.addMetric(memoryMetric);
}
if (cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE) {
if (cpuUsageTotalCoresPercentage !=
ResourceCalculatorProcessTree.UNAVAILABLE) {
TimelineMetric cpuMetric = new TimelineMetric();
cpuMetric.setId(ContainerMetric.CPU.toString() + pId);
cpuMetric.addValue(currentTimeMillis, cpuUsageTotalCoresPercentage);
@ -189,7 +190,8 @@ private void publishContainerFinishedEvent(ContainerStatus containerStatus,
putEntity(entity, containerId.getApplicationAttemptId().getApplicationId());
}
private static ContainerEntity createContainerEntity(ContainerId containerId) {
private static ContainerEntity createContainerEntity(
ContainerId containerId) {
ContainerEntity entity = new ContainerEntity();
entity.setId(containerId.toString());
Identifier parentIdentifier = new Identifier();
@ -214,6 +216,7 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) {
}
}
@SuppressWarnings("unchecked")
public void publishApplicationEvent(ApplicationEvent event) {
// publish only when the desired event is received
switch (event.getType()) {
@ -226,14 +229,14 @@ public void publishApplicationEvent(ApplicationEvent event) {
default:
if (LOG.isDebugEnabled()) {
LOG.debug(event.getType()
+ " is not a desired ApplicationEvent which needs to be published by"
+ " NMTimelinePublisher");
LOG.debug(event.getType() + " is not a desired ApplicationEvent which"
+ " needs to be published by NMTimelinePublisher");
}
break;
}
}
@SuppressWarnings("unchecked")
public void publishContainerEvent(ContainerEvent event) {
// publish only when the desired event is received
switch (event.getType()) {
@ -251,6 +254,7 @@ public void publishContainerEvent(ContainerEvent event) {
}
}
@SuppressWarnings("unchecked")
public void publishLocalizationEvent(LocalizationEvent event) {
// publish only when the desired event is received
switch (event.getType()) {

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.nodemanager.timelineservice contains
* classes related to publishing container events and other NM lifecycle events
* to ATSv2.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -388,15 +388,15 @@ public RMTimelineCollectorManager getRMTimelineCollectorManager() {
@Private
@Unstable
public void setRMTimelineCollectorManager(
RMTimelineCollectorManager timelineCollectorManager) {
this.timelineCollectorManager = timelineCollectorManager;
RMTimelineCollectorManager collectorManager) {
this.timelineCollectorManager = collectorManager;
}
@Private
@Unstable
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
SystemMetricsPublisher metricsPublisher) {
this.systemMetricsPublisher = metricsPublisher;
}
@Private

View File

@ -383,8 +383,8 @@ public RMTimelineCollectorManager getRMTimelineCollectorManager() {
@Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
SystemMetricsPublisher metricsPublisher) {
this.systemMetricsPublisher = metricsPublisher;
}
@Override

View File

@ -30,6 +30,10 @@
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
/**
* Abstract implementation of SystemMetricsPublisher which is then extended by
* metrics publisher implementations depending on timeline service version.
*/
public abstract class AbstractSystemMetricsPublisher extends CompositeService
implements SystemMetricsPublisher {
private MultiThreadedDispatcher dispatcher;
@ -45,14 +49,19 @@ public AbstractSystemMetricsPublisher(String name) {
@Override
protected void serviceInit(Configuration conf) throws Exception {
dispatcher =
new MultiThreadedDispatcher(getConfig().getInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
new MultiThreadedDispatcher(getConfig().getInt(
YarnConfiguration.
RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
YarnConfiguration.
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
dispatcher.setDrainEventsOnStop();
addIfService(dispatcher);
super.serviceInit(conf);
}
/**
* Dispatches ATS related events using multiple threads.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public static class MultiThreadedDispatcher extends CompositeService
implements Dispatcher {
@ -107,7 +116,7 @@ protected AsyncDispatcher createDispatcher() {
}
/**
* EventType which is used while publishing the events
* EventType which is used while publishing the events.
*/
protected static enum SystemMetricsEventType {
PUBLISH_ENTITY, PUBLISH_APPLICATION_FINISHED_ENTITY
@ -158,9 +167,10 @@ public boolean equals(Object obj) {
if (other.getType() != null) {
return false;
}
} else
} else {
if (!appId.equals(other.appId) || !getType().equals(other.getType())) {
return false;
return false;
}
}
return true;
}

View File

@ -25,7 +25,7 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
/**
* This class does nothing when any of the methods are invoked on
* SystemMetricsPublisher
* SystemMetricsPublisher.
*/
public class NoOpSystemMetricPublisher implements SystemMetricsPublisher{

View File

@ -25,6 +25,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
/**
* Interface used to publish app/container events to timelineservice.
*/
public interface SystemMetricsPublisher {
void appCreated(RMApp app, long createdTime);

View File

@ -44,6 +44,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* This class is responsible for posting application, appattempt &amp; Container
* lifecycle related events to timeline service v1.
*/
public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
private static final Log LOG =
@ -138,8 +142,8 @@ public void appFinished(RMApp app, RMAppState state, long finishedTime) {
entity.addEvent(tEvent);
// sync sending of finish event to avoid possibility of saving application
// finished state in RMStateStore save without publishing in ATS
putEntity(entity);// sync event so that ATS update is done without fail
// finished state in RMStateStore save without publishing in ATS.
putEntity(entity); // sync event so that ATS update is done without fail.
}
@SuppressWarnings("unchecked")

View File

@ -61,14 +61,14 @@
/**
* This class is responsible for posting application, appattempt &amp; Container
* lifecycle related events to timeline service V2
* lifecycle related events to timeline service v2.
*/
@Private
@Unstable
public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
private static final Log LOG =
LogFactory.getLog(TimelineServiceV2Publisher.class);
protected RMTimelineCollectorManager rmTimelineCollectorManager;
private RMTimelineCollectorManager rmTimelineCollectorManager;
private boolean publishContainerMetrics;
public TimelineServiceV2Publisher(RMContext rmContext) {
@ -362,7 +362,8 @@ public void containerFinished(RMContainer container, long finishedTime) {
}
}
private static ContainerEntity createContainerEntity(ContainerId containerId) {
private static ContainerEntity createContainerEntity(
ContainerId containerId) {
ContainerEntity entity = new ContainerEntity();
entity.setId(containerId.toString());
entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION_ATTEMPT

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.resourcemanager.metrics contains
* classes related to publishing app/container events to ATS.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -20,6 +20,9 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* Event used for updating collector address in RMApp on node heartbeat.
*/
public class RMAppCollectorUpdateEvent extends RMAppEvent {
private final String appCollectorAddr;

View File

@ -98,7 +98,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@ -597,8 +596,8 @@ public String getCollectorAddr() {
}
@Override
public void setCollectorAddr(String collectorAddr) {
this.collectorAddr = collectorAddr;
public void setCollectorAddr(String collectorAddress) {
this.collectorAddr = collectorAddress;
}
@Override

View File

@ -28,7 +28,10 @@
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* This class extends TimelineCollectorManager to provide RM specific
* implementations.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RMTimelineCollectorManager extends TimelineCollectorManager {
@ -44,8 +47,8 @@ public void postPut(ApplicationId appId, TimelineCollector collector) {
RMApp app = rmContext.getRMApps().get(appId);
if (app == null) {
throw new YarnRuntimeException(
"Unable to get the timeline collector context info for a non-existing app " +
appId);
"Unable to get the timeline collector context info for a " +
"non-existing app " + appId);
}
String userId = app.getUser();
if (userId != null && !userId.isEmpty()) {
@ -57,18 +60,18 @@ public void postPut(ApplicationId appId, TimelineCollector collector) {
continue;
}
switch (parts[0].toUpperCase()) {
case TimelineUtils.FLOW_NAME_TAG_PREFIX:
collector.getTimelineEntityContext().setFlowName(parts[1]);
break;
case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
collector.getTimelineEntityContext().setFlowVersion(parts[1]);
break;
case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
collector.getTimelineEntityContext().setFlowRunId(
Long.parseLong(parts[1]));
break;
default:
break;
case TimelineUtils.FLOW_NAME_TAG_PREFIX:
collector.getTimelineEntityContext().setFlowName(parts[1]);
break;
case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
collector.getTimelineEntityContext().setFlowVersion(parts[1]);
break;
case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
collector.getTimelineEntityContext().setFlowRunId(
Long.parseLong(parts[1]));
break;
default:
break;
}
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.resourcemanager.timelineservice
* contains classes related to handling of app level collectors.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -55,7 +55,8 @@ protected void serviceInit(Configuration conf) throws Exception {
// Current user usually is not the app user, but keep this field non-null
context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName());
// Use app ID to generate a default flow name for orphan app
context.setFlowName(TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId));
context.setFlowName(
TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId));
// Set the flow version to string 1 if it's an orphan app
context.setFlowVersion("1");
// Set the flow run ID to 1 if it's an orphan app

View File

@ -48,13 +48,17 @@
import com.google.common.annotations.VisibleForTesting;
/**
* Class on the NodeManager side that manages adding and removing collectors and
* their lifecycle. Also instantiates the per-node collector webapp.
*/
@Private
@Unstable
public class NodeTimelineCollectorManager extends TimelineCollectorManager {
private static final Log LOG =
LogFactory.getLog(NodeTimelineCollectorManager.class);
// REST server for this collector manager
// REST server for this collector manager.
private HttpServer2 timelineRestServer;
private String timelineRestServerBindAddress;
@ -97,7 +101,7 @@ public void postPut(ApplicationId appId, TimelineCollector collector) {
}
/**
* Launch the REST web server for this collector manager
* Launch the REST web server for this collector manager.
*/
private void startWebApp() {
Configuration conf = getConfig();

View File

@ -108,6 +108,7 @@ protected void serviceStop() throws Exception {
* The collector is also initialized and started. If the service already
* exists, no new service is created.
*
* @param appId Application Id to be added.
* @return whether it was added successfully
*/
public boolean addApplication(ApplicationId appId) {
@ -122,6 +123,7 @@ public boolean addApplication(ApplicationId appId) {
* collector is also stopped as a result. If the collector does not exist, no
* change is made.
*
* @param appId Application Id to be removed.
* @return whether it was removed successfully
*/
public boolean removeApplication(ApplicationId appId) {

View File

@ -80,6 +80,8 @@ protected void setWriter(TimelineWriter w) {
* @param entities entities to post
* @param callerUgi the caller UGI
* @return the response that contains the result of the post.
* @throws IOException if there is any exception encountered while putting
* entities.
*/
public TimelineWriteResponse putEntities(TimelineEntities entities,
UserGroupInformation callerUgi) throws IOException {

View File

@ -68,8 +68,10 @@ public void serviceInit(Configuration conf) throws Exception {
// basis
writerFlusher = Executors.newSingleThreadScheduledExecutor();
flushInterval = conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
YarnConfiguration.
TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
YarnConfiguration.
DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
super.serviceInit(conf);
}
@ -102,6 +104,8 @@ protected TimelineWriter getWriter() {
* Put the collector into the collection if an collector mapped by id does
* not exist.
*
* @param appId Application Id for which collector needs to be put.
* @param collector timeline collector to be put.
* @throws YarnRuntimeException if there was any exception in initializing
* and starting the app level service
* @return the collector associated with id after the potential put.
@ -140,6 +144,7 @@ protected void postPut(ApplicationId appId, TimelineCollector collector) {
* Removes the collector for the specified id. The collector is also stopped
* as a result. If the collector does not exist, no change is made.
*
* @param appId Application Id to remove.
* @return whether it was removed successfully
*/
public boolean remove(ApplicationId appId) {
@ -162,6 +167,7 @@ protected void postRemove(ApplicationId appId, TimelineCollector collector) {
/**
* Returns the collector for the specified id.
*
* @param appId Application Id for which we need to get the collector.
* @return the collector or null if it does not exist
*/
public TimelineCollector get(ApplicationId appId) {
@ -171,6 +177,8 @@ public TimelineCollector get(ApplicationId appId) {
/**
* Returns whether the collector for the specified id exists in this
* collection.
* @param appId Application Id.
* @return true if collector for the app id is found, false otherwise.
*/
public boolean containsTimelineCollector(ApplicationId appId) {
return collectors.containsKey(appId);

View File

@ -74,6 +74,9 @@ public class TimelineCollectorWebService {
private @Context ServletContext context;
/**
* Gives information about timeline collector.
*/
@XmlRootElement(name = "about")
@XmlAccessorType(XmlAccessType.NONE)
@Public
@ -86,8 +89,8 @@ public AboutInfo() {
}
public AboutInfo(String about) {
this.about = about;
public AboutInfo(String abt) {
this.about = abt;
}
@XmlElement(name = "About")
@ -95,14 +98,18 @@ public String getAbout() {
return about;
}
public void setAbout(String about) {
this.about = about;
public void setAbout(String abt) {
this.about = abt;
}
}
/**
* Return the description of the timeline web services.
*
* @param req Servlet request.
* @param res Servlet response.
* @return description of timeline web service.
*/
@GET
@Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
@ -117,6 +124,15 @@ public AboutInfo about(
* Accepts writes to the collector, and returns a response. It simply routes
* the request to the app level collector. It expects an application as a
* context.
*
* @param req Servlet request.
* @param res Servlet response.
* @param async flag indicating whether its an async put or not. "true"
* indicates, its an async call. If null, its considered false.
* @param appId Application Id to which the entities to be put belong to. If
* appId is not there or it cannot be parsed, HTTP 400 will be sent back.
* @param entities timeline entities to be put.
* @return a Response with appropriate HTTP status.
*/
@PUT
@Path("/entities")
@ -202,29 +218,29 @@ private static TimelineEntities processTimelineEntities(
}
if (type != null) {
switch (type) {
case YARN_CLUSTER:
entitiesToReturn.addEntity(new ClusterEntity(entity));
break;
case YARN_FLOW_RUN:
entitiesToReturn.addEntity(new FlowRunEntity(entity));
break;
case YARN_APPLICATION:
entitiesToReturn.addEntity(new ApplicationEntity(entity));
break;
case YARN_APPLICATION_ATTEMPT:
entitiesToReturn.addEntity(new ApplicationAttemptEntity(entity));
break;
case YARN_CONTAINER:
entitiesToReturn.addEntity(new ContainerEntity(entity));
break;
case YARN_QUEUE:
entitiesToReturn.addEntity(new QueueEntity(entity));
break;
case YARN_USER:
entitiesToReturn.addEntity(new UserEntity(entity));
break;
default:
break;
case YARN_CLUSTER:
entitiesToReturn.addEntity(new ClusterEntity(entity));
break;
case YARN_FLOW_RUN:
entitiesToReturn.addEntity(new FlowRunEntity(entity));
break;
case YARN_APPLICATION:
entitiesToReturn.addEntity(new ApplicationEntity(entity));
break;
case YARN_APPLICATION_ATTEMPT:
entitiesToReturn.addEntity(new ApplicationAttemptEntity(entity));
break;
case YARN_CONTAINER:
entitiesToReturn.addEntity(new ContainerEntity(entity));
break;
case YARN_QUEUE:
entitiesToReturn.addEntity(new QueueEntity(entity));
break;
case YARN_USER:
entitiesToReturn.addEntity(new UserEntity(entity));
break;
default:
break;
}
} else {
entitiesToReturn.addEntity(entity);

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timelineservice.collector contains
* classes which can be used across collector. This package contains classes
* which are not related to storage implementations though.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.collector;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -34,6 +34,11 @@
import com.google.common.annotations.VisibleForTesting;
/**
* This class wraps over the timeline reader store implementation. It does some
* non trivial manipulation of the timeline data before or after getting
* it from the backend store.
*/
@Private
@Unstable
public class TimelineReaderManager extends AbstractService {
@ -114,9 +119,19 @@ private static void fillUID(TimelineEntityType entityType,
}
/**
* Get a set of entities matching given predicates. The meaning of each
* argument has been documented with {@link TimelineReader#getEntities}.
* Get a set of entities matching given predicates by making a call to
* backend storage implementation. The meaning of each argument has been
* documented in detail with {@link TimelineReader#getEntities}.If cluster ID
* has not been supplied by the client, fills the cluster id from config
* before making a call to backend storage. After fetching entities from
* backend, fills the appropriate UID based on entity type for each entity.
*
* @param context Timeline context within the scope of which entities have to
* be fetched.
* @param filters Filters which limit the number of entities to be returned.
* @param dataToRetrieve Data to carry in each entity fetched.
* @return a set of <cite>TimelineEntity</cite> objects.
* @throws IOException if any problem occurs while getting entities.
* @see TimelineReader#getEntities
*/
public Set<TimelineEntity> getEntities(TimelineReaderContext context,
@ -135,9 +150,18 @@ public Set<TimelineEntity> getEntities(TimelineReaderContext context,
}
/**
* Get single timeline entity. The meaning of each argument has been
* documented with {@link TimelineReader#getEntity}.
* Get single timeline entity by making a call to backend storage
* implementation. The meaning of each argument in detail has been
* documented with {@link TimelineReader#getEntity}. If cluster ID has not
* been supplied by the client, fills the cluster id from config before making
* a call to backend storage. After fetching entity from backend, fills the
* appropriate UID based on entity type.
*
* @param context Timeline context within the scope of which entity has to be
* fetched.
* @param dataToRetrieve Data to carry in the entity fetched.
* @return A <cite>TimelineEntity</cite> object if found, null otherwise.
* @throws IOException if any problem occurs while getting entity.
* @see TimelineReader#getEntity
*/
public TimelineEntity getEntity(TimelineReaderContext context,

View File

@ -49,7 +49,7 @@
import com.google.common.annotations.VisibleForTesting;
/** Main class for Timeline Reader */
/** Main class for Timeline Reader. */
@Private
@Unstable
public class TimelineReaderServer extends CompositeService {

View File

@ -73,24 +73,25 @@ public class TimelineReaderWebServices {
private static final String DATE_PATTERN = "yyyyMMdd";
@VisibleForTesting
static ThreadLocal<DateFormat> DATE_FORMAT = new ThreadLocal<DateFormat>() {
@Override
protected DateFormat initialValue() {
SimpleDateFormat format =
new SimpleDateFormat(DATE_PATTERN, Locale.ENGLISH);
format.setTimeZone(TimeZone.getTimeZone("GMT"));
format.setLenient(false);
return format;
}
};
static final ThreadLocal<DateFormat> DATE_FORMAT =
new ThreadLocal<DateFormat>() {
@Override
protected DateFormat initialValue() {
SimpleDateFormat format =
new SimpleDateFormat(DATE_PATTERN, Locale.ENGLISH);
format.setTimeZone(TimeZone.getTimeZone("GMT"));
format.setLenient(false);
return format;
}
};
private void init(HttpServletResponse response) {
response.setContentType(null);
}
private static class DateRange {
Long dateStart;
Long dateEnd;
private static final class DateRange {
private Long dateStart;
private Long dateEnd;
private DateRange(Long start, Long end) {
this.dateStart = start;
this.dateEnd = end;
@ -212,6 +213,7 @@ public TimelineAbout about(
* @param uId a delimited string containing clusterid, userid, flow name,
* flowrun id and app id which are extracted from UID and then used to
* query backend(Mandatory path param).
* @param entityType Type of entities(Mandatory path param).
* @param limit Number of entities to return(Optional query param).
* @param createdTimeStart If specified, matched entities should not be
* created before this timestamp(Optional query param).

View File

@ -44,15 +44,15 @@ private TimelineReaderWebServicesUtils() {
/**
* Parse the passed context information represented as strings and convert
* into a {@link TimelineReaderContext} object.
* @param clusterId
* @param userId
* @param flowName
* @param flowRunId
* @param appId
* @param entityType
* @param entityId
* @param clusterId Cluster Id.
* @param userId User Id.
* @param flowName Flow Name.
* @param flowRunId Run id for the flow.
* @param appId App Id.
* @param entityType Entity Type.
* @param entityId Entity Id.
* @return a {@link TimelineReaderContext} object.
* @throws Exception
* @throws Exception if any problem occurs during parsing.
*/
static TimelineReaderContext createTimelineReaderContext(String clusterId,
String userId, String flowName, String flowRunId, String appId,
@ -65,17 +65,17 @@ static TimelineReaderContext createTimelineReaderContext(String clusterId,
/**
* Parse the passed filters represented as strings and convert them into a
* {@link TimelineEntityFilters} object.
* @param limit
* @param createdTimeStart
* @param createdTimeEnd
* @param relatesTo
* @param isRelatedTo
* @param infofilters
* @param conffilters
* @param metricfilters
* @param eventfilters
* @param limit Limit to number of entities to return.
* @param createdTimeStart Created time start for the entities to return.
* @param createdTimeEnd Created time end for the entities to return.
* @param relatesTo Entities to return must match relatesTo.
* @param isRelatedTo Entities to return must match isRelatedTo.
* @param infofilters Entities to return must match these info filters.
* @param conffilters Entities to return must match these metric filters.
* @param metricfilters Entities to return must match these metric filters.
* @param eventfilters Entities to return must match these event filters.
* @return a {@link TimelineEntityFilters} object.
* @throws Exception
* @throws Exception if any problem occurs during parsing.
*/
static TimelineEntityFilters createTimelineEntityFilters(String limit,
String createdTimeStart, String createdTimeEnd, String relatesTo,
@ -94,11 +94,11 @@ static TimelineEntityFilters createTimelineEntityFilters(String limit,
/**
* Parse the passed fields represented as strings and convert them into a
* {@link TimelineDataToRetrieve} object.
* @param confs
* @param metrics
* @param fields
* @param confs confs to retrieve.
* @param metrics metrics to retrieve.
* @param fields fields to retrieve.
* @return a {@link TimelineDataToRetrieve} object.
* @throws Exception
* @throws Exception if any problem occurs during parsing.
*/
static TimelineDataToRetrieve createTimelineDataToRetrieve(String confs,
String metrics, String fields) throws Exception {
@ -192,7 +192,7 @@ static Map<String, Set<String>> parseKeyStrValuesStr(String str,
* should be represented as "key1:value1,key2:value2,key3:value3".
* @param str delimited string represented as key-value pairs.
* @param pairsDelim key-value pairs are delimited by this delimiter.
* @param keyValuesDelim key and value are delimited by this delimiter.
* @param keyValDelim key and value are delimited by this delimiter.
* @return a map of key-value pairs with both key and value being strings.
*/
static Map<String, String> parseKeyStrValueStr(String str,
@ -212,8 +212,8 @@ static Map<String, String> parseKeyStrValueStr(String str,
* should be represented as "key1:value1,key2:value2,key3:value3".
* @param str delimited string represented as key-value pairs.
* @param pairsDelim key-value pairs are delimited by this delimiter.
* @param keyValuesDelim key and value are delimited by this delimiter.
* @return a map of key-value pairs with key being a string and value amy
* @param keyValDelim key and value are delimited by this delimiter.
* @return a map of key-value pairs with key being a string and value, any
* object.
*/
static Map<String, Object> parseKeyStrValueObj(String str,

View File

@ -204,7 +204,7 @@ TimelineReaderContext decodeUID(String uId) throws Exception {
/**
* Split UID using {@link #UID_DELIMITER_CHAR} and {@link #UID_ESCAPE_CHAR}.
* @param uid
* @param uid UID to be splitted.
* @return a list of different parts of UID split across delimiter.
* @throws IllegalArgumentException if UID is not properly escaped.
*/
@ -229,17 +229,19 @@ private static String joinAndEscapeUIDParts(String[] parts) {
/**
* Encodes UID depending on UID implementation.
* @param context
*
* @param context Reader context.
* @return UID represented as a string.
*/
abstract String encodeUID(TimelineReaderContext context);
/**
* Decodes UID depending on UID implementation.
* @param uId
*
* @param uId UID to be decoded.
* @return a {@link TimelineReaderContext} object if UID passed can be
* decoded, null otherwise.
* @throws Exception
* @throws Exception if any problem occurs while decoding.
*/
abstract TimelineReaderContext decodeUID(String uId) throws Exception;
}

View File

@ -93,9 +93,11 @@ private static <T> Filter createHBaseColQualPrefixFilter(
* Creates equivalent HBase {@link FilterList} from {@link TimelineFilterList}
* while converting different timeline filters(of type {@link TimelineFilter})
* into their equivalent HBase filters.
* @param colPrefix
* @param filterList
* @return a {@link FilterList} object
*
* @param <T> Describes the type of column prefix.
* @param colPrefix column prefix which will be used for conversion.
* @param filterList timeline filter list which has to be converted.
* @return A {@link FilterList} object.
*/
public static <T> FilterList createHBaseFilterList(ColumnPrefix<T> colPrefix,
TimelineFilterList filterList) {

View File

@ -15,6 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timelineservice.reader contains classes
* which can be used across reader. This package contains classes which are
* not related to storage implementations.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.reader;

View File

@ -105,15 +105,16 @@ String getRootPath() {
/**
* Deserialize a POJO object from a JSON string.
* @param clazz
* class to be desirialized
*
* @param jsonString
* json string to deserialize
* @return TimelineEntity object
* @throws IOException
* @throws JsonMappingException
* @throws JsonGenerationException
* @param <T> Describes the type of class to be returned.
* @param clazz class to be deserialized.
* @param jsonString JSON string to deserialize.
* @return An object based on class type. Used typically for
* <cite>TimelineEntity</cite> object.
* @throws IOException if the underlying input source has problems during
* parsing.
* @throws JsonMappingException if parser has problems parsing content.
* @throws JsonGenerationException if there is a problem in JSON writing.
*/
public static <T> T getTimelineRecordFromJSON(
String jsonString, Class<T> clazz)
@ -128,33 +129,32 @@ private static void fillFields(TimelineEntity finalEntity,
}
for (Field field : fields) {
switch(field) {
case CONFIGS:
finalEntity.setConfigs(real.getConfigs());
break;
case METRICS:
finalEntity.setMetrics(real.getMetrics());
break;
case INFO:
finalEntity.setInfo(real.getInfo());
break;
case IS_RELATED_TO:
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
break;
case RELATES_TO:
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
break;
case EVENTS:
finalEntity.setEvents(real.getEvents());
break;
default:
continue;
case CONFIGS:
finalEntity.setConfigs(real.getConfigs());
break;
case METRICS:
finalEntity.setMetrics(real.getMetrics());
break;
case INFO:
finalEntity.setInfo(real.getInfo());
break;
case IS_RELATED_TO:
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
break;
case RELATES_TO:
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
break;
case EVENTS:
finalEntity.setEvents(real.getEvents());
break;
default:
continue;
}
}
}
private String getFlowRunPath(String userId, String clusterId, String flowName,
Long flowRunId, String appId)
throws IOException {
private String getFlowRunPath(String userId, String clusterId,
String flowName, Long flowRunId, String appId) throws IOException {
if (userId != null && flowName != null && flowRunId != null) {
return userId + "/" + flowName + "/" + flowRunId;
}
@ -272,11 +272,11 @@ private Set<TimelineEntity> getEntities(File dir, String entityType,
Map<Long, Set<TimelineEntity>> sortedEntities =
new TreeMap<>(
new Comparator<Long>() {
@Override
public int compare(Long l1, Long l2) {
return l2.compareTo(l1);
}
@Override
public int compare(Long l1, Long l2) {
return l2.compareTo(l1);
}
}
);
for (File entityFile : dir.listFiles()) {
if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {

View File

@ -76,9 +76,10 @@ public TimelineWriteResponse write(String clusterId, String userId,
return response;
}
private synchronized void write(String clusterId, String userId, String flowName,
String flowVersion, long flowRun, String appId, TimelineEntity entity,
TimelineWriteResponse response) throws IOException {
private synchronized void write(String clusterId, String userId,
String flowName, String flowVersion, long flowRun, String appId,
TimelineEntity entity, TimelineWriteResponse response)
throws IOException {
PrintWriter out = null;
try {
String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,

View File

@ -35,6 +35,9 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
/**
* HBase based implementation for {@link TimelineReader}.
*/
public class HBaseTimelineReaderImpl
extends AbstractService implements TimelineReader {

View File

@ -93,7 +93,7 @@ public HBaseTimelineWriterImpl(Configuration conf) throws IOException {
}
/**
* initializes the hbase connection to write to the entity table
* initializes the hbase connection to write to the entity table.
*/
@Override
protected void serviceInit(Configuration conf) throws Exception {
@ -104,7 +104,8 @@ protected void serviceInit(Configuration conf) throws Exception {
appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
flowActivityTable = new FlowActivityTable().getTableMutator(hbaseConf, conn);
flowActivityTable =
new FlowActivityTable().getTableMutator(hbaseConf, conn);
}
/**
@ -289,7 +290,7 @@ private void storeRelations(byte[] rowKey, TimelineEntity te,
}
/**
* Stores the Relations from the {@linkplain TimelineEntity} object
* Stores the Relations from the {@linkplain TimelineEntity} object.
*/
private <T> void storeRelations(byte[] rowKey,
Map<String, Set<String>> connectedEntities,
@ -306,7 +307,7 @@ private <T> void storeRelations(byte[] rowKey,
}
/**
* Stores information from the {@linkplain TimelineEntity} object
* Stores information from the {@linkplain TimelineEntity} object.
*/
private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
boolean isApplication) throws IOException {
@ -341,7 +342,7 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
}
/**
* stores the config information from {@linkplain TimelineEntity}
* stores the config information from {@linkplain TimelineEntity}.
*/
private void storeConfig(byte[] rowKey, Map<String, String> config,
boolean isApplication) throws IOException {
@ -351,17 +352,17 @@ private void storeConfig(byte[] rowKey, Map<String, String> config,
for (Map.Entry<String, String> entry : config.entrySet()) {
if (isApplication) {
ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
entry.getKey(), null, entry.getValue());
entry.getKey(), null, entry.getValue());
} else {
EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(),
null, entry.getValue());
null, entry.getValue());
}
}
}
/**
* stores the {@linkplain TimelineMetric} information from the
* {@linkplain TimelineEvent} object
* {@linkplain TimelineEvent} object.
*/
private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
boolean isApplication) throws IOException {
@ -373,10 +374,10 @@ private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
Long timestamp = timeseriesEntry.getKey();
if (isApplication) {
ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable,
metricColumnQualifier, timestamp, timeseriesEntry.getValue());
metricColumnQualifier, timestamp, timeseriesEntry.getValue());
} else {
EntityColumnPrefix.METRIC.store(rowKey, entityTable,
metricColumnQualifier, timestamp, timeseriesEntry.getValue());
metricColumnQualifier, timestamp, timeseriesEntry.getValue());
}
}
}
@ -384,7 +385,7 @@ private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
}
/**
* Stores the events from the {@linkplain TimelineEvent} object
* Stores the events from the {@linkplain TimelineEvent} object.
*/
private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
boolean isApplication) throws IOException {
@ -428,10 +429,10 @@ private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
Bytes.toBytes(info.getKey()));
if (isApplication) {
ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
compoundColumnQualifierBytes, null, info.getValue());
compoundColumnQualifierBytes, null, info.getValue());
} else {
EntityColumnPrefix.EVENT.store(rowKey, entityTable,
compoundColumnQualifierBytes, null, info.getValue());
compoundColumnQualifierBytes, null, info.getValue());
}
} // for info: eventInfo
}
@ -459,7 +460,7 @@ public void flush() throws IOException {
/**
* close the hbase connections The close APIs perform flushing and release any
* resources held
* resources held.
*/
@Override
protected void serviceStop() throws Exception {

View File

@ -29,7 +29,7 @@
import java.io.IOException;
/**
* YARN timeline service v2 offline aggregation storage interface
* YARN timeline service v2 offline aggregation storage interface.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@ -46,8 +46,8 @@ public OfflineAggregationWriter(String name) {
/**
* Persist aggregated timeline entities to the offline store based on which
* track this entity is to be rolled up to. The tracks along which aggregations
* are to be done are given by {@link OfflineAggregationInfo}.
* track this entity is to be rolled up to. The tracks along which
* aggregations are to be done are given by {@link OfflineAggregationInfo}.
*
* @param context a {@link TimelineCollectorContext} object that describes the
* context information of the aggregated data. Depends on the
@ -58,9 +58,10 @@ public OfflineAggregationWriter(String name) {
* detail of the aggregation. Current supported option is
* {@link OfflineAggregationInfo#FLOW_AGGREGATION}.
* @return a {@link TimelineWriteResponse} object.
* @throws IOException
* @throws IOException if any problem occurs while writing aggregated
* entities.
*/
abstract TimelineWriteResponse writeAggregatedEntity(
TimelineCollectorContext context,
TimelineEntities entities, OfflineAggregationInfo info) throws IOException;
TimelineCollectorContext context, TimelineEntities entities,
OfflineAggregationInfo info) throws IOException;
}

View File

@ -102,18 +102,18 @@ public class PhoenixOfflineAggregationWriterImpl
private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
= "timeline_cf_placeholder";
/** Default Phoenix JDBC driver name */
/** Default Phoenix JDBC driver name. */
private static final String DRIVER_CLASS_NAME
= "org.apache.phoenix.jdbc.PhoenixDriver";
/** Default Phoenix timeline config column family */
/** Default Phoenix timeline config column family. */
private static final String METRIC_COLUMN_FAMILY = "m.";
/** Default Phoenix timeline info column family */
/** Default Phoenix timeline info column family. */
private static final String INFO_COLUMN_FAMILY = "i.";
/** Default separator for Phoenix storage */
/** Default separator for Phoenix storage. */
private static final String AGGREGATION_STORAGE_SEPARATOR = ";";
/** Connection string to the deployed Phoenix cluster */
/** Connection string to the deployed Phoenix cluster. */
private String connString = null;
private Properties connProperties = new Properties();
@ -162,7 +162,8 @@ public TimelineWriteResponse writeAggregatedEntity(
}
int idx = info.setStringsForPrimaryKey(ps, context, null, 1);
ps.setLong(idx++, entity.getCreatedTime());
ps.setString(idx++, StringUtils.join(formattedMetrics.keySet().toArray(),
ps.setString(idx++,
StringUtils.join(formattedMetrics.keySet().toArray(),
AGGREGATION_STORAGE_SEPARATOR));
ps.execute();
@ -185,7 +186,7 @@ public TimelineWriteResponse writeAggregatedEntity(
* Create Phoenix tables for offline aggregation storage if the tables do not
* exist.
*
* @throws IOException
* @throws IOException if any problem happens while creating Phoenix tables.
*/
public void createPhoenixTables() throws IOException {
// Create tables if necessary
@ -197,7 +198,8 @@ public void createPhoenixTables() throws IOException {
+ "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+ "flow_name VARCHAR NOT NULL, "
+ "created_time UNSIGNED_LONG, "
+ METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
+ METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER
+ " VARBINARY, "
+ "metric_names VARCHAR, info_keys VARCHAR "
+ "CONSTRAINT pk PRIMARY KEY("
+ "user, cluster, flow_name))";
@ -206,7 +208,8 @@ public void createPhoenixTables() throws IOException {
+ OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME
+ "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+ "created_time UNSIGNED_LONG, "
+ METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
+ METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER
+ " VARBINARY, "
+ "metric_names VARCHAR, info_keys VARCHAR "
+ "CONSTRAINT pk PRIMARY KEY(user, cluster))";
stmt.executeUpdate(sql);
@ -251,9 +254,9 @@ void dropTable(String tableName) throws Exception {
private static class DynamicColumns<K> {
static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
String columnFamilyPrefix;
String type;
Set<K> columns;
private String columnFamilyPrefix;
private String type;
private Set<K> columns;
public DynamicColumns(String columnFamilyPrefix, String type,
Set<K> keyValues) {

View File

@ -20,7 +20,7 @@
/**
* specifies the tracks along which an entity
* info is to be aggregated on
* info is to be aggregated on.
*
*/
public enum TimelineAggregationTrack {

View File

@ -97,7 +97,8 @@ public enum Field {
* <cite>FlowRunEntity</cite>.<br>
* For all other entity types, entity returned is of type
* <cite>TimelineEntity</cite>.
* @throws IOException
* @throws IOException if there is an exception encountered while fetching
* entity from backend storage.
*/
TimelineEntity getEntity(TimelineReaderContext context,
TimelineDataToRetrieve dataToRetrieve) throws IOException;
@ -169,7 +170,8 @@ TimelineEntity getEntity(TimelineReaderContext context,
* <cite>FlowRunEntity</cite>.<br>
* For all other entity types, entities returned are of type
* <cite>TimelineEntity</cite>.
* @throws IOException
* @throws IOException if there is an exception encountered while fetching
* entity from backend storage.
*/
Set<TimelineEntity> getEntities(
TimelineReaderContext context,

View File

@ -53,7 +53,9 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TimelineSchemaCreator {
public final class TimelineSchemaCreator {
private TimelineSchemaCreator() {
}
final static String NAME = TimelineSchemaCreator.class.getSimpleName();
private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class);

View File

@ -42,12 +42,13 @@ public interface TimelineWriter extends Service {
* @param userId context user ID
* @param flowName context flow name
* @param flowVersion context flow version
* @param flowRunId
* @param appId context app ID
* @param flowRunId run id for the flow.
* @param appId context app ID.
* @param data
* a {@link TimelineEntities} object.
* @return a {@link TimelineWriteResponse} object.
* @throws IOException
* @throws IOException if there is any exception encountered while storing
* or writing entities to the backend storage.
*/
TimelineWriteResponse write(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId,
@ -65,8 +66,11 @@ TimelineWriteResponse write(String clusterId, String userId,
* a {@link TimelineEntity} object
* a {@link TimelineAggregationTrack} enum
* value.
* @param track Specifies the track or dimension along which aggregation would
* occur. Includes USER, FLOW, QUEUE, etc.
* @return a {@link TimelineWriteResponse} object.
* @throws IOException
* @throws IOException if there is any exception encountered while aggregating
* entities to the backend storage.
*/
TimelineWriteResponse aggregate(TimelineEntity data,
TimelineAggregationTrack track) throws IOException;
@ -76,7 +80,8 @@ TimelineWriteResponse aggregate(TimelineEntity data,
* written to the storage when the method returns. This may be a potentially
* time-consuming operation, and should be used judiciously.
*
* @throws IOException
* @throws IOException if there is any exception encountered while flushing
* entities to the backend storage.
*/
void flush() throws IOException;
}

View File

@ -34,7 +34,7 @@
public enum ApplicationColumn implements Column<ApplicationTable> {
/**
* App id
* App id.
*/
ID(ApplicationColumnFamily.INFO, "id"),
@ -84,7 +84,7 @@ public Object readResult(Result result) throws IOException {
/**
* Retrieve an {@link ApplicationColumn} given a name, or null if there is no
* match. The following holds true: {@code columnFor(x) == columnFor(y)} if
* and only if {@code x.equals(y)} or {@code (x == y == null)}
* and only if {@code x.equals(y)} or {@code (x == y == null)}.
*
* @param columnQualifier Name of the column to retrieve
* @return the corresponding {@link ApplicationColumn} or null

View File

@ -54,7 +54,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
INFO(ApplicationColumnFamily.INFO, "i"),
/**
* Lifecycle events for an application
* Lifecycle events for an application.
*/
EVENT(ApplicationColumnFamily.INFO, "e"),
@ -214,7 +214,7 @@ public Map<String, Object> readResults(Result result) throws IOException {
* is to facilitate returning byte arrays of values that were not
* Strings. If they can be treated as Strings, you should use
* {@link #readResults(Result)} instead.
* @throws IOException
* @throws IOException if any problem occurs while reading results.
*/
public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
throws IOException {
@ -276,8 +276,8 @@ public static final ApplicationColumnPrefix columnFor(
for (ApplicationColumnPrefix acp : ApplicationColumnPrefix.values()) {
// Find a match based column family and on name.
if (acp.columnFamily.equals(columnFamily)
&& (((columnPrefix == null) && (acp.getColumnPrefix() == null)) || (acp
.getColumnPrefix().equals(columnPrefix)))) {
&& (((columnPrefix == null) && (acp.getColumnPrefix() == null)) ||
(acp.getColumnPrefix().equals(columnPrefix)))) {
return acp;
}
}

View File

@ -62,11 +62,11 @@ public String getAppId() {
/**
* Constructs a row key prefix for the application table as follows:
* {@code clusterId!userName!flowName!}
* {@code clusterId!userName!flowName!}.
*
* @param clusterId
* @param userId
* @param flowName
* @param clusterId Cluster Id.
* @param userId User Id.
* @param flowName Flow Name.
* @return byte array with the row key prefix
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
@ -78,12 +78,12 @@ public static byte[] getRowKeyPrefix(String clusterId, String userId,
/**
* Constructs a row key prefix for the application table as follows:
* {@code clusterId!userName!flowName!flowRunId!}
* {@code clusterId!userName!flowName!flowRunId!}.
*
* @param clusterId
* @param userId
* @param flowName
* @param flowRunId
* @param clusterId Cluster Id.
* @param userId User Id.
* @param flowName Flow Name.
* @param flowRunId Run Id for the flow.
* @return byte array with the row key prefix
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
@ -96,13 +96,13 @@ public static byte[] getRowKeyPrefix(String clusterId, String userId,
/**
* Constructs a row key for the application table as follows:
* {@code clusterId!userName!flowName!flowRunId!AppId}
* {@code clusterId!userName!flowName!flowRunId!AppId}.
*
* @param clusterId
* @param userId
* @param flowName
* @param flowRunId
* @param appId
* @param clusterId Cluster Id.
* @param userId User Id.
* @param flowName Flow Name.
* @param flowRunId Run Id for the flow.
* @param appId App Id.
* @return byte array with the row key
*/
public static byte[] getRowKey(String clusterId, String userId,
@ -119,6 +119,9 @@ public static byte[] getRowKey(String clusterId, String userId,
/**
* Given the raw row key as bytes, returns the row key as an object.
*
* @param rowKey Byte representation of row key.
* @return An <cite>ApplicationRowKey</cite> object.
*/
public static ApplicationRowKey parseRowKey(byte[] rowKey) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);

View File

@ -68,28 +68,28 @@
* </pre>
*/
public class ApplicationTable extends BaseTable<ApplicationTable> {
/** application prefix */
/** application prefix. */
private static final String PREFIX =
YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".application";
/** config param name that specifies the application table name */
/** config param name that specifies the application table name. */
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
/**
* config param name that specifies the TTL for metrics column family in
* application table
* application table.
*/
private static final String METRICS_TTL_CONF_NAME = PREFIX
+ ".table.metrics.ttl";
/** default value for application table name */
/** default value for application table name. */
private static final String DEFAULT_TABLE_NAME =
"timelineservice.application";
/** default TTL is 30 days for metrics timeseries */
/** default TTL is 30 days for metrics timeseries. */
private static final int DEFAULT_METRICS_TTL = 2592000;
/** default max number of versions */
/** default max number of versions. */
private static final int DEFAULT_METRICS_MAX_VERSIONS = 1000;
private static final Log LOG = LogFactory.getLog(ApplicationTable.class);
@ -139,8 +139,8 @@ public void createTable(Admin admin, Configuration hbaseConf)
metricsCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
DEFAULT_METRICS_TTL));
applicationTableDescp
.setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
applicationTableDescp.setRegionSplitPolicyClassName(
"org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
applicationTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
admin.createTable(applicationTableDescp,

View File

@ -16,6 +16,10 @@
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timelineservice.storage.application
* contains classes related to implementation for application table.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.storage.application;

View File

@ -35,17 +35,17 @@
public enum AppToFlowColumn implements Column<AppToFlowTable> {
/**
* The flow ID
* The flow ID.
*/
FLOW_ID(AppToFlowColumnFamily.MAPPING, "flow_id"),
/**
* The flow run ID
* The flow run ID.
*/
FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"),
/**
* The user
* The user.
*/
USER_ID(AppToFlowColumnFamily.MAPPING, "user_id");

View File

@ -26,7 +26,7 @@
*/
public enum AppToFlowColumnFamily implements ColumnFamily<AppToFlowTable> {
/**
* Mapping column family houses known columns such as flowName and flowRunId
* Mapping column family houses known columns such as flowName and flowRunId.
*/
MAPPING("m");

View File

@ -43,10 +43,10 @@ public String getAppId() {
/**
* Constructs a row key prefix for the app_flow table as follows:
* {@code clusterId!AppId}
* {@code clusterId!AppId}.
*
* @param clusterId
* @param appId
* @param clusterId Cluster Id.
* @param appId Application Id.
* @return byte array with the row key
*/
public static byte[] getRowKey(String clusterId, String appId) {
@ -57,6 +57,9 @@ public static byte[] getRowKey(String clusterId, String appId) {
/**
* Given the raw row key as bytes, returns the row key as an object.
*
* @param rowKey a rowkey represented as a byte array.
* @return an <cite>AppToFlowRowKey</cite> object.
*/
public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);

View File

@ -58,14 +58,14 @@
* </pre>
*/
public class AppToFlowTable extends BaseTable<AppToFlowTable> {
/** app_flow prefix */
/** app_flow prefix. */
private static final String PREFIX =
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "app-flow";
/** config param name that specifies the app_flow table name */
/** config param name that specifies the app_flow table name. */
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
/** default value for app_flow table name */
/** default value for app_flow table name. */
private static final String DEFAULT_TABLE_NAME = "timelineservice.app_flow";
private static final Log LOG = LogFactory.getLog(AppToFlowTable.class);

View File

@ -15,6 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow
* contains classes related to implementation for app to flow table.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;

View File

@ -39,7 +39,7 @@
public abstract class BaseTable<T> {
/**
* Name of config variable that is used to point to this table
* Name of config variable that is used to point to this table.
*/
private final String tableNameConfName;
@ -52,6 +52,8 @@ public abstract class BaseTable<T> {
/**
* @param tableNameConfName name of config variable that is used to point to
* this table.
* @param defaultTableName Default table name if table from config is not
* found.
*/
protected BaseTable(String tableNameConfName, String defaultTableName) {
this.tableNameConfName = tableNameConfName;
@ -61,10 +63,11 @@ protected BaseTable(String tableNameConfName, String defaultTableName) {
/**
* Used to create a type-safe mutator for this table.
*
* @param hbaseConf used to read table name
* @param hbaseConf used to read table name.
* @param conn used to create a table from.
* @return a type safe {@link BufferedMutator} for the entity table.
* @throws IOException
* @throws IOException if any exception occurs while creating mutator for the
* table.
*/
public TypedBufferedMutator<T> getTableMutator(Configuration hbaseConf,
Connection conn) throws IOException {
@ -88,7 +91,7 @@ public TypedBufferedMutator<T> getTableMutator(Configuration hbaseConf,
* @param conn used to create table from
* @param scan that specifies what you want to read from this table.
* @return scanner for the table.
* @throws IOException
* @throws IOException if any exception occurs while getting the scanner.
*/
public ResultScanner getResultScanner(Configuration hbaseConf,
Connection conn, Scan scan) throws IOException {
@ -102,7 +105,7 @@ public ResultScanner getResultScanner(Configuration hbaseConf,
* @param conn used to create table from
* @param get that specifies what single row you want to get from this table
* @return result of get operation
* @throws IOException
* @throws IOException if any exception occurs while getting the result.
*/
public Result getResult(Configuration hbaseConf, Connection conn, Get get)
throws IOException {
@ -113,7 +116,8 @@ public Result getResult(Configuration hbaseConf, Connection conn, Get get)
/**
* Get the table name for this table.
*
* @param hbaseConf
* @param hbaseConf HBase configuration from which table name will be fetched.
* @return A {@link TableName} object.
*/
public TableName getTableName(Configuration hbaseConf) {
TableName table =
@ -126,8 +130,9 @@ public TableName getTableName(Configuration hbaseConf) {
* Used to create the table in HBase. Should be called only once (per HBase
* instance).
*
* @param admin
* @param hbaseConf
* @param admin Used for doing HBase table operations.
* @param hbaseConf Hbase configuration.
* @throws IOException if any exception occurs while creating the table.
*/
public abstract void createTable(Admin admin, Configuration hbaseConf)
throws IOException;

View File

@ -26,7 +26,7 @@
import org.apache.hadoop.hbase.client.Mutation;
/**
* To be used to wrap an actual {@link BufferedMutator} in a type safe manner
* To be used to wrap an actual {@link BufferedMutator} in a type safe manner.
*
* @param <T> The class referring to the table to be written to.
*/

View File

@ -19,7 +19,6 @@
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
@ -39,25 +38,26 @@ public interface Column<T> {
* column.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
* @param attributes Map of attributes for this mutation. used in the coprocessor
* to set/read the cell tags. Can be null.
* @param attributes Map of attributes for this mutation. used in the
* coprocessor to set/read the cell tags. Can be null.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
* @throws IOException if there is any exception encountered during store.
*/
public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException;
/**
* Get the latest version of this specified column. Note: this call clones the
* value content of the hosting {@link Cell}.
* value content of the hosting {@link org.apache.hadoop.hbase.Cell Cell}.
*
* @param result Cannot be null
* @return result object (can be cast to whatever object was written to), or
* null when result doesn't contain this column.
* @throws IOException
* @throws IOException if there is any exception encountered while reading
* result.
*/
public Object readResult(Result result) throws IOException;
Object readResult(Result result) throws IOException;
}

View File

@ -29,6 +29,6 @@ public interface ColumnFamily<T> {
*
* @return a clone of the byte representation of the column family.
*/
public byte[] getBytes();
byte[] getBytes();
}

View File

@ -26,7 +26,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
@ -82,7 +81,9 @@ public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter) {
* @param inputValue
* the value to write to the rowKey and column qualifier. Nothing
* gets written when null.
* @throws IOException
* @param attributes Attributes to be set for HBase Put.
* @throws IOException if any problem occurs during store operation(sending
* mutation to table).
*/
public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
byte[] columnQualifier, Long timestamp, Object inputValue,
@ -140,13 +141,13 @@ public ColumnFamily<T> getColumnFamily() {
/**
* Get the latest version of this specified column. Note: this call clones the
* value content of the hosting {@link Cell}.
* value content of the hosting {@link org.apache.hadoop.hbase.Cell Cell}.
*
* @param result from which to read the value. Cannot be null
* @param columnQualifierBytes referring to the column to be read.
* @return latest version of the specified column of whichever object was
* written.
* @throws IOException
* @throws IOException if any problem occurs while reading result.
*/
public Object readResult(Result result, byte[] columnQualifierBytes)
throws IOException {
@ -167,9 +168,9 @@ public Object readResult(Result result, byte[] columnQualifierBytes)
* columns are returned.
* @param <V> the type of the values. The values will be cast into that type.
* @return the cell values at each respective time in for form
* {idA={timestamp1->value1}, idA={timestamp2->value2},
* idB={timestamp3->value3}, idC={timestamp1->value4}}
* @throws IOException
* {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
* idB={timestamp3->value3}, idC={timestamp1->value4}}}
* @throws IOException if any problem occurs while reading results.
*/
@SuppressWarnings("unchecked")
public <V> NavigableMap<String, NavigableMap<Long, V>>
@ -180,8 +181,9 @@ public Object readResult(Result result, byte[] columnQualifierBytes)
new TreeMap<String, NavigableMap<Long, V>>();
if (result != null) {
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap =
result.getMap();
NavigableMap<
byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap =
result.getMap();
NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap =
resultMap.get(columnFamilyBytes);
@ -240,7 +242,7 @@ public Object readResult(Result result, byte[] columnQualifierBytes)
* back and forth from Strings, you should use
* {@link #readResultsHavingCompoundColumnQualifiers(Result, byte[])}
* instead.
* @throws IOException
* @throws IOException if any problem occurs while reading results.
*/
public Map<String, Object> readResults(Result result,
byte[] columnPrefixBytes) throws IOException {
@ -294,7 +296,7 @@ public Map<String, Object> readResults(Result result,
* non-null column prefix bytes, the column qualifier is returned as
* a list of parts, each part a byte[]. This is to facilitate
* returning byte arrays of values that were not Strings.
* @throws IOException
* @throws IOException if any problem occurs while reading results.
*/
public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result,
byte[] columnPrefixBytes) throws IOException {

View File

@ -21,7 +21,6 @@
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
@ -48,7 +47,8 @@ public interface ColumnPrefix<T> {
* coprocessor to set/read the cell tags.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
* @throws IOException if there is any exception encountered while doing
* store operation(sending mutation to the table).
*/
void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
byte[] qualifier, Long timestamp, Object inputValue,
@ -69,7 +69,8 @@ void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
* coprocessor to set/read the cell tags.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
* @throws IOException if there is any exception encountered while doing
* store operation(sending mutation to the table).
*/
void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
String qualifier, Long timestamp, Object inputValue,
@ -77,14 +78,15 @@ void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
/**
* Get the latest version of this specified column. Note: this call clones the
* value content of the hosting {@link Cell}.
* value content of the hosting {@link org.apache.hadoop.hbase.Cell Cell}.
*
* @param result Cannot be null
* @param qualifier column qualifier. Nothing gets read when null.
* @return result object (can be cast to whatever object was written to) or
* null when specified column qualifier for this prefix doesn't exist
* in the result.
* @throws IOException
* @throws IOException if there is any exception encountered while reading
* result.
*/
Object readResult(Result result, String qualifier) throws IOException;
@ -92,7 +94,8 @@ void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
* @param result from which to read columns
* @return the latest values of columns in the column family with this prefix
* (or all of them if the prefix value is null).
* @throws IOException
* @throws IOException if there is any exception encountered while reading
* results.
*/
Map<String, Object> readResults(Result result) throws IOException;
@ -100,9 +103,10 @@ void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
* @param result from which to reads data with timestamps
* @param <V> the type of the values. The values will be cast into that type.
* @return the cell values at each respective time in for form
* {idA={timestamp1->value1}, idA={timestamp2->value2},
* idB={timestamp3->value3}, idC={timestamp1->value4}}
* @throws IOException
* {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
* idB={timestamp3->value3}, idC={timestamp1->value4}}}
* @throws IOException if there is any exception encountered while reading
* result.
*/
<V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result) throws IOException;

View File

@ -55,8 +55,9 @@ public Object decodeValue(byte[] bytes) throws IOException {
/**
* Compares two numbers as longs. If either number is null, it will be taken
* as 0.
* @param num1
* @param num2
*
* @param num1 the first {@code Long} to compare.
* @param num2 the second {@code Long} to compare.
* @return -1 if num1 is less than num2, 0 if num1 is equal to num2 and 1 if
* num1 is greater than num2.
*/

View File

@ -29,9 +29,10 @@ public interface NumericValueConverter extends ValueConverter,
/**
* Adds two or more numbers. If either of the numbers are null, it is taken as
* 0.
* @param num1
* @param num2
* @param numbers
*
* @param num1 the first number to add.
* @param num2 the second number to add.
* @param numbers Rest of the numbers to be added.
* @return result after adding up the numbers.
*/
Number add(Number num1, Number num2, Number...numbers);

View File

@ -33,21 +33,24 @@
*/
public final class OfflineAggregationInfo {
/**
* Default flow level aggregation table name
* Default flow level aggregation table name.
*/
@VisibleForTesting
public static final String FLOW_AGGREGATION_TABLE_NAME
= "yarn_timeline_flow_aggregation";
/**
* Default user level aggregation table name
* Default user level aggregation table name.
*/
public static final String USER_AGGREGATION_TABLE_NAME
= "yarn_timeline_user_aggregation";
// These lists are not taking effects in table creations.
private static final String[] FLOW_AGGREGATION_PK_LIST =
{ "user", "cluster", "flow_name" };
private static final String[] USER_AGGREGATION_PK_LIST = { "user", "cluster"};
private static final String[] FLOW_AGGREGATION_PK_LIST = {
"user", "cluster", "flow_name"
};
private static final String[] USER_AGGREGATION_PK_LIST = {
"user", "cluster"
};
private final String tableName;
private final String[] primaryKeyList;
@ -81,30 +84,32 @@ public int setStringsForPrimaryKey(PreparedStatement ps,
public static final OfflineAggregationInfo FLOW_AGGREGATION =
new OfflineAggregationInfo(FLOW_AGGREGATION_TABLE_NAME,
FLOW_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() {
@Override
public int setValues(PreparedStatement ps,
TimelineCollectorContext context, String[] extraInfo, int startPos)
throws SQLException {
int idx = startPos;
ps.setString(idx++, context.getUserId());
ps.setString(idx++, context.getClusterId());
ps.setString(idx++, context.getFlowName());
return idx;
}
});
FLOW_AGGREGATION_PK_LIST,
new PrimaryKeyStringSetter() {
@Override
public int setValues(PreparedStatement ps,
TimelineCollectorContext context, String[] extraInfo,
int startPos) throws SQLException {
int idx = startPos;
ps.setString(idx++, context.getUserId());
ps.setString(idx++, context.getClusterId());
ps.setString(idx++, context.getFlowName());
return idx;
}
});
public static final OfflineAggregationInfo USER_AGGREGATION =
new OfflineAggregationInfo(USER_AGGREGATION_TABLE_NAME,
USER_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() {
@Override
public int setValues(PreparedStatement ps,
TimelineCollectorContext context, String[] extraInfo, int startPos)
throws SQLException {
int idx = startPos;
ps.setString(idx++, context.getUserId());
ps.setString(idx++, context.getClusterId());
return idx;
}
});
USER_AGGREGATION_PK_LIST,
new PrimaryKeyStringSetter() {
@Override
public int setValues(PreparedStatement ps,
TimelineCollectorContext context, String[] extraInfo,
int startPos) throws SQLException {
int idx = startPos;
ps.setString(idx++, context.getUserId());
ps.setString(idx++, context.getClusterId());
return idx;
}
});
}

View File

@ -20,6 +20,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Encapsulates a range with start and end indices.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Range {

View File

@ -30,7 +30,7 @@
public enum Separator {
/**
* separator in key or column qualifier fields
* separator in key or column qualifier fields.
*/
QUALIFIERS("!", "%0$"),
@ -53,7 +53,7 @@ public enum Separator {
private final String value;
/**
* The URLEncoded version of this separator
* The URLEncoded version of this separator.
*/
private final String encodedValue;
@ -63,7 +63,7 @@ public enum Separator {
private final byte[] bytes;
/**
* The value quoted so that it can be used as a safe regex
* The value quoted so that it can be used as a safe regex.
*/
private final String quotedValue;
@ -99,7 +99,7 @@ public String getValue() {
/**
* Used to make token safe to be used with this separator without collisions.
*
* @param token
* @param token Token to be encoded.
* @return the token with any occurrences of this separator URLEncoded.
*/
public String encode(String token) {
@ -111,7 +111,9 @@ public String encode(String token) {
}
/**
* @param token
* Decode the token encoded using {@link #encode}.
*
* @param token Token to be decoded.
* @return the token with any occurrences of the encoded separator replaced by
* the separator itself.
*/
@ -193,7 +195,7 @@ public static String decode(String token, Separator... separators) {
* Returns a single byte array containing all of the individual arrays
* components separated by this separator.
*
* @param components
* @param components Byte array components to be joined together.
* @return byte array after joining the components
*/
public byte[] join(byte[]... components) {
@ -287,8 +289,8 @@ public String joinEncoded(Iterable<?> items) {
public Collection<String> splitEncoded(String compoundValue) {
List<String> result = new ArrayList<String>();
if (compoundValue != null) {
for (String value : compoundValue.split(quotedValue)) {
result.add(decode(value));
for (String val : compoundValue.split(quotedValue)) {
result.add(decode(val));
}
}
return result;
@ -298,6 +300,7 @@ public Collection<String> splitEncoded(String compoundValue) {
* Splits the source array into multiple array segments using this separator,
* up to a maximum of count items. This will naturally produce copied byte
* arrays for each of the split segments.
*
* @param source to be split
* @param limit on how many segments are supposed to be returned. A
* non-positive value indicates no limit on number of segments.
@ -311,6 +314,7 @@ public byte[][] split(byte[] source, int limit) {
* Splits the source array into multiple array segments using this separator,
* as many times as splits are found. This will naturally produce copied byte
* arrays for each of the split segments.
*
* @param source to be split
* @return source split by this separator.
*/

View File

@ -24,11 +24,13 @@
/**
* contains the constants used in the context of schema accesses for
* {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
* information
* information.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TimelineHBaseSchemaConstants {
public final class TimelineHBaseSchemaConstants {
private TimelineHBaseSchemaConstants() {
}
/**
* Used to create a pre-split for tables starting with a username in the
@ -36,27 +38,28 @@ public class TimelineHBaseSchemaConstants {
* separators) so that different installations can presplit based on their own
* commonly occurring names.
*/
private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"),
Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"),
Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"),
Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"),
Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"),
Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"),
Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"),
Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"),
Bytes.toBytes("z") };
private final static byte[][] USERNAME_SPLITS = {
Bytes.toBytes("a"), Bytes.toBytes("ad"), Bytes.toBytes("an"),
Bytes.toBytes("b"), Bytes.toBytes("ca"), Bytes.toBytes("cl"),
Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"),
Bytes.toBytes("j"), Bytes.toBytes("k"), Bytes.toBytes("l"),
Bytes.toBytes("m"), Bytes.toBytes("n"), Bytes.toBytes("o"),
Bytes.toBytes("q"), Bytes.toBytes("r"), Bytes.toBytes("s"),
Bytes.toBytes("se"), Bytes.toBytes("t"), Bytes.toBytes("u"),
Bytes.toBytes("v"), Bytes.toBytes("w"), Bytes.toBytes("x"),
Bytes.toBytes("y"), Bytes.toBytes("z")
};
/**
* The length at which keys auto-split
* The length at which keys auto-split.
*/
public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
/**
* @return splits for splits where a user is a prefix.
*/
public final static byte[][] getUsernameSplits() {
public static byte[][] getUsernameSplits() {
byte[][] kloon = USERNAME_SPLITS.clone();
// Deep copy.
for (int row = 0; row < USERNAME_SPLITS.length; row++) {

View File

@ -45,15 +45,17 @@
*/
@Public
@Unstable
public class TimelineStorageUtils {
public final class TimelineStorageUtils {
private TimelineStorageUtils() {
}
/** empty bytes */
/** empty bytes. */
public static final byte[] EMPTY_BYTES = new byte[0];
/** indicator for no limits for splitting */
/** indicator for no limits for splitting. */
public static final int NO_LIMIT_SPLIT = -1;
/** milliseconds in one day */
/** milliseconds in one day. */
public static final long MILLIS_ONE_DAY = 86400000L;
/**
@ -62,9 +64,9 @@ public class TimelineStorageUtils {
* copied byte arrays for each of the split segments. To identify the split
* ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
*
* @param source
* @param separator
* @return byte[] array after splitting the source
* @param source Source array.
* @param separator Separator represented as a byte array.
* @return byte[][] after splitting the source
*/
public static byte[][] split(byte[] source, byte[] separator) {
return split(source, separator, NO_LIMIT_SPLIT);
@ -76,10 +78,10 @@ public static byte[][] split(byte[] source, byte[] separator) {
* copied byte arrays for each of the split segments. To identify the split
* ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
*
* @param source
* @param separator
* @param source Source array.
* @param separator Separator represented as a byte array.
* @param limit a non-positive value indicates no limit on number of segments.
* @return byte[][] after splitting the input source
* @return byte[][] after splitting the input source.
*/
public static byte[][] split(byte[] source, byte[] separator, int limit) {
List<Range> segments = splitRanges(source, separator, limit);
@ -100,6 +102,10 @@ public static byte[][] split(byte[] source, byte[] separator, int limit) {
* Returns a list of ranges identifying [start, end) -- closed, open --
* positions within the source byte array that would be split using the
* separator byte array.
*
* @param source Source array.
* @param separator Separator represented as a byte array.
* @return a list of ranges.
*/
public static List<Range> splitRanges(byte[] source, byte[] separator) {
return splitRanges(source, separator, NO_LIMIT_SPLIT);
@ -113,6 +119,7 @@ public static List<Range> splitRanges(byte[] source, byte[] separator) {
* @param source the source data
* @param separator the separator pattern to look for
* @param limit the maximum number of splits to identify in the source
* @return a list of ranges.
*/
public static List<Range> splitRanges(byte[] source, byte[] separator,
int limit) {
@ -132,7 +139,7 @@ public static List<Range> splitRanges(byte[] source, byte[] separator,
// everything else goes in one final segment
break;
}
segments.add(new Range(start, i));
segments.add(new Range(start, i));
start = i + separator.length;
// i will be incremented again in outer for loop
i += separator.length - 1;
@ -219,9 +226,9 @@ public static String decodeAppId(byte[] appIdBytes) {
/**
* returns the timestamp of that day's start (which is midnight 00:00:00 AM)
* for a given input timestamp
* for a given input timestamp.
*
* @param ts
* @param ts Timestamp.
* @return timestamp of that day's beginning (midnight)
*/
public static long getTopOfTheDayTimestamp(long ts) {
@ -233,9 +240,9 @@ public static long getTopOfTheDayTimestamp(long ts) {
* Combines the input array of attributes and the input aggregation operation
* into a new array of attributes.
*
* @param attributes
* @param aggOp
* @return array of combined attributes
* @param attributes Attributes to be combined.
* @param aggOp Aggregation operation.
* @return array of combined attributes.
*/
public static Attribute[] combineAttributes(Attribute[] attributes,
AggregationOperation aggOp) {
@ -257,8 +264,8 @@ public static Attribute[] combineAttributes(Attribute[] attributes,
* Returns a number for the new array size. The new array is the combination
* of input array of attributes and the input aggregation operation.
*
* @param attributes
* @param aggOp
* @param attributes Attributes.
* @param aggOp Aggregation operation.
* @return the size for the new array
*/
private static int getNewLengthCombinedAttributes(Attribute[] attributes,
@ -283,16 +290,17 @@ private static int getAttributesLength(Attribute[] attributes) {
}
/**
* checks if an application has finished
* checks if an application has finished.
*
* @param te
* @param te TimlineEntity object.
* @return true if application has finished else false
*/
public static boolean isApplicationFinished(TimelineEntity te) {
SortedSet<TimelineEvent> allEvents = te.getEvents();
if ((allEvents != null) && (allEvents.size() > 0)) {
TimelineEvent event = allEvents.last();
if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
if (event.getId().equals(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
return true;
}
}
@ -300,26 +308,27 @@ public static boolean isApplicationFinished(TimelineEntity te) {
}
/**
* get the time at which an app finished
* get the time at which an app finished.
*
* @param te
* @param te TimelineEntity object.
* @return true if application has finished else false
*/
public static long getApplicationFinishedTime(TimelineEntity te) {
SortedSet<TimelineEvent> allEvents = te.getEvents();
if ((allEvents != null) && (allEvents.size() > 0)) {
TimelineEvent event = allEvents.last();
if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
if (event.getId().equals(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
return event.getTimestamp();
}
}
return 0l;
return 0L;
}
/**
* Checks if the input TimelineEntity object is an ApplicationEntity.
*
* @param te
* @param te TimelineEntity object.
* @return true if input is an ApplicationEntity, false otherwise
*/
public static boolean isApplicationEntity(TimelineEntity te) {
@ -329,7 +338,7 @@ public static boolean isApplicationEntity(TimelineEntity te) {
/**
* Checks for the APPLICATION_CREATED event.
*
* @param te
* @param te TimelineEntity object.
* @return true is application event exists, false otherwise
*/
public static boolean isApplicationCreated(TimelineEntity te) {
@ -346,9 +355,9 @@ public static boolean isApplicationCreated(TimelineEntity te) {
/**
* Returns the first seen aggregation operation as seen in the list of input
* tags or null otherwise
* tags or null otherwise.
*
* @param tags
* @param tags list of HBase tags.
* @return AggregationOperation
*/
public static AggregationOperation getAggregationOperationFromTagsList(
@ -366,8 +375,8 @@ public static AggregationOperation getAggregationOperationFromTagsList(
/**
* Creates a {@link Tag} from the input attribute.
*
* @param attribute
* @return Tag
* @param attribute Attribute from which tag has to be fetched.
* @return a HBase Tag.
*/
public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
// attribute could be either an Aggregation Operation or
@ -380,8 +389,9 @@ public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
return t;
}
AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension
.getAggregationCompactionDimension(attribute.getKey());
AggregationCompactionDimension aggCompactDim =
AggregationCompactionDimension.getAggregationCompactionDimension(
attribute.getKey());
if (aggCompactDim != null) {
Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
return t;
@ -475,7 +485,8 @@ public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
/**
* Checks if passed object is of integral type(Short/Integer/Long).
* @param obj
*
* @param obj Object to be checked.
* @return true if object passed is of type Short or Integer or Long, false
* otherwise.
*/

View File

@ -40,6 +40,8 @@ public class TimestampGenerator {
/**
* Returns the current wall clock time in milliseconds, multiplied by the
* required precision.
*
* @return current timestamp.
*/
public long currentTime() {
// We want to align cell timestamps with current time.
@ -58,6 +60,8 @@ public long currentTime() {
* sustained rate of more than 1M hbase writes per second AND if region fails
* over within that time range of timestamps being generated then there may be
* collisions writing to a cell version of the same column.
*
* @return unique timestamp.
*/
public long getUniqueTimestamp() {
long lastTs;
@ -78,8 +82,8 @@ public long getUniqueTimestamp() {
* column at the same time, then say appId of 1001 will overlap with appId of
* 001 and there may be collisions for that flow run's specific column.
*
* @param incomingTS
* @param appId
* @param incomingTS Timestamp to be converted.
* @param appId Application Id.
* @return a timestamp multiplied with TS_MULTIPLIER and last few digits of
* application id
*/
@ -101,9 +105,9 @@ private static long getAppIdSuffix(String appIdStr) {
/**
* truncates the last few digits of the timestamp which were supplemented by
* the TimestampGenerator#getSupplementedTimestamp function
* the TimestampGenerator#getSupplementedTimestamp function.
*
* @param incomingTS
* @param incomingTS Timestamp to be truncated.
* @return a truncated timestamp value
*/
public static long getTruncatedTimestamp(long incomingTS) {

View File

@ -28,18 +28,20 @@ public interface ValueConverter {
/**
* Encode an object as a byte array depending on the converter implementation.
* @param value
*
* @param value Value to be encoded.
* @return a byte array
* @throws IOException
* @throws IOException if any problem is encountered while encoding.
*/
byte[] encodeValue(Object value) throws IOException;
/**
* Decode a byte array and convert it into an object depending on the
* converter implementation.
* @param bytes
*
* @param bytes Byte array to be decoded.
* @return an object
* @throws IOException
* @throws IOException if any problem is encountered while decoding.
*/
Object decodeValue(byte[] bytes) throws IOException;
}

View File

@ -39,7 +39,7 @@ public enum EntityColumn implements Column<EntityTable> {
ID(EntityColumnFamily.INFO, "id"),
/**
* The type of entity
* The type of entity.
*/
TYPE(EntityColumnFamily.INFO, "type"),

View File

@ -54,7 +54,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
INFO(EntityColumnFamily.INFO, "i"),
/**
* Lifecycle events for an entity
* Lifecycle events for an entity.
*/
EVENT(EntityColumnFamily.INFO, "e"),
@ -215,7 +215,8 @@ public Map<String, Object> readResults(Result result) throws IOException {
* is to facilitate returning byte arrays of values that were not
* Strings. If they can be treated as Strings, you should use
* {@link #readResults(Result)} instead.
* @throws IOException
* @throws IOException if there is any exception encountered while reading
* result.
*/
public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
throws IOException {
@ -277,8 +278,8 @@ public static final EntityColumnPrefix columnFor(
for (EntityColumnPrefix ecp : EntityColumnPrefix.values()) {
// Find a match based column family and on name.
if (ecp.columnFamily.equals(columnFamily)
&& (((columnPrefix == null) && (ecp.getColumnPrefix() == null)) || (ecp
.getColumnPrefix().equals(columnPrefix)))) {
&& (((columnPrefix == null) && (ecp.getColumnPrefix() == null)) ||
(ecp.getColumnPrefix().equals(columnPrefix)))) {
return ecp;
}
}

View File

@ -74,14 +74,14 @@ public String getEntityId() {
/**
* Constructs a row key prefix for the entity table as follows:
* {@code userName!clusterId!flowName!flowRunId!AppId}
* {@code userName!clusterId!flowName!flowRunId!AppId}.
*
* @param clusterId
* @param userId
* @param flowName
* @param flowRunId
* @param appId
* @return byte array with the row key prefix
* @param clusterId Context cluster id.
* @param userId User name.
* @param flowName Flow name.
* @param flowRunId Run Id for the flow.
* @param appId Application Id.
* @return byte array with the row key prefix.
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
String flowName, Long flowRunId, String appId) {
@ -97,15 +97,17 @@ public static byte[] getRowKeyPrefix(String clusterId, String userId,
/**
* Constructs a row key prefix for the entity table as follows:
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}.
* Typically used while querying multiple entities of a particular entity
* type.
*
* @param clusterId
* @param userId
* @param flowName
* @param flowRunId
* @param appId
* @param entityType
* @return byte array with the row key prefix
* @param clusterId Context cluster id.
* @param userId User name.
* @param flowName Flow name.
* @param flowRunId Run Id for the flow.
* @param appId Application Id.
* @param entityType Entity type.
* @return byte array with the row key prefix.
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
String flowName, Long flowRunId, String appId, String entityType) {
@ -123,16 +125,17 @@ public static byte[] getRowKeyPrefix(String clusterId, String userId,
/**
* Constructs a row key for the entity table as follows:
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}.
* Typically used while querying a specific entity.
*
* @param clusterId
* @param userId
* @param flowName
* @param flowRunId
* @param appId
* @param entityType
* @param entityId
* @return byte array with the row key
* @param clusterId Context cluster id.
* @param userId User name.
* @param flowName Flow name.
* @param flowRunId Run Id for the flow.
* @param appId Application Id.
* @param entityType Entity type.
* @param entityId Entity Id.
* @return byte array with the row key.
*/
public static byte[] getRowKey(String clusterId, String userId,
String flowName, Long flowRunId, String appId, String entityType,
@ -151,6 +154,9 @@ public static byte[] getRowKey(String clusterId, String userId,
/**
* Given the raw row key as bytes, returns the row key as an object.
*
* @param rowKey byte representation of row key.
* @return An <cite>EntityRowKey</cite> object.
*/
public static EntityRowKey parseRowKey(byte[] rowKey) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);

View File

@ -69,27 +69,27 @@
* </pre>
*/
public class EntityTable extends BaseTable<EntityTable> {
/** entity prefix */
/** entity prefix. */
private static final String PREFIX =
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity";
/** config param name that specifies the entity table name */
/** config param name that specifies the entity table name. */
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
/**
* config param name that specifies the TTL for metrics column family in
* entity table
* entity table.
*/
private static final String METRICS_TTL_CONF_NAME = PREFIX
+ ".table.metrics.ttl";
/** default value for entity table name */
/** default value for entity table name. */
private static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
/** default TTL is 30 days for metrics timeseries */
/** default TTL is 30 days for metrics timeseries. */
private static final int DEFAULT_METRICS_TTL = 2592000;
/** default max number of versions */
/** default max number of versions. */
private static final int DEFAULT_METRICS_MAX_VERSIONS = 1000;
private static final Log LOG = LogFactory.getLog(EntityTable.class);
@ -139,8 +139,8 @@ public void createTable(Admin admin, Configuration hbaseConf)
metricsCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
DEFAULT_METRICS_TTL));
entityTableDescp
.setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
entityTableDescp.setRegionSplitPolicyClassName(
"org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
admin.createTable(entityTableDescp,

View File

@ -16,6 +16,10 @@
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timelineservice.storage.entity
* contains classes related to implementation for entity table.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.storage.entity;

View File

@ -26,7 +26,7 @@
public enum AggregationCompactionDimension {
/**
* the application id
* the application id.
*/
APPLICATION_ID((byte) 101);
@ -50,8 +50,8 @@ public byte[] getInBytes() {
return this.inBytes.clone();
}
public static AggregationCompactionDimension getAggregationCompactionDimension(
String aggCompactDimStr) {
public static AggregationCompactionDimension
getAggregationCompactionDimension(String aggCompactDimStr) {
for (AggregationCompactionDimension aggDim : AggregationCompactionDimension
.values()) {
if (aggDim.name().equals(aggCompactDimStr)) {

View File

@ -36,17 +36,17 @@ public enum AggregationOperation {
MAX((byte) 73),
/**
* The metrics of the flow
* The metrics of the flow.
*/
SUM((byte) 79),
/**
* application running
* application running.
*/
SUM_FINAL((byte) 83),
/**
* compact
* compact.
*/
COMPACT((byte) 89);
@ -71,8 +71,8 @@ public byte[] getInBytes() {
}
/**
* returns the AggregationOperation enum that represents that string
* @param aggOpStr
* returns the AggregationOperation enum that represents that string.
* @param aggOpStr Aggregation operation.
* @return the AggregationOperation enum that represents that string
*/
public static AggregationOperation getAggregationOperation(String aggOpStr) {

View File

@ -24,7 +24,8 @@
/**
* Represents the flow run table column families.
*/
public enum FlowActivityColumnFamily implements ColumnFamily<FlowActivityTable> {
public enum FlowActivityColumnFamily
implements ColumnFamily<FlowActivityTable> {
/**
* Info column family houses known columns, specifically ones included in

Some files were not shown because too many files have changed in this diff Show More