YARN-3367. Replace starting a separate thread for post entity with event loop in TimelineClient (Naganarasimha G R via sjlee)

Sangjin Lee 2016-02-09 09:07:37 -08:00
parent 960af7d471
commit 0d02ab8729
12 changed files with 619 additions and 217 deletions
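In short, callers used to spin up their own thread pool and wrap each put in a Runnable; after this change they call the client's async API and a single dispatcher thread inside TimelineClientImpl batches and publishes the entities. A minimal sketch of the two calling patterns (the timelineClient, entity, and LOG names are illustrative, not taken from this diff):

// Before: each publisher owned a pool and wrapped every put.
ExecutorService pool = Executors.newCachedThreadPool();
pool.execute(new Runnable() {
  public void run() {
    try {
      timelineClient.putEntities(entity); // blocking REST call
    } catch (IOException | YarnException e) {
      LOG.error("put failed", e);
    }
  }
});

// After: hand off to the client's internal event loop.
timelineClient.putEntitiesAsync(entity); // enqueue and return immediately
timelineClient.putEntities(entity);      // enqueue and block until published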

View File

@ -28,10 +28,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -85,7 +82,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The job history events get routed to this class. This class writes the Job
* history events to the DFS directly into a staging dir and then moved to a
@ -141,10 +137,6 @@ public class JobHistoryEventHandler extends AbstractService
private boolean timelineServiceV2Enabled = false;
// For posting entities in new timeline service in a non-blocking way
// TODO YARN-3367 replace with event loop in TimelineClient.
private ExecutorService threadPool;
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
private static final String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE =
@ -284,10 +276,6 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.timelineServiceV2Enabled(conf);
LOG.info("Timeline service is enabled; version: " +
YarnConfiguration.getTimelineServiceVersion(conf));
if (timelineServiceV2Enabled) {
// initialize the thread pool for v.2 timeline service
threadPool = createThreadPool();
}
} else {
LOG.info("Timeline service is not enabled");
}
@ -461,35 +449,9 @@ protected void serviceStop() throws Exception {
if (timelineClient != null) {
timelineClient.stop();
}
if (threadPool != null) {
shutdownAndAwaitTermination();
}
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.serviceStop();
}
// TODO remove threadPool after adding non-blocking call in TimelineClient
private ExecutorService createThreadPool() {
return Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
}
private void shutdownAndAwaitTermination() {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
LOG.error("ThreadPool did not terminate");
}
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
protected EventWriter createEventWriter(Path historyFilePath)
throws IOException {
@ -1097,41 +1059,6 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
+ "Server", ex);
}
}
@Private
public JsonNode countersToJSON(Counters counters) {
ArrayNode nodes = FACTORY.arrayNode();
if (counters != null) {
for (CounterGroup counterGroup : counters) {
ObjectNode groupNode = nodes.addObject();
groupNode.put("NAME", counterGroup.getName());
groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
ArrayNode countersNode = groupNode.putArray("COUNTERS");
for (Counter counter : counterGroup) {
ObjectNode counterNode = countersNode.addObject();
counterNode.put("NAME", counter.getName());
counterNode.put("DISPLAY_NAME", counter.getDisplayName());
counterNode.put("VALUE", counter.getValue());
}
}
}
return nodes;
}
private void putEntityWithoutBlocking(final TimelineClient client,
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) {
Runnable publishWrapper = new Runnable() {
public void run() {
try {
client.putEntities(entity);
} catch (IOException|YarnException e) {
LOG.error("putEntityNonBlocking get failed: " + e);
throw new RuntimeException(e.toString());
}
}
};
threadPool.execute(publishWrapper);
}
// create JobEntity from HistoryEvent with adding other info, like:
// jobId, timestamp and entityType.
@ -1293,7 +1220,13 @@ private void processEventForNewTimelineService(HistoryEvent event,
taskId, setCreatedTime);
}
}
putEntityWithoutBlocking(timelineClient, tEntity);
try {
timelineClient.putEntitiesAsync(tEntity);
} catch (IOException | YarnException e) {
LOG.error("Failed to process Event " + event.getEventType()
+ " for the job : " + jobId, e);
}
}
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {

View File

@ -21,8 +21,8 @@
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -176,7 +176,7 @@ protected void writeEntities(Configuration tlConf,
// create entities from job history and write them
long totalTime = 0;
Set<TimelineEntity> entitySet =
List<TimelineEntity> entitySet =
converter.createTimelineEntities(jobInfo, jobConf);
LOG.info("converted them into timeline entities for job " + jobIdStr);
// use the current user for this purpose
@ -215,7 +215,7 @@ protected void writeEntities(Configuration tlConf,
}
private void writeAllEntities(AppLevelTimelineCollector collector,
Set<TimelineEntity> entitySet, UserGroupInformation ugi)
List<TimelineEntity> entitySet, UserGroupInformation ugi)
throws IOException {
TimelineEntities entities = new TimelineEntities();
entities.setEntities(entitySet);
@ -223,7 +223,7 @@ private void writeAllEntities(AppLevelTimelineCollector collector,
}
private void writePerEntity(AppLevelTimelineCollector collector,
Set<TimelineEntity> entitySet, UserGroupInformation ugi)
List<TimelineEntity> entitySet, UserGroupInformation ugi)
throws IOException {
for (TimelineEntity entity : entitySet) {
TimelineEntities entities = new TimelineEntities();

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.mapred;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -57,16 +59,16 @@ class TimelineEntityConverter {
* Note that we also do not add info to the YARN application entity, which
* would be needed for aggregation.
*/
public Set<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
public List<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
Configuration conf) {
Set<TimelineEntity> entities = new HashSet<>();
List<TimelineEntity> entities = new ArrayList<>();
// create the job entity
TimelineEntity job = createJobEntity(jobInfo, conf);
entities.add(job);
// create the task and task attempt entities
Set<TimelineEntity> tasksAndAttempts =
List<TimelineEntity> tasksAndAttempts =
createTaskAndTaskAttemptEntities(jobInfo);
entities.addAll(tasksAndAttempts);
@ -125,9 +127,9 @@ private void addMetrics(TimelineEntity entity, Counters counters) {
}
}
private Set<TimelineEntity> createTaskAndTaskAttemptEntities(
private List<TimelineEntity> createTaskAndTaskAttemptEntities(
JobInfo jobInfo) {
Set<TimelineEntity> entities = new HashSet<>();
List<TimelineEntity> entities = new ArrayList<>();
Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
" tasks");

View File

@ -17,15 +17,16 @@
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.util.ArrayList;
import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This class hosts a set of timeline entities.
@ -36,22 +37,22 @@
@InterfaceStability.Unstable
public class TimelineEntities {
private Set<TimelineEntity> entities = new HashSet<>();
private List<TimelineEntity> entities = new ArrayList<>();
public TimelineEntities() {
}
@XmlElement(name = "entities")
public Set<TimelineEntity> getEntities() {
public List<TimelineEntity> getEntities() {
return entities;
}
public void setEntities(Set<TimelineEntity> timelineEntities) {
public void setEntities(List<TimelineEntity> timelineEntities) {
this.entities = timelineEntities;
}
public void addEntities(Set<TimelineEntity> timelineEntities) {
public void addEntities(List<TimelineEntity> timelineEntities) {
this.entities.addAll(timelineEntities);
}

View File

@ -1987,6 +1987,12 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = 1000;
public static final String NUMBER_OF_ASYNC_ENTITIES_TO_MERGE =
TIMELINE_SERVICE_PREFIX
+ "timeline-client.number-of-async-entities-to-merge";
public static final int DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = 10;
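A hedged example of how this knob could be tuned programmatically (overriding the same property in yarn-site.xml works as well):

Configuration conf = new YarnConfiguration();
// merge up to 20 queued async putEntities calls into one REST publish
// (the default defined above is 10)
conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 20);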
// mark app-history related configs @Private as application history is going
// to be integrated into the timeline service
@Private

View File

@ -40,9 +40,6 @@
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
@ -111,7 +108,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* An ApplicationMaster for executing shell commands on a set of launched
@ -225,10 +221,6 @@ public static enum DSEntity {
private boolean timelineServiceV2 = false;
// For posting entities in new timeline service in a non-blocking way
// TODO replace with event loop in TimelineClient.
private ExecutorService threadPool;
// App Master configuration
// No. of containers to run shell command on
@VisibleForTesting
@ -328,10 +320,6 @@ public static void main(String[] args) {
}
appMaster.run();
result = appMaster.finish();
if (appMaster.threadPool != null) {
appMaster.shutdownAndAwaitTermination();
}
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
LogManager.shutdown();
@ -346,29 +334,6 @@ public static void main(String[] args) {
}
}
//TODO remove threadPool after adding non-blocking call in TimelineClient
private ExecutorService createThreadPool() {
return Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
}
private void shutdownAndAwaitTermination() {
threadPool.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
LOG.error("ThreadPool did not terminate");
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
/**
* Dump out contents of $CWD and the environment to stdout for debugging
*/
@ -591,11 +556,7 @@ public boolean init(String[] args) throws ParseException, IOException {
"container_retry_interval", "0"));
if (YarnConfiguration.timelineServiceEnabled(conf)) {
timelineServiceV2 =
YarnConfiguration.timelineServiceV2Enabled(conf);
if (timelineServiceV2) {
threadPool = createThreadPool();
}
timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
} else {
timelineClient = null;
LOG.warn("Timeline service is not enabled");
@ -746,8 +707,10 @@ public Void run() throws Exception {
if (timelineServiceV2) {
timelineClient = TimelineClient.createTimelineClient(
appAttemptID.getApplicationId());
LOG.info("Timeline service V2 client is enabled");
} else {
timelineClient = TimelineClient.createTimelineClient();
LOG.info("Timeline service V1 client is enabled");
}
timelineClient.init(conf);
timelineClient.start();
@ -1385,18 +1348,8 @@ Thread createLaunchContainerThread(Container allocatedContainer,
shellId);
return new Thread(runnableLaunchContainer);
}
private void publishContainerStartEventOnTimelineServiceV2(
final Container container) {
Runnable publishWrapper = new Runnable() {
public void run() {
publishContainerStartEventOnTimelineServiceV2Base(container);
}
};
threadPool.execute(publishWrapper);
}
private void publishContainerStartEventOnTimelineServiceV2Base(
private void publishContainerStartEventOnTimelineServiceV2(
Container container) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
@ -1430,16 +1383,6 @@ public TimelinePutResponse run() throws Exception {
private void publishContainerEndEventOnTimelineServiceV2(
final ContainerStatus container) {
Runnable publishWrapper = new Runnable() {
public void run() {
publishContainerEndEventOnTimelineServiceV2Base(container);
}
};
threadPool.execute(publishWrapper);
}
private void publishContainerEndEventOnTimelineServiceV2Base(
final ContainerStatus container) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getContainerId().toString());
@ -1470,17 +1413,6 @@ public TimelinePutResponse run() throws Exception {
}
private void publishApplicationAttemptEventOnTimelineServiceV2(
final DSEvent appEvent) {
Runnable publishWrapper = new Runnable() {
public void run() {
publishApplicationAttemptEventOnTimelineServiceV2Base(appEvent);
}
};
threadPool.execute(publishWrapper);
}
private void publishApplicationAttemptEventOnTimelineServiceV2Base(
DSEvent appEvent) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
@ -1498,7 +1430,7 @@ private void publishApplicationAttemptEventOnTimelineServiceV2Base(
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
timelineClient.putEntitiesAsync(entity);
return null;
}
});

View File

@ -41,9 +41,9 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -327,6 +327,19 @@ public void run() {
LOG.info("Interrupted while waiting for queue", ex);
continue;
}
String collectorAddress = response.getCollectorAddr();
TimelineClient timelineClient = client.getRegisteredTimeineClient();
if (timelineClient != null && collectorAddress != null
&& !collectorAddress.isEmpty()) {
if (collectorAddr == null
|| !collectorAddr.equals(collectorAddress)) {
collectorAddr = collectorAddress;
timelineClient.setTimelineServiceAddress(collectorAddress);
LOG.info("collectorAddress " + collectorAddress);
}
}
List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) {
handler.onNodesUpdated(updatedNodes);
@ -354,17 +367,6 @@ public void run() {
if (!allocated.isEmpty()) {
handler.onContainersAllocated(allocated);
}
String collectorAddress = response.getCollectorAddr();
TimelineClient timelineClient = client.getRegisteredTimeineClient();
if (timelineClient != null && collectorAddress != null
&& !collectorAddress.isEmpty()) {
if (collectorAddr == null ||
!collectorAddr.equals(collectorAddress)) {
collectorAddr = collectorAddress;
timelineClient.setTimelineServiceAddress(collectorAddress);
}
}
progress = handler.getProgress();
} catch (Throwable ex) {
handler.onError(ex);

View File

@ -28,12 +28,12 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -55,7 +55,7 @@ public abstract class TimelineClient extends AbstractService implements
* construct and initialize a timeline client if the following operations are
* supposed to be conducted by that user.
*/
private ApplicationId contextAppId;
protected ApplicationId contextAppId;
/**
* Creates an instance of the timeline v.1.x client.

View File

@ -30,6 +30,14 @@
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
@ -130,6 +138,8 @@ public class TimelineClientImpl extends TimelineClient {
@VisibleForTesting
TimelineClientConnectionRetry connectionRetry;
private TimelineEntityDispatcher entityDispatcher;
// Abstract class for an operation that should be retried by timeline client
@Private
@VisibleForTesting
@ -315,6 +325,7 @@ protected void serviceInit(Configuration conf) throws Exception {
serviceRetryInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
entityDispatcher = new TimelineEntityDispatcher(conf);
} else {
if (YarnConfiguration.useHttps(conf)) {
setTimelineServiceAddress(conf.get(
@ -335,7 +346,9 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override
protected void serviceStart() throws Exception {
if (!timelineServiceV2) {
if (timelineServiceV2) {
entityDispatcher.start();
} else {
timelineWriter = createTimelineWriter(configuration, authUgi, client,
constructResURI(getConfig(), timelineServiceAddress, false));
}
@ -357,6 +370,9 @@ protected void serviceStop() throws Exception {
if (this.timelineWriter != null) {
this.timelineWriter.close();
}
if (timelineServiceV2) {
entityDispatcher.stop();
}
super.serviceStop();
}
@ -376,37 +392,21 @@ public TimelinePutResponse putEntities(
@Override
public void putEntities(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
throws IOException, YarnException {
putEntities(false, entities);
throws IOException, YarnException {
if (!timelineServiceV2) {
throw new YarnException("v.2 method is invoked on a v.1.x client");
}
entityDispatcher.dispatchEntities(true, entities);
}
@Override
public void putEntitiesAsync(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
throws IOException, YarnException {
putEntities(true, entities);
}
private void putEntities(boolean async,
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
throws IOException, YarnException {
if (!timelineServiceV2) {
throw new YarnException("v.2 method is invoked on a v.1.x client");
}
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
entitiesContainer =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) {
entitiesContainer.addEntity(entity);
}
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
if (getContextAppId() != null) {
params.add("appid", getContextAppId().toString());
}
if (async) {
params.add("async", Boolean.TRUE.toString());
}
putObjects("entities", params, entitiesContainer);
entityDispatcher.dispatchEntities(false, entities);
}
@Override
@ -417,20 +417,10 @@ public void putDomain(TimelineDomain domain) throws IOException,
// Used for new timeline service only
@Private
public void putObjects(String path, MultivaluedMap<String, String> params,
protected void putObjects(String path, MultivaluedMap<String, String> params,
Object obj) throws IOException, YarnException {
// timelineServiceAddress may not have been initialized yet
// or may be stale (only for new timeline service)
int retries = pollTimelineServiceAddress(this.maxServiceRetries);
if (timelineServiceAddress == null) {
String errMessage = "TimelineClient has reached to max retry times : "
+ this.maxServiceRetries
+ ", but failed to fetch timeline service address. Please verify"
+ " Timeline Auxillary Service is configured in all the NMs";
LOG.error(errMessage);
throw new YarnException(errMessage);
}
int retries = verifyRestEndPointAvailable();
// timelineServiceAddress could be stale, add retry logic here.
boolean needRetry = true;
@ -448,6 +438,21 @@ public void putObjects(String path, MultivaluedMap<String, String> params,
}
}
private int verifyRestEndPointAvailable() throws YarnException {
// timelineServiceAddress may not have been initialized yet
// or may be stale (only for new timeline service)
int retries = pollTimelineServiceAddress(this.maxServiceRetries);
if (timelineServiceAddress == null) {
String errMessage = "TimelineClient has reached to max retry times : "
+ this.maxServiceRetries
+ ", but failed to fetch timeline service address. Please verify"
+ " Timeline Auxillary Service is configured in all the NMs";
LOG.error(errMessage);
throw new YarnException(errMessage);
}
return retries;
}
/**
* Checks whether the maximum number of retries has been reached.
* @param retries
@ -641,7 +646,7 @@ private int pollTimelineServiceAddress(int retries) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
timelineServiceAddress = getTimelineServiceAddress();
// timelineServiceAddress = getTimelineServiceAddress();
retries--;
}
return retries;
@ -906,4 +911,212 @@ public boolean shouldRetryOn(Exception e) {
}
}
private final class EntitiesHolder extends FutureTask<Void> {
private final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities;
private final boolean isSync;
EntitiesHolder(
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities,
final boolean isSync) {
super(new Callable<Void>() {
// publishEntities()
public Void call() throws Exception {
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
params.add("appid", contextAppId.toString());
params.add("async", Boolean.toString(!isSync));
putObjects("entities", params, entities);
return null;
}
});
this.entities = entities;
this.isSync = isSync;
}
public boolean isSync() {
return isSync;
}
public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities getEntities() {
return entities;
}
}
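EntitiesHolder leans on FutureTask semantics: the dispatcher thread executes the REST publish by invoking run(), while a synchronous caller blocks on get() and has any failure rethrown as an ExecutionException. A self-contained sketch of that handoff (not from the diff):

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class HolderSketch {
  public static void main(String[] args) throws Exception {
    FutureTask<Void> holder = new FutureTask<Void>(new Callable<Void>() {
      public Void call() throws Exception {
        // the REST publish would happen here
        return null;
      }
    });
    holder.run(); // dispatcher thread: executes call() inline
    holder.get(); // sync caller: returns once run() has completed;
                  // a failure surfaces as ExecutionException
  }
}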
/**
* This class is responsible for collecting the timeline entities and
* publishing them asynchronously.
*/
private class TimelineEntityDispatcher {
/**
* Time period for which the timeline client will wait to drain the
* remaining entities after stop.
*/
private static final long DRAIN_TIME_PERIOD = 2000L;
private int numberOfAsyncsToMerge;
private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
private ExecutorService executor;
TimelineEntityDispatcher(Configuration conf) {
timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
numberOfAsyncsToMerge =
conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
}
Runnable createRunnable() {
return new Runnable() {
@Override
public void run() {
try {
EntitiesHolder entitiesHolder;
while (!Thread.currentThread().isInterrupted()) {
// Merge all the async calls and make one push, but if it is a sync
// call, push immediately
try {
entitiesHolder = timelineEntityQueue.take();
} catch (InterruptedException ie) {
LOG.info("Timeline dispatcher thread was interrupted ");
Thread.currentThread().interrupt();
return;
}
if (entitiesHolder != null) {
publishWithoutBlockingOnQueue(entitiesHolder);
}
}
} finally {
if (!timelineEntityQueue.isEmpty()) {
LOG.info("Yet to publish " + timelineEntityQueue.size()
+ " timelineEntities, draining them now. ");
}
// Try to drain the remaining entities to be published, for at most
// 2 seconds
long timeTillweDrain =
System.currentTimeMillis() + DRAIN_TIME_PERIOD;
while (!timelineEntityQueue.isEmpty()) {
publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
if (System.currentTimeMillis() > timeTillweDrain) {
// time elapsed; stop publishing further
if (!timelineEntityQueue.isEmpty()) {
LOG.warn("Time to drain elapsed! Remaining "
+ timelineEntityQueue.size() + " timelineEntities will not"
+ " be published");
// if some entities were not drained, then we need to interrupt
// the threads which had put sync EntitiesHolders into the queue.
EntitiesHolder nextEntityInTheQueue = null;
while ((nextEntityInTheQueue =
timelineEntityQueue.poll()) != null) {
nextEntityInTheQueue.cancel(true);
}
}
break;
}
}
}
}
/**
* Publishes the given EntitiesHolder and returns immediately for a sync
* call, else tries to fetch further EntitiesHolders from the queue in a
* non-blocking fashion and collate the entities where possible before
* publishing through REST.
*
* @param entitiesHolder
*/
private void publishWithoutBlockingOnQueue(
EntitiesHolder entitiesHolder) {
if (entitiesHolder.isSync()) {
entitiesHolder.run();
return;
}
int count = 1;
while (true) {
// loop till we find a sync putEntities call or there is nothing
// left to take
EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
if (nextEntityInTheQueue == null) {
// Nothing in the queue just publish and get back to the
// blocked wait state
entitiesHolder.run();
break;
} else if (nextEntityInTheQueue.isSync()) {
// flush all the prev async entities first
entitiesHolder.run();
// and then flush the sync entity
nextEntityInTheQueue.run();
break;
} else {
// append all async entities together and then flush
entitiesHolder.getEntities().addEntities(
nextEntityInTheQueue.getEntities().getEntities());
count++;
if (count == numberOfAsyncsToMerge) {
// Flush the entities once the number of merged async
// putEntities calls reaches the desired limit, to avoid
// collecting too many entities and delaying publication for a
// long time.
entitiesHolder.run();
break;
}
}
}
}
};
}
public void dispatchEntities(boolean sync,
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[] entitiesTobePublished)
throws YarnException {
if (executor.isShutdown()) {
throw new YarnException("Timeline client is in the process of stopping,"
+ " not accepting any more TimelineEntities");
}
// wrap all TimelineEntity objects into a single TimelineEntities object
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entitiesTobePublished) {
entities.addEntity(entity);
}
// create a holder and place it in the queue
EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
try {
timelineEntityQueue.put(entitiesHolder);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException(
"Failed while adding entity to the queue for publishing", e);
}
if (sync) {
// For a sync call, wait till it is published; if there is any error,
// throw it back
try {
entitiesHolder.get();
} catch (ExecutionException e) {
throw new YarnException("Failed while publishing entity",
e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while publishing entity", e);
}
}
}
public void start() {
executor = Executors.newSingleThreadExecutor();
executor.execute(createRunnable());
}
public void stop() {
LOG.info("Stopping TimelineClient.");
executor.shutdownNow();
try {
executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}
}
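Taken together, the dispatcher gives the following observable behavior, sketched here with assumed entity variables:

client.putEntitiesAsync(e1); // publish may start immediately
client.putEntitiesAsync(e2); // e2 and e3 may be collated into one REST call,
client.putEntitiesAsync(e3); // up to numberOfAsyncsToMerge entities
client.putEntities(e4);      // sync: never merged; the caller blocks until
                             // e4 (and any queued async batch) is published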

View File

@ -2220,6 +2220,13 @@
<value>1000</value>
</property>
<property>
<description>Time line V2 client tries to merge these many number of
async entities (if available) and then call the REST ATS V2 API to submit.
</description>
<name>yarn.timeline-service.timeline-client.number-of-async-entities-to-merge</name>
<value>10</value>
</property>
<!-- Shared Cache Configuration -->
<property>

View File

@ -0,0 +1,304 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestTimelineClientV2Impl {
private static final Log LOG =
LogFactory.getLog(TestTimelineClientV2Impl.class);
private TestV2TimelineClient client;
private static long TIME_TO_SLEEP = 150;
@Before
public void setup() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
client = createTimelineClient(conf);
}
private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) {
ApplicationId id = ApplicationId.newInstance(0, 0);
TestV2TimelineClient client = new TestV2TimelineClient(id);
client.init(conf);
client.start();
return client;
}
private class TestV2TimelineClient extends TimelineClientImpl {
private boolean sleepBeforeReturn;
private boolean throwException;
private List<TimelineEntities> publishedEntities;
public TimelineEntities getPublishedEntities(int putIndex) {
Assert.assertTrue("Not So many entities Published",
putIndex < publishedEntities.size());
return publishedEntities.get(putIndex);
}
public void setSleepBeforeReturn(boolean sleepBeforeReturn) {
this.sleepBeforeReturn = sleepBeforeReturn;
}
public void setThrowException(boolean throwException) {
this.throwException = throwException;
}
public int getNumOfTimelineEntitiesPublished() {
return publishedEntities.size();
}
public TestV2TimelineClient(ApplicationId id) {
super(id);
publishedEntities = new ArrayList<TimelineEntities>();
}
protected void putObjects(String path,
MultivaluedMap<String, String> params, Object obj)
throws IOException, YarnException {
if (throwException) {
throw new YarnException("ActualException");
}
publishedEntities.add((TimelineEntities) obj);
if (sleepBeforeReturn) {
try {
Thread.sleep(TIME_TO_SLEEP);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
@Test
public void testPostEntities() throws Exception {
try {
client.putEntities(generateEntity("1"));
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
}
@Test
public void testASyncCallMerge() throws Exception {
client.setSleepBeforeReturn(true);
try {
client.putEntitiesAsync(generateEntity("1"));
Thread.sleep(TIME_TO_SLEEP / 2);
// by the time the first put response comes, push 2 entities into the queue
client.putEntitiesAsync(generateEntity("2"));
client.putEntitiesAsync(generateEntity("3"));
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
for (int i = 0; i < 4; i++) {
if (client.getNumOfTimelineEntitiesPublished() == 2) {
break;
}
Thread.sleep(TIME_TO_SLEEP);
}
Assert.assertEquals("two merged TimelineEntities needs to be published", 2,
client.getNumOfTimelineEntitiesPublished());
TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
Assert.assertEquals(
"Merged TimelineEntities Object needs to 2 TimelineEntity Object", 2,
secondPublishedEntities.getEntities().size());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
secondPublishedEntities.getEntities().get(0).getId());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
secondPublishedEntities.getEntities().get(1).getId());
}
@Test
public void testSyncCall() throws Exception {
try {
// sync entity should not be merged with async
client.putEntities(generateEntity("1"));
client.putEntitiesAsync(generateEntity("2"));
client.putEntitiesAsync(generateEntity("3"));
// except for the sync call above, the next 2 should be merged
client.putEntities(generateEntity("4"));
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
for (int i = 0; i < 4; i++) {
if (client.getNumOfTimelineEntitiesPublished() == 3) {
break;
}
Thread.sleep(TIME_TO_SLEEP);
}
printReceivedEntities();
Assert.assertEquals("TimelineEntities not published as desired", 3,
client.getNumOfTimelineEntitiesPublished());
TimelineEntities firstPublishedEntities = client.getPublishedEntities(0);
Assert.assertEquals("sync entities should not be merged with async", 1,
firstPublishedEntities.getEntities().size());
// test before pushing the sync entities asyncs are merged and pushed
TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
Assert.assertEquals(
"async entities should be merged before publishing sync", 2,
secondPublishedEntities.getEntities().size());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
secondPublishedEntities.getEntities().get(0).getId());
Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
secondPublishedEntities.getEntities().get(1).getId());
// test the last entity published is sync put
TimelineEntities thirdPublishedEntities = client.getPublishedEntities(2);
Assert.assertEquals("sync entities had to be published at the last", 1,
thirdPublishedEntities.getEntities().size());
Assert.assertEquals("Expected last sync Event is not proper", "4",
thirdPublishedEntities.getEntities().get(0).getId());
}
@Test
public void testExceptionCalls() throws Exception {
client.setThrowException(true);
try {
client.putEntitiesAsync(generateEntity("1"));
} catch (YarnException e) {
Assert.fail("Async calls are not expected to throw exception");
}
try {
client.putEntities(generateEntity("2"));
Assert.fail("Sync calls are expected to throw exception");
} catch (YarnException e) {
Assert.assertEquals("Same exception needs to be thrown",
"ActualException", e.getCause().getMessage());
}
}
@Test
public void testConfigurableNumberOfMerges() throws Exception {
client.setSleepBeforeReturn(true);
try {
// At max 3 entities need to be merged
client.putEntitiesAsync(generateEntity("1"));
client.putEntitiesAsync(generateEntity("2"));
client.putEntitiesAsync(generateEntity("3"));
client.putEntitiesAsync(generateEntity("4"));
client.putEntities(generateEntity("5"));
client.putEntitiesAsync(generateEntity("6"));
client.putEntitiesAsync(generateEntity("7"));
client.putEntitiesAsync(generateEntity("8"));
client.putEntitiesAsync(generateEntity("9"));
client.putEntitiesAsync(generateEntity("10"));
} catch (YarnException e) {
Assert.fail("No exception expected");
}
// not reusing the earlier wait logic here, as this check doesn't depend
// on how many times events are published.
Thread.sleep(2 * TIME_TO_SLEEP);
printReceivedEntities();
for (TimelineEntities publishedEntities : client.publishedEntities) {
Assert.assertTrue(
"Number of entities should not be greater than 3 for each publish,"
+ " but was " + publishedEntities.getEntities().size(),
publishedEntities.getEntities().size() <= 3);
}
}
@Test
public void testAfterStop() throws Exception {
client.setSleepBeforeReturn(true);
try {
// At max 3 entities need to be merged
client.putEntities(generateEntity("1"));
for (int i = 2; i < 20; i++) {
client.putEntitiesAsync(generateEntity("" + i));
}
client.stop();
try {
client.putEntitiesAsync(generateEntity("50"));
Assert.fail("Exception expected");
} catch (YarnException e) {
// expected
}
} catch (YarnException e) {
Assert.fail("No exception expected");
}
// not reusing the earlier wait logic here, as this check doesn't depend
// on how many times events are published.
for (int i = 0; i < 5; i++) {
TimelineEntities publishedEntities =
client.publishedEntities.get(client.publishedEntities.size() - 1);
TimelineEntity timelineEntity = publishedEntities.getEntities()
.get(publishedEntities.getEntities().size() - 1);
if (!timelineEntity.getId().equals("19")) {
Thread.sleep(2 * TIME_TO_SLEEP);
}
}
printReceivedEntities();
TimelineEntities publishedEntities =
client.publishedEntities.get(client.publishedEntities.size() - 1);
TimelineEntity timelineEntity = publishedEntities.getEntities()
.get(publishedEntities.getEntities().size() - 1);
Assert.assertEquals("", "19", timelineEntity.getId());
}
private void printReceivedEntities() {
for (int i = 0; i < client.getNumOfTimelineEntitiesPublished(); i++) {
TimelineEntities publishedEntities = client.getPublishedEntities(i);
StringBuilder entitiesPerPublish = new StringBuilder();
for (TimelineEntity entity : publishedEntities.getEntities()) {
entitiesPerPublish.append(entity.getId());
entitiesPerPublish.append(",");
}
LOG.info("Entities Published @ index " + i + " : "
+ entitiesPerPublish.toString());
}
}
private static TimelineEntity generateEntity(String id) {
TimelineEntity entity = new TimelineEntity();
entity.setId(id);
entity.setType("testEntity");
entity.setCreatedTime(System.currentTimeMillis());
return entity;
}
@After
public void tearDown() {
if (client != null) {
client.stop();
}
}
}

View File

@ -962,7 +962,9 @@ private void updateTimelineClientsAddress(
Map<ApplicationId, String> knownCollectorsMap =
response.getAppCollectorsMap();
if (knownCollectorsMap == null) {
LOG.warn("the collectors map is null");
if (LOG.isDebugEnabled()) {
LOG.debug("No collectors to update RM");
}
} else {
Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
knownCollectorsMap.entrySet();