YARN-4675. Reorganize TimelineClient and TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed by Naganarasimha G R.

This commit is contained in:
Sangjin Lee 2017-02-16 11:41:04 -08:00
parent 5690b51ef7
commit 4fa1afdb88
19 changed files with 1272 additions and 985 deletions

View File

@ -72,13 +72,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
@ -90,8 +89,6 @@ import com.sun.jersey.api.client.ClientHandlerException;
*/
public class JobHistoryEventHandler extends AbstractService
implements EventHandler<JobHistoryEvent> {
private static final JsonNodeFactory FACTORY =
new ObjectMapper().getNodeFactory();
private final AppContext context;
private final int startCount;
@ -133,9 +130,10 @@ public class JobHistoryEventHandler extends AbstractService
// should job completion be force when the AM shuts down?
protected volatile boolean forceJobCompletion = false;
@VisibleForTesting
protected TimelineClient timelineClient;
private boolean timelineServiceV2Enabled = false;
@VisibleForTesting
protected TimelineV2Client timelineV2Client;
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
@ -268,12 +266,17 @@ public class JobHistoryEventHandler extends AbstractService
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
LOG.info("Emitting job history data to the timeline service is enabled");
if (YarnConfiguration.timelineServiceEnabled(conf)) {
timelineClient =
((MRAppMaster.RunningAppContext)context).getTimelineClient();
timelineClient.init(conf);
timelineServiceV2Enabled =
YarnConfiguration.timelineServiceV2Enabled(conf);
boolean timelineServiceV2Enabled =
((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
if(timelineServiceV2Enabled) {
timelineV2Client =
((MRAppMaster.RunningAppContext)context).getTimelineV2Client();
timelineV2Client.init(conf);
} else {
timelineClient =
((MRAppMaster.RunningAppContext) context).getTimelineClient();
timelineClient.init(conf);
}
LOG.info("Timeline service is enabled; version: " +
YarnConfiguration.getTimelineServiceVersion(conf));
} else {
@ -324,6 +327,8 @@ public class JobHistoryEventHandler extends AbstractService
protected void serviceStart() throws Exception {
if (timelineClient != null) {
timelineClient.start();
} else if (timelineV2Client != null) {
timelineV2Client.start();
}
eventHandlingThread = new Thread(new Runnable() {
@Override
@ -448,6 +453,8 @@ public class JobHistoryEventHandler extends AbstractService
}
if (timelineClient != null) {
timelineClient.stop();
} else if (timelineV2Client != null) {
timelineV2Client.stop();
}
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.serviceStop();
@ -605,14 +612,12 @@ public class JobHistoryEventHandler extends AbstractService
}
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
if (timelineClient != null) {
if (timelineServiceV2Enabled) {
processEventForNewTimelineService(historyEvent, event.getJobID(),
event.getTimestamp());
} else {
processEventForTimelineServer(historyEvent, event.getJobID(),
event.getTimestamp());
}
if (timelineV2Client != null) {
processEventForNewTimelineService(historyEvent, event.getJobID(),
event.getTimestamp());
} else if (timelineClient != null) {
processEventForTimelineServer(historyEvent, event.getJobID(),
event.getTimestamp());
}
if (LOG.isDebugEnabled()) {
LOG.debug("In HistoryEventHandler "
@ -1162,8 +1167,8 @@ public class JobHistoryEventHandler extends AbstractService
configSize += size;
if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) {
if (jobEntityForConfigs.getConfigs().size() > 0) {
timelineClient.putEntities(jobEntityForConfigs);
timelineClient.putEntities(appEntityForConfigs);
timelineV2Client.putEntities(jobEntityForConfigs);
timelineV2Client.putEntities(appEntityForConfigs);
jobEntityForConfigs = createJobEntity(jobId);
appEntityForConfigs = new ApplicationEntity();
appEntityForConfigs.setId(appId);
@ -1174,8 +1179,8 @@ public class JobHistoryEventHandler extends AbstractService
appEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
}
if (configSize > 0) {
timelineClient.putEntities(jobEntityForConfigs);
timelineClient.putEntities(appEntityForConfigs);
timelineV2Client.putEntities(jobEntityForConfigs);
timelineV2Client.putEntities(appEntityForConfigs);
}
} catch (IOException | YarnException e) {
LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " +
@ -1295,9 +1300,9 @@ public class JobHistoryEventHandler extends AbstractService
}
try {
if (appEntityWithJobMetrics == null) {
timelineClient.putEntitiesAsync(tEntity);
timelineV2Client.putEntitiesAsync(tEntity);
} else {
timelineClient.putEntities(tEntity, appEntityWithJobMetrics);
timelineV2Client.putEntities(tEntity, appEntityWithJobMetrics);
}
} catch (IOException | YarnException e) {
LOG.error("Failed to process Event " + event.getEventType()

View File

@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.crypto.KeyGenerator;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -141,6 +143,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -154,8 +157,6 @@ import org.apache.hadoop.yarn.util.SystemClock;
import com.google.common.annotations.VisibleForTesting;
import javax.crypto.KeyGenerator;
/**
* The Map-Reduce Application Master.
* The state machine is encapsulated in the implementation of Job interface.
@ -1066,6 +1067,7 @@ public class MRAppMaster extends CompositeService {
private final ClusterInfo clusterInfo = new ClusterInfo();
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
private TimelineClient timelineClient = null;
private TimelineV2Client timelineV2Client = null;
private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
@ -1081,7 +1083,7 @@ public class MRAppMaster extends CompositeService {
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// create new version TimelineClient
timelineClient = TimelineClient.createTimelineClient(
timelineV2Client = TimelineV2Client.createTimelineClient(
appAttemptID.getApplicationId());
} else {
timelineClient = TimelineClient.createTimelineClient();
@ -1177,10 +1179,14 @@ public class MRAppMaster extends CompositeService {
return taskAttemptFinishingMonitor;
}
// Get Timeline Collector's address (get sync from RM)
public TimelineClient getTimelineClient() {
return timelineClient;
}
// Get Timeline Collector's address (get sync from RM)
public TimelineV2Client getTimelineV2Client() {
return timelineV2Client;
}
}
@SuppressWarnings("unchecked")

View File

@ -882,8 +882,8 @@ public class RMContainerAllocator extends RMContainerRequestor
MRAppMaster.RunningAppContext appContext =
(MRAppMaster.RunningAppContext)this.getContext();
if (collectorAddr != null && !collectorAddr.isEmpty()
&& appContext.getTimelineClient() != null) {
appContext.getTimelineClient().setTimelineServiceAddress(
&& appContext.getTimelineV2Client() != null) {
appContext.getTimelineV2Client().setTimelineServiceAddress(
response.getCollectorAddr());
}

View File

@ -29,8 +29,8 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import org.apache.commons.logging.Log;
@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
@ -829,6 +830,9 @@ public class TestJobHistoryEventHandler {
if (mockContext instanceof RunningAppContext) {
when(((RunningAppContext)mockContext).getTimelineClient()).
thenReturn(TimelineClient.createTimelineClient());
when(((RunningAppContext) mockContext).getTimelineV2Client())
.thenReturn(TimelineV2Client
.createTimelineClient(ApplicationId.newInstance(0, 1)));
}
return mockContext;
}
@ -937,6 +941,8 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
protected void serviceStart() {
if (timelineClient != null) {
timelineClient.start();
} else if (timelineV2Client != null) {
timelineV2Client.start();
}
}

View File

@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
@ -219,7 +220,9 @@ public class ApplicationMaster {
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
private boolean timelineServiceV2 = false;
private boolean timelineServiceV2Enabled = false;
private boolean timelineServiceV1Enabled = false;
// App Master configuration
// No. of containers to run shell command on
@ -293,6 +296,10 @@ public class ApplicationMaster {
// Timeline Client
@VisibleForTesting
TimelineClient timelineClient;
// Timeline v2 Client
private TimelineV2Client timelineV2Client;
static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
static final String APPID_TIMELINE_FILTER_NAME = "appId";
static final String USER_TIMELINE_FILTER_NAME = "user";
@ -556,9 +563,12 @@ public class ApplicationMaster {
"container_retry_interval", "0"));
if (YarnConfiguration.timelineServiceEnabled(conf)) {
timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
timelineServiceV2Enabled =
((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
timelineServiceV1Enabled = !timelineServiceV2Enabled;
} else {
timelineClient = null;
timelineV2Client = null;
LOG.warn("Timeline service is not enabled");
}
@ -621,18 +631,17 @@ public class ApplicationMaster {
nmClientAsync.start();
startTimelineClient(conf);
if (timelineServiceV2) {
if (timelineServiceV2Enabled) {
// need to bind timelineClient
amRMClient.registerTimelineClient(timelineClient);
amRMClient.registerTimelineV2Client(timelineV2Client);
}
if(timelineClient != null) {
if (timelineServiceV2) {
publishApplicationAttemptEventOnTimelineServiceV2(
DSEvent.DS_APP_ATTEMPT_START);
} else {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
}
if (timelineServiceV2Enabled) {
publishApplicationAttemptEventOnTimelineServiceV2(
DSEvent.DS_APP_ATTEMPT_START);
} else if (timelineServiceV1Enabled) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
}
// Setup local RPC Server to accept status requests directly from clients
@ -704,18 +713,21 @@ public class ApplicationMaster {
public Void run() throws Exception {
if (YarnConfiguration.timelineServiceEnabled(conf)) {
// Creating the Timeline Client
if (timelineServiceV2) {
timelineClient = TimelineClient.createTimelineClient(
if (timelineServiceV2Enabled) {
timelineV2Client = TimelineV2Client.createTimelineClient(
appAttemptID.getApplicationId());
timelineV2Client.init(conf);
timelineV2Client.start();
LOG.info("Timeline service V2 client is enabled");
} else {
timelineClient = TimelineClient.createTimelineClient();
timelineClient.init(conf);
timelineClient.start();
LOG.info("Timeline service V1 client is enabled");
}
timelineClient.init(conf);
timelineClient.start();
} else {
timelineClient = null;
timelineV2Client = null;
LOG.warn("Timeline service is not enabled");
}
return null;
@ -741,14 +753,12 @@ public class ApplicationMaster {
} catch (InterruptedException ex) {}
}
if (timelineClient != null) {
if (timelineServiceV2) {
publishApplicationAttemptEventOnTimelineServiceV2(
DSEvent.DS_APP_ATTEMPT_END);
} else {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
if (timelineServiceV2Enabled) {
publishApplicationAttemptEventOnTimelineServiceV2(
DSEvent.DS_APP_ATTEMPT_END);
} else if (timelineServiceV1Enabled) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
// Join all launched threads
@ -797,8 +807,10 @@ public class ApplicationMaster {
amRMClient.stop();
// Stop Timeline Client
if(timelineClient != null) {
if(timelineServiceV1Enabled) {
timelineClient.stop();
} else if (timelineServiceV2Enabled) {
timelineV2Client.stop();
}
return success;
@ -853,16 +865,14 @@ public class ApplicationMaster {
LOG.info("Container completed successfully." + ", containerId="
+ containerStatus.getContainerId());
}
if(timelineClient != null) {
if (timelineServiceV2) {
publishContainerEndEventOnTimelineServiceV2(containerStatus);
} else {
publishContainerEndEvent(
timelineClient, containerStatus, domainId, appSubmitterUgi);
}
if (timelineServiceV2Enabled) {
publishContainerEndEventOnTimelineServiceV2(containerStatus);
} else if (timelineServiceV1Enabled) {
publishContainerEndEvent(timelineClient, containerStatus, domainId,
appSubmitterUgi);
}
}
// ask for more containers if any failed
int askCount = numTotalContainers - numRequestedContainers.get();
numRequestedContainers.addAndGet(askCount);
@ -983,15 +993,13 @@ public class ApplicationMaster {
applicationMaster.nmClientAsync.getContainerStatusAsync(
containerId, container.getNodeId());
}
if(applicationMaster.timelineClient != null) {
if (applicationMaster.timelineServiceV2) {
applicationMaster.publishContainerStartEventOnTimelineServiceV2(
container);
} else {
applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
}
if (applicationMaster.timelineServiceV2Enabled) {
applicationMaster
.publishContainerStartEventOnTimelineServiceV2(container);
} else if (applicationMaster.timelineServiceV1Enabled) {
applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
}
}
@ -1371,7 +1379,7 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
timelineV2Client.putEntities(entity);
return null;
}
});
@ -1404,7 +1412,7 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
timelineV2Client.putEntities(entity);
return null;
}
});
@ -1438,7 +1446,7 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntitiesAsync(entity);
timelineV2Client.putEntitiesAsync(entity);
return null;
}
});

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@ -41,12 +42,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.yarn.util.resource.Resources;
@InterfaceAudience.Public
@InterfaceStability.Stable
@ -54,7 +56,8 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
AbstractService {
private static final Log LOG = LogFactory.getLog(AMRMClient.class);
private TimelineClient timelineClient;
private TimelineV2Client timelineV2Client;
private boolean timelineServiceV2Enabled;
/**
* Create a new instance of AMRMClient.
@ -79,6 +82,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
nmTokenCache = NMTokenCache.getSingleton();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
timelineServiceV2Enabled = YarnConfiguration.timelineServiceV2Enabled(conf);
}
/**
* Object to represent a single container request for resources. Scheduler
* documentation should be consulted for the specifics of how the parameters
@ -682,19 +691,30 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
}
/**
* Register TimelineClient to AMRMClient.
* @param client the timeline client to register
* Register TimelineV2Client to AMRMClient. Writer's address for the timeline
* V2 client will be updated dynamically if registered.
*
* @param client the timeline v2 client to register
* @throws YarnException when this method is invoked even when ATS V2 is not
* configured.
*/
public void registerTimelineClient(TimelineClient client) {
this.timelineClient = client;
public void registerTimelineV2Client(TimelineV2Client client)
throws YarnException {
if (timelineServiceV2Enabled) {
timelineV2Client = client;
} else {
LOG.error("Trying to register timeline v2 client when not configured.");
throw new YarnException(
"register timeline v2 client when not configured.");
}
}
/**
* Get registered timeline client.
* @return the registered timeline client
* Get registered timeline v2 client.
* @return the registered timeline v2 client
*/
public TimelineClient getRegisteredTimelineClient() {
return this.timelineClient;
public TimelineV2Client getRegisteredTimelineV2Client() {
return this.timelineV2Client;
}
/**

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.client.api.async;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@ -29,8 +27,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
@ -46,13 +44,15 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
/**
* <code>AMRMClientAsync</code> handles communication with the ResourceManager
@ -346,17 +346,20 @@ extends AbstractService {
/**
* Register TimelineClient to AMRMClient.
* @param timelineClient
* @throws YarnException when this method is invoked even when ATS V2 is not
* configured.
*/
public void registerTimelineClient(TimelineClient timelineClient) {
client.registerTimelineClient(timelineClient);
public void registerTimelineV2Client(TimelineV2Client timelineClient)
throws YarnException {
client.registerTimelineV2Client(timelineClient);
}
/**
* Get registered timeline client.
* @return the registered timeline client
*/
public TimelineClient getRegisteredTimelineClient() {
return client.getRegisteredTimelineClient();
public TimelineV2Client getRegisteredTimelineV2Client() {
return client.getRegisteredTimelineV2Client();
}
/**

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
@ -326,7 +326,8 @@ extends AMRMClientAsync<T> {
AllocateResponse response = (AllocateResponse) object;
String collectorAddress = response.getCollectorAddr();
TimelineClient timelineClient = client.getRegisteredTimelineClient();
TimelineV2Client timelineClient =
client.getRegisteredTimelineV2Client();
if (timelineClient != null && collectorAddress != null
&& !collectorAddress.isEmpty()) {
if (collectorAddr == null

View File

@ -141,7 +141,7 @@ public class YarnClientImpl extends YarnClient {
Text timelineService;
@VisibleForTesting
String timelineDTRenewer;
protected boolean timelineServiceEnabled;
private boolean timelineV1ServiceEnabled;
protected boolean timelineServiceBestEffort;
private static final String ROOT = "root";
@ -167,9 +167,14 @@ public class YarnClientImpl extends YarnClient {
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
}
float timelineServiceVersion =
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
timelineServiceEnabled = true;
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
&& ((Float.compare(timelineServiceVersion, 1.0f) == 0)
|| (Float.compare(timelineServiceVersion, 1.5f) == 0))) {
timelineV1ServiceEnabled = true;
timelineDTRenewer = getTimelineDelegationTokenRenewer(conf);
timelineService = TimelineUtils.buildTimelineTokenService(conf);
}
@ -178,7 +183,7 @@ public class YarnClientImpl extends YarnClient {
// TimelineServer which means we are able to get history information
// for applications/applicationAttempts/containers by using ahsClient
// when the TimelineServer is running.
if (timelineServiceEnabled || conf.getBoolean(
if (timelineV1ServiceEnabled || conf.getBoolean(
YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
historyServiceEnabled = true;
@ -257,7 +262,7 @@ public class YarnClientImpl extends YarnClient {
// Automatically add the timeline DT into the CLC
// Only when the security and the timeline service are both enabled
if (isSecurityEnabled() && timelineServiceEnabled) {
if (isSecurityEnabled() && timelineV1ServiceEnabled) {
addTimelineDelegationToken(appContext.getAMContainerSpec());
}

View File

@ -21,14 +21,12 @@ package org.apache.hadoop.yarn.client.api;
import java.io.Flushable;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
@ -39,24 +37,22 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
/**
* A client library that can be used to post some information in terms of a
* number of conceptual entities.
* number of conceptual entities. This client library needs to be used along
* with Timeline V.1.x server versions.
* Refer {@link TimelineV2Client} for ATS V2 interface.
*/
@Public
@Evolving
public abstract class TimelineClient extends AbstractService implements
public abstract class TimelineClient extends CompositeService implements
Flushable {
/**
* Create a timeline client. The current UGI when the user initialize the
* client will be used to do the put and the delegation token operations. The
* current user may use {@link UserGroupInformation#doAs} another user to
* construct and initialize a timeline client if the following operations are
* supposed to be conducted by that user.
*/
private ApplicationId contextAppId;
/**
* Creates an instance of the timeline v.1.x client.
* The current UGI when the user initialize the client will be used to do the
* put and the delegation token operations. The current user may use
* {@link UserGroupInformation#doAs} another user to construct and initialize
* a timeline client if the following operations are supposed to be conducted
* by that user.
*
* @return the created timeline client instance
*/
@ -66,23 +62,8 @@ public abstract class TimelineClient extends AbstractService implements
return client;
}
/**
* Creates an instance of the timeline v.2 client.
*
* @param appId the application id with which the timeline client is
* associated
* @return the created timeline client instance
*/
@Public
public static TimelineClient createTimelineClient(ApplicationId appId) {
TimelineClient client = new TimelineClientImpl(appId);
return client;
}
@Private
protected TimelineClient(String name, ApplicationId appId) {
protected TimelineClient(String name) {
super(name);
setContextAppId(appId);
}
/**
@ -207,57 +188,4 @@ public abstract class TimelineClient extends AbstractService implements
public abstract void cancelDelegationToken(
Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException;
/**
* <p>
* Send the information of a number of conceptual entities to the timeline
* service v.2 collector. It is a blocking API. The method will not return
* until all the put entities have been persisted. If this method is invoked
* for a non-v.2 timeline client instance, a YarnException is thrown.
* </p>
*
* @param entities the collection of {@link
* org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
* @throws IOException
* @throws YarnException
*/
@Public
public abstract void putEntities(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
entities) throws IOException, YarnException;
/**
* <p>
* Send the information of a number of conceptual entities to the timeline
* service v.2 collector. It is an asynchronous API. The method will return
* once all the entities are received. If this method is invoked for a
* non-v.2 timeline client instance, a YarnException is thrown.
* </p>
*
* @param entities the collection of {@link
* org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
* @throws IOException
* @throws YarnException
*/
@Public
public abstract void putEntitiesAsync(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
entities) throws IOException, YarnException;
/**
* <p>
* Update the timeline service address where the request will be sent to.
* </p>
* @param address
* the timeline service address
*/
public abstract void setTimelineServiceAddress(String address);
protected ApplicationId getContextAppId() {
return contextAppId;
}
protected void setContextAppId(ApplicationId appId) {
this.contextAppId = appId;
}
}

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* A client library that can be used to post some information in terms of a
* number of conceptual entities. This client library needs to be used along
* with time line v.2 server version.
* Refer {@link TimelineClient} for ATS V1 interface.
*/
public abstract class TimelineV2Client extends CompositeService {
/**
* Creates an instance of the timeline v.2 client.
*
* @param appId the application id with which the timeline client is
* associated
* @return the created timeline client instance
*/
@Public
public static TimelineV2Client createTimelineClient(ApplicationId appId) {
TimelineV2Client client = new TimelineV2ClientImpl(appId);
return client;
}
protected TimelineV2Client(String name) {
super(name);
}
/**
* <p>
* Send the information of a number of conceptual entities to the timeline
* service v.2 collector. It is a blocking API. The method will not return
* until all the put entities have been persisted.
* </p>
*
* @param entities the collection of {@link TimelineEntity}
* @throws IOException if there are I/O errors
* @throws YarnException if entities are incomplete/invalid
*/
@Public
public abstract void putEntities(TimelineEntity... entities)
throws IOException, YarnException;
/**
* <p>
* Send the information of a number of conceptual entities to the timeline
* service v.2 collector. It is an asynchronous API. The method will return
* once all the entities are received.
* </p>
*
* @param entities the collection of {@link TimelineEntity}
* @throws IOException if there are I/O errors
* @throws YarnException if entities are incomplete/invalid
*/
@Public
public abstract void putEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException;
/**
* <p>
* Update the timeline service address where the request will be sent to.
* </p>
*
* @param address the timeline service address
*/
public abstract void setTimelineServiceAddress(String address);
}

View File

@ -20,32 +20,10 @@ package org.apache.hadoop.yarn.client.api.impl;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@ -57,16 +35,9 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@ -79,19 +50,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import com.sun.jersey.core.util.MultivaluedMapImpl;
@Private
@Evolving
@ -100,9 +61,6 @@ public class TimelineClientImpl extends TimelineClient {
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
private static final Joiner JOINER = Joiner.on("");
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
private static Options opts;
private static final String ENTITY_DATA_TYPE = "entity";
@ -117,179 +75,38 @@ public class TimelineClientImpl extends TimelineClient {
opts.addOption("help", false, "Print usage");
}
private Client client;
private ConnectionConfigurator connConfigurator;
private DelegationTokenAuthenticator authenticator;
private DelegationTokenAuthenticatedURL.Token token;
private UserGroupInformation authUgi;
private String doAsUser;
private Configuration configuration;
@VisibleForTesting
protected DelegationTokenAuthenticatedURL.Token token;
@VisibleForTesting
protected UserGroupInformation authUgi;
@VisibleForTesting
protected String doAsUser;
private float timelineServiceVersion;
private TimelineWriter timelineWriter;
private SSLFactory sslFactory;
private volatile String timelineServiceAddress;
// Retry parameters for identifying new timeline service
// TODO consider to merge with connection retry
private int maxServiceRetries;
private long serviceRetryInterval;
private boolean timelineServiceV2 = false;
private String timelineServiceAddress;
@Private
@VisibleForTesting
TimelineClientConnectionRetry connectionRetry;
private TimelineEntityDispatcher entityDispatcher;
// Abstract class for an operation that should be retried by timeline client
@Private
@VisibleForTesting
public static abstract class TimelineClientRetryOp {
// The operation that should be retried
public abstract Object run() throws IOException;
// The method to indicate if we should retry given the incoming exception
public abstract boolean shouldRetryOn(Exception e);
}
// Class to handle retry
// Outside this class, only visible to tests
@Private
@VisibleForTesting
static class TimelineClientConnectionRetry {
// maxRetries < 0 means keep trying
@Private
@VisibleForTesting
public int maxRetries;
@Private
@VisibleForTesting
public long retryInterval;
// Indicates if retries happened last time. Only tests should read it.
// In unit tests, retryOn() calls should _not_ be concurrent.
private boolean retried = false;
@Private
@VisibleForTesting
boolean getRetired() {
return retried;
}
// Constructor with default retry settings
public TimelineClientConnectionRetry(Configuration conf) {
Preconditions.checkArgument(conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES) >= -1,
"%s property value should be greater than or equal to -1",
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
Preconditions
.checkArgument(
conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
"%s property value should be greater than zero",
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
maxRetries = conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
retryInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
}
public Object retryOn(TimelineClientRetryOp op)
throws RuntimeException, IOException {
int leftRetries = maxRetries;
retried = false;
// keep trying
while (true) {
try {
// try perform the op, if fail, keep retrying
return op.run();
} catch (IOException | RuntimeException e) {
// break if there's no retries left
if (leftRetries == 0) {
break;
}
if (op.shouldRetryOn(e)) {
logException(e, leftRetries);
} else {
throw e;
}
}
if (leftRetries > 0) {
leftRetries--;
}
retried = true;
try {
// sleep for the given time interval
Thread.sleep(retryInterval);
} catch (InterruptedException ie) {
LOG.warn("Client retry sleep interrupted! ");
}
}
throw new RuntimeException("Failed to connect to timeline server. "
+ "Connection retries limit exceeded. "
+ "The posted timeline event may be missing");
};
private void logException(Exception e, int leftRetries) {
if (leftRetries > 0) {
LOG.info("Exception caught by TimelineClientConnectionRetry,"
+ " will try " + leftRetries + " more time(s).\nMessage: "
+ e.getMessage());
} else {
// note that maxRetries may be -1 at the very beginning
LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
+ " will keep retrying.\nMessage: "
+ e.getMessage());
}
}
}
private class TimelineJerseyRetryFilter extends ClientFilter {
@Override
public ClientResponse handle(final ClientRequest cr)
throws ClientHandlerException {
// Set up the retry operation
TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
@Override
public Object run() {
// Try pass the request, if fail, keep retrying
return getNext().handle(cr);
}
@Override
public boolean shouldRetryOn(Exception e) {
// Only retry on connection exceptions
return (e instanceof ClientHandlerException)
&& (e.getCause() instanceof ConnectException ||
e.getCause() instanceof SocketTimeoutException);
}
};
try {
return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
} catch (IOException e) {
throw new ClientHandlerException("Jersey retry failed!\nMessage: "
+ e.getMessage());
}
}
}
TimelineConnector connector;
public TimelineClientImpl() {
super(TimelineClientImpl.class.getName(), null);
}
public TimelineClientImpl(ApplicationId applicationId) {
super(TimelineClientImpl.class.getName(), applicationId);
this.timelineServiceV2 = true;
super(TimelineClientImpl.class.getName());
}
protected void serviceInit(Configuration conf) throws Exception {
this.configuration = conf;
timelineServiceVersion =
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
LOG.info("Timeline service address: " + getTimelineServiceAddress());
if (!YarnConfiguration.timelineServiceEnabled(conf)
|| !((Float.compare(this.timelineServiceVersion, 1.0f) == 0)
|| (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) {
throw new IOException("Timeline V1 client is not properly configured. "
+ "Either timeline service is not enabled or version is not set to"
+ " 1.x");
}
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
if (realUgi != null) {
@ -299,62 +116,34 @@ public class TimelineClientImpl extends TimelineClient {
authUgi = ugi;
doAsUser = null;
}
ClientConfig cc = new DefaultClientConfig();
cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
connConfigurator = initConnConfigurator(conf);
if (UserGroupInformation.isSecurityEnabled()) {
authenticator = new KerberosDelegationTokenAuthenticator();
} else {
authenticator = new PseudoDelegationTokenAuthenticator();
}
authenticator.setConnectionConfigurator(connConfigurator);
token = new DelegationTokenAuthenticatedURL.Token();
connector = createTimelineConnector();
connectionRetry = new TimelineClientConnectionRetry(conf);
client = new Client(new URLConnectionClientHandler(
new TimelineURLConnectionFactory()), cc);
TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
// TODO need to cleanup filter retry later.
if (!timelineServiceV2) {
client.addFilter(retryFilter);
}
// old version timeline service need to get address from configuration
// while new version need to auto discovery (with retry).
if (timelineServiceV2) {
maxServiceRetries = conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
serviceRetryInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
entityDispatcher = new TimelineEntityDispatcher(conf);
if (YarnConfiguration.useHttps(conf)) {
timelineServiceAddress =
conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
} else {
if (YarnConfiguration.useHttps(conf)) {
setTimelineServiceAddress(conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
} else {
setTimelineServiceAddress(conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
}
timelineServiceVersion =
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
LOG.info("Timeline service address: " + getTimelineServiceAddress());
timelineServiceAddress =
conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
}
super.serviceInit(conf);
}
@VisibleForTesting
protected TimelineConnector createTimelineConnector() {
TimelineConnector newConnector =
new TimelineConnector(true, authUgi, doAsUser, token);
addIfService(newConnector);
return newConnector;
}
@Override
protected void serviceStart() throws Exception {
if (timelineServiceV2) {
entityDispatcher.start();
} else {
timelineWriter = createTimelineWriter(configuration, authUgi, client,
constructResURI(getConfig(), timelineServiceAddress, false));
}
timelineWriter = createTimelineWriter(getConfig(), authUgi,
connector.getClient(), TimelineConnector.constructResURI(getConfig(),
timelineServiceAddress, RESOURCE_URI_STR_V1));
}
protected TimelineWriter createTimelineWriter(Configuration conf,
@ -373,12 +162,6 @@ public class TimelineClientImpl extends TimelineClient {
if (this.timelineWriter != null) {
this.timelineWriter.close();
}
if (timelineServiceV2) {
entityDispatcher.stop();
}
if (this.sslFactory != null) {
this.sslFactory.destroy();
}
super.serviceStop();
}
@ -390,132 +173,17 @@ public class TimelineClientImpl extends TimelineClient {
}
@Override
public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException {
public TimelinePutResponse putEntities(TimelineEntity... entities)
throws IOException, YarnException {
return timelineWriter.putEntities(entities);
}
@Override
public void putEntities(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
entities) throws IOException, YarnException {
if (!timelineServiceV2) {
throw new YarnException("v.2 method is invoked on a v.1.x client");
}
entityDispatcher.dispatchEntities(true, entities);
}
@Override
public void putEntitiesAsync(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
entities) throws IOException, YarnException {
if (!timelineServiceV2) {
throw new YarnException("v.2 method is invoked on a v.1.x client");
}
entityDispatcher.dispatchEntities(false, entities);
}
@Override
public void putDomain(TimelineDomain domain) throws IOException,
YarnException {
timelineWriter.putDomain(domain);
}
// Used for new timeline service only
@Private
protected void putObjects(String path, MultivaluedMap<String, String> params,
Object obj) throws IOException, YarnException {
int retries = verifyRestEndPointAvailable();
// timelineServiceAddress could be stale, add retry logic here.
boolean needRetry = true;
while (needRetry) {
try {
URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
putObjects(uri, path, params, obj);
needRetry = false;
} catch (IOException e) {
// handle exception for timelineServiceAddress being updated.
checkRetryWithSleep(retries, e);
retries--;
}
}
}
private int verifyRestEndPointAvailable() throws YarnException {
// timelineServiceAddress could haven't be initialized yet
// or stale (only for new timeline service)
int retries = pollTimelineServiceAddress(this.maxServiceRetries);
if (timelineServiceAddress == null) {
String errMessage = "TimelineClient has reached to max retry times : "
+ this.maxServiceRetries
+ ", but failed to fetch timeline service address. Please verify"
+ " Timeline Auxiliary Service is configured in all the NMs";
LOG.error(errMessage);
throw new YarnException(errMessage);
}
return retries;
}
/**
* Check if reaching to maximum of retries.
* @param retries
* @param e
*/
private void checkRetryWithSleep(int retries, IOException e)
throws YarnException, IOException {
if (retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while retrying to connect to ATS");
}
} else {
StringBuilder msg =
new StringBuilder("TimelineClient has reached to max retry times : ");
msg.append(this.maxServiceRetries);
msg.append(" for service address: ");
msg.append(timelineServiceAddress);
LOG.error(msg.toString());
throw new IOException(msg.toString(), e);
}
}
protected void putObjects(
URI base, String path, MultivaluedMap<String, String> params, Object obj)
throws IOException, YarnException {
ClientResponse resp;
try {
resp = client.resource(base).path(path).queryParams(params)
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.put(ClientResponse.class, obj);
} catch (RuntimeException re) {
// runtime exception is expected if the client cannot connect the server
String msg =
"Failed to get the response from the timeline server.";
LOG.error(msg, re);
throw new IOException(re);
}
if (resp == null ||
resp.getStatusInfo().getStatusCode() !=
ClientResponse.Status.OK.getStatusCode()) {
String msg = "Response from the timeline server is " +
((resp == null) ? "null":
"not successful," + " HTTP error code: " + resp.getStatus()
+ ", Server response:\n" + resp.getEntity(String.class));
LOG.error(msg);
throw new YarnException(msg);
}
}
@Override
public void setTimelineServiceAddress(String address) {
this.timelineServiceAddress = address;
}
private String getTimelineServiceAddress() {
return this.timelineServiceAddress;
}
@ -532,17 +200,17 @@ public class TimelineClientImpl extends TimelineClient {
public Token<TimelineDelegationTokenIdentifier> run()
throws Exception {
DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator);
connector.getDelegationTokenAuthenticatedURL();
// TODO we should add retry logic here if timelineServiceAddress is
// not available immediately.
return (Token) authUrl.getDelegationToken(
constructResURI(getConfig(),
getTimelineServiceAddress(), false).toURL(),
TimelineConnector.constructResURI(getConfig(),
getTimelineServiceAddress(), RESOURCE_URI_STR_V1).toURL(),
token, renewer, doAsUser);
}
};
return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
return (Token<TimelineDelegationTokenIdentifier>) connector
.operateDelegationToken(getDTAction);
}
@SuppressWarnings("unchecked")
@ -568,26 +236,26 @@ public class TimelineClientImpl extends TimelineClient {
token.setDelegationToken((Token) timelineDT);
}
DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator);
connector.getDelegationTokenAuthenticatedURL();
// If the token service address is not available, fall back to use
// the configured service address.
final URI serviceURI = isTokenServiceAddrEmpty ?
constructResURI(getConfig(), getTimelineServiceAddress(), false)
final URI serviceURI = isTokenServiceAddrEmpty
? TimelineConnector.constructResURI(getConfig(),
getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
: new URI(scheme, null, address.getHostName(),
address.getPort(), RESOURCE_URI_STR_V1, null, null);
address.getPort(), RESOURCE_URI_STR_V1, null, null);
return authUrl
.renewDelegationToken(serviceURI.toURL(), token, doAsUser);
}
};
return (Long) operateDelegationToken(renewDTAction);
return (Long) connector.operateDelegationToken(renewDTAction);
}
@SuppressWarnings("unchecked")
@Override
public void cancelDelegationToken(
final Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException {
throws IOException, YarnException {
final boolean isTokenServiceAddrEmpty =
timelineDT.getService().toString().isEmpty();
final String scheme = isTokenServiceAddrEmpty ? null
@ -607,134 +275,29 @@ public class TimelineClientImpl extends TimelineClient {
token.setDelegationToken((Token) timelineDT);
}
DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator);
connector.getDelegationTokenAuthenticatedURL();
// If the token service address is not available, fall back to use
// the configured service address.
final URI serviceURI = isTokenServiceAddrEmpty ?
constructResURI(getConfig(), getTimelineServiceAddress(), false)
final URI serviceURI = isTokenServiceAddrEmpty
? TimelineConnector.constructResURI(getConfig(),
getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
: new URI(scheme, null, address.getHostName(),
address.getPort(), RESOURCE_URI_STR_V1, null, null);
address.getPort(), RESOURCE_URI_STR_V1, null, null);
authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
return null;
}
};
operateDelegationToken(cancelDTAction);
connector.operateDelegationToken(cancelDTAction);
}
@Override
public String toString() {
return super.toString() + " with timeline server "
+ constructResURI(getConfig(), getTimelineServiceAddress(), false)
+ TimelineConnector.constructResURI(getConfig(),
getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
+ " and writer " + timelineWriter;
}
private Object operateDelegationToken(
final PrivilegedExceptionAction<?> action)
throws IOException, YarnException {
// Set up the retry operation
TimelineClientRetryOp tokenRetryOp =
createTimelineClientRetryOpForOperateDelegationToken(action);
return connectionRetry.retryOn(tokenRetryOp);
}
/**
* Poll TimelineServiceAddress for maximum of retries times if it is null.
*
* @param retries
* @return the left retry times
* @throws IOException
*/
private int pollTimelineServiceAddress(int retries) throws YarnException {
while (timelineServiceAddress == null && retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while trying to connect ATS");
}
retries--;
}
return retries;
}
private class TimelineURLConnectionFactory
implements HttpURLConnectionFactory {
@Override
public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
authUgi.checkTGTAndReloginFromKeytab();
try {
return new DelegationTokenAuthenticatedURL(
authenticator, connConfigurator).openConnection(url, token,
doAsUser);
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
} catch (AuthenticationException ae) {
throw new IOException(ae);
}
}
}
private ConnectionConfigurator initConnConfigurator(Configuration conf) {
try {
return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
} catch (Exception e) {
LOG.debug("Cannot load customized ssl related configuration. " +
"Fallback to system-generic settings.", e);
return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
}
}
private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
return conn;
}
};
private ConnectionConfigurator initSslConnConfigurator(final int timeout,
Configuration conf) throws IOException, GeneralSecurityException {
final SSLSocketFactory sf;
final HostnameVerifier hv;
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
sslFactory.init();
sf = sslFactory.createSSLSocketFactory();
hv = sslFactory.getHostnameVerifier();
return new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
if (conn instanceof HttpsURLConnection) {
HttpsURLConnection c = (HttpsURLConnection) conn;
c.setSSLSocketFactory(sf);
c.setHostnameVerifier(hv);
}
setTimeouts(conn, timeout);
return conn;
}
};
}
private static void setTimeouts(URLConnection connection, int socketTimeout) {
connection.setConnectTimeout(socketTimeout);
connection.setReadTimeout(socketTimeout);
}
private static URI constructResURI(
Configuration conf, String address, boolean v2) {
return URI.create(
JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
}
public static void main(String[] argv) throws Exception {
CommandLine cliParser = new GnuParser().parse(opts, argv);
if (cliParser.hasOption("put")) {
@ -870,266 +433,4 @@ public class TimelineClientImpl extends TimelineClient {
public void setTimelineWriter(TimelineWriter writer) {
this.timelineWriter = writer;
}
@Private
@VisibleForTesting
public TimelineClientRetryOp
createTimelineClientRetryOpForOperateDelegationToken(
final PrivilegedExceptionAction<?> action) throws IOException {
return new TimelineClientRetryOpForOperateDelegationToken(
this.authUgi, action);
}
@Private
@VisibleForTesting
public class TimelineClientRetryOpForOperateDelegationToken
extends TimelineClientRetryOp {
private final UserGroupInformation authUgi;
private final PrivilegedExceptionAction<?> action;
public TimelineClientRetryOpForOperateDelegationToken(
UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
this.authUgi = authUgi;
this.action = action;
}
@Override
public Object run() throws IOException {
// Try pass the request, if fail, keep retrying
authUgi.checkTGTAndReloginFromKeytab();
try {
return authUgi.doAs(action);
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public boolean shouldRetryOn(Exception e) {
// retry on connection exceptions
// and SocketTimeoutException
return (e instanceof ConnectException
|| e instanceof SocketTimeoutException);
}
}
private final class EntitiesHolder extends FutureTask<Void> {
private final
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
entities;
private final boolean isSync;
EntitiesHolder(
final
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
entities,
final boolean isSync) {
super(new Callable<Void>() {
// publishEntities()
public Void call() throws Exception {
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
params.add("appid", getContextAppId().toString());
params.add("async", Boolean.toString(!isSync));
putObjects("entities", params, entities);
return null;
}
});
this.entities = entities;
this.isSync = isSync;
}
public boolean isSync() {
return isSync;
}
public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
getEntities() {
return entities;
}
}
/**
* This class is responsible for collecting the timeline entities and
* publishing them in async.
*/
private class TimelineEntityDispatcher {
/**
* Time period for which the timelineclient will wait for draining after
* stop.
*/
private static final long DRAIN_TIME_PERIOD = 2000L;
private int numberOfAsyncsToMerge;
private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
private ExecutorService executor;
TimelineEntityDispatcher(Configuration conf) {
timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
numberOfAsyncsToMerge =
conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
}
Runnable createRunnable() {
return new Runnable() {
@Override
public void run() {
try {
EntitiesHolder entitiesHolder;
while (!Thread.currentThread().isInterrupted()) {
// Merge all the async calls and make one push, but if its sync
// call push immediately
try {
entitiesHolder = timelineEntityQueue.take();
} catch (InterruptedException ie) {
LOG.info("Timeline dispatcher thread was interrupted ");
Thread.currentThread().interrupt();
return;
}
if (entitiesHolder != null) {
publishWithoutBlockingOnQueue(entitiesHolder);
}
}
} finally {
if (!timelineEntityQueue.isEmpty()) {
LOG.info("Yet to publish " + timelineEntityQueue.size()
+ " timelineEntities, draining them now. ");
}
// Try to drain the remaining entities to be published @ the max for
// 2 seconds
long timeTillweDrain =
System.currentTimeMillis() + DRAIN_TIME_PERIOD;
while (!timelineEntityQueue.isEmpty()) {
publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
if (System.currentTimeMillis() > timeTillweDrain) {
// time elapsed stop publishing further....
if (!timelineEntityQueue.isEmpty()) {
LOG.warn("Time to drain elapsed! Remaining "
+ timelineEntityQueue.size() + "timelineEntities will not"
+ " be published");
// if some entities were not drained then we need interrupt
// the threads which had put sync EntityHolders to the queue.
EntitiesHolder nextEntityInTheQueue = null;
while ((nextEntityInTheQueue =
timelineEntityQueue.poll()) != null) {
nextEntityInTheQueue.cancel(true);
}
}
break;
}
}
}
}
/**
* Publishes the given EntitiesHolder and return immediately if sync
* call, else tries to fetch the EntitiesHolder from the queue in non
* blocking fashion and collate the Entities if possible before
* publishing through REST.
*
* @param entitiesHolder
*/
private void publishWithoutBlockingOnQueue(
EntitiesHolder entitiesHolder) {
if (entitiesHolder.isSync()) {
entitiesHolder.run();
return;
}
int count = 1;
while (true) {
// loop till we find a sync put Entities or there is nothing
// to take
EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
if (nextEntityInTheQueue == null) {
// Nothing in the queue just publish and get back to the
// blocked wait state
entitiesHolder.run();
break;
} else if (nextEntityInTheQueue.isSync()) {
// flush all the prev async entities first
entitiesHolder.run();
// and then flush the sync entity
nextEntityInTheQueue.run();
break;
} else {
// append all async entities together and then flush
entitiesHolder.getEntities().addEntities(
nextEntityInTheQueue.getEntities().getEntities());
count++;
if (count == numberOfAsyncsToMerge) {
// Flush the entities if the number of the async
// putEntites merged reaches the desired limit. To avoid
// collecting multiple entities and delaying for a long
// time.
entitiesHolder.run();
break;
}
}
}
}
};
}
public void dispatchEntities(boolean sync,
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[]
entitiesTobePublished) throws YarnException {
if (executor.isShutdown()) {
throw new YarnException("Timeline client is in the process of stopping,"
+ " not accepting any more TimelineEntities");
}
// wrap all TimelineEntity into TimelineEntities object
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
entities =
new org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntities();
for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity : entitiesTobePublished) {
entities.addEntity(entity);
}
// created a holder and place it in queue
EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
try {
timelineEntityQueue.put(entitiesHolder);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException(
"Failed while adding entity to the queue for publishing", e);
}
if (sync) {
// In sync call we need to wait till its published and if any error then
// throw it back
try {
entitiesHolder.get();
} catch (ExecutionException e) {
throw new YarnException("Failed while publishing entity",
e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while publishing entity", e);
}
}
}
public void start() {
executor = Executors.newSingleThreadExecutor();
executor.execute(createRunnable());
}
public void stop() {
LOG.info("Stopping TimelineClient.");
executor.shutdownNow();
try {
executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,440 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
/**
* Utility Connector class which is used by timeline clients to securely get
* connected to the timeline server.
*
*/
public class TimelineConnector extends AbstractService {
private static final Joiner JOINER = Joiner.on("");
private static final Log LOG = LogFactory.getLog(TimelineConnector.class);
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
private SSLFactory sslFactory;
private Client client;
private ConnectionConfigurator connConfigurator;
private DelegationTokenAuthenticator authenticator;
private DelegationTokenAuthenticatedURL.Token token;
private UserGroupInformation authUgi;
private String doAsUser;
@VisibleForTesting
TimelineClientConnectionRetry connectionRetry;
private boolean requireConnectionRetry;
public TimelineConnector(boolean requireConnectionRetry,
UserGroupInformation authUgi, String doAsUser,
DelegationTokenAuthenticatedURL.Token token) {
super("TimelineConnector");
this.requireConnectionRetry = requireConnectionRetry;
this.authUgi = authUgi;
this.doAsUser = doAsUser;
this.token = token;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
ClientConfig cc = new DefaultClientConfig();
cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
sslFactory = getSSLFactory(conf);
connConfigurator = getConnConfigurator(sslFactory);
if (UserGroupInformation.isSecurityEnabled()) {
authenticator = new KerberosDelegationTokenAuthenticator();
} else {
authenticator = new PseudoDelegationTokenAuthenticator();
}
authenticator.setConnectionConfigurator(connConfigurator);
connectionRetry = new TimelineClientConnectionRetry(conf);
client =
new Client(
new URLConnectionClientHandler(new TimelineURLConnectionFactory(
authUgi, authenticator, connConfigurator, token, doAsUser)),
cc);
if (requireConnectionRetry) {
TimelineJerseyRetryFilter retryFilter =
new TimelineJerseyRetryFilter(connectionRetry);
client.addFilter(retryFilter);
}
}
private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR
= new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
return conn;
}
};
private ConnectionConfigurator getConnConfigurator(SSLFactory sslFactoryObj) {
try {
return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, sslFactoryObj);
} catch (Exception e) {
LOG.debug("Cannot load customized ssl related configuration. "
+ "Fallback to system-generic settings.", e);
return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
}
}
private static ConnectionConfigurator initSslConnConfigurator(
final int timeout, SSLFactory sslFactory)
throws IOException, GeneralSecurityException {
final SSLSocketFactory sf;
final HostnameVerifier hv;
sf = sslFactory.createSSLSocketFactory();
hv = sslFactory.getHostnameVerifier();
return new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
if (conn instanceof HttpsURLConnection) {
HttpsURLConnection c = (HttpsURLConnection) conn;
c.setSSLSocketFactory(sf);
c.setHostnameVerifier(hv);
}
setTimeouts(conn, timeout);
return conn;
}
};
}
protected SSLFactory getSSLFactory(Configuration conf)
throws GeneralSecurityException, IOException {
SSLFactory newSSLFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
newSSLFactory.init();
return newSSLFactory;
}
private static void setTimeouts(URLConnection connection, int socketTimeout) {
connection.setConnectTimeout(socketTimeout);
connection.setReadTimeout(socketTimeout);
}
public static URI constructResURI(Configuration conf, String address,
String uri) {
return URI.create(
JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
address, uri));
}
DelegationTokenAuthenticatedURL getDelegationTokenAuthenticatedURL() {
return new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
}
protected void serviceStop() {
if (this.sslFactory != null) {
this.sslFactory.destroy();
}
}
public Client getClient() {
return client;
}
public Object operateDelegationToken(
final PrivilegedExceptionAction<?> action)
throws IOException, YarnException {
// Set up the retry operation
TimelineClientRetryOp tokenRetryOp =
createRetryOpForOperateDelegationToken(action);
return connectionRetry.retryOn(tokenRetryOp);
}
@Private
@VisibleForTesting
TimelineClientRetryOp createRetryOpForOperateDelegationToken(
final PrivilegedExceptionAction<?> action) throws IOException {
return new TimelineClientRetryOpForOperateDelegationToken(this.authUgi,
action);
}
/**
* Abstract class for an operation that should be retried by timeline client.
*/
@Private
@VisibleForTesting
public static abstract class TimelineClientRetryOp {
// The operation that should be retried
public abstract Object run() throws IOException;
// The method to indicate if we should retry given the incoming exception
public abstract boolean shouldRetryOn(Exception e);
}
private static class TimelineURLConnectionFactory
implements HttpURLConnectionFactory {
private DelegationTokenAuthenticator authenticator;
private UserGroupInformation authUgi;
private ConnectionConfigurator connConfigurator;
private Token token;
private String doAsUser;
public TimelineURLConnectionFactory(UserGroupInformation authUgi,
DelegationTokenAuthenticator authenticator,
ConnectionConfigurator connConfigurator,
DelegationTokenAuthenticatedURL.Token token, String doAsUser) {
this.authUgi = authUgi;
this.authenticator = authenticator;
this.connConfigurator = connConfigurator;
this.token = token;
this.doAsUser = doAsUser;
}
@Override
public HttpURLConnection getHttpURLConnection(final URL url)
throws IOException {
authUgi.checkTGTAndReloginFromKeytab();
try {
return new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator).openConnection(url, token, doAsUser);
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
} catch (AuthenticationException ae) {
throw new IOException(ae);
}
}
}
// Class to handle retry
// Outside this class, only visible to tests
@Private
@VisibleForTesting
static class TimelineClientConnectionRetry {
// maxRetries < 0 means keep trying
@Private
@VisibleForTesting
public int maxRetries;
@Private
@VisibleForTesting
public long retryInterval;
// Indicates if retries happened last time. Only tests should read it.
// In unit tests, retryOn() calls should _not_ be concurrent.
private boolean retried = false;
@Private
@VisibleForTesting
boolean getRetired() {
return retried;
}
// Constructor with default retry settings
public TimelineClientConnectionRetry(Configuration conf) {
Preconditions.checkArgument(
conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES)
>= -1,
"%s property value should be greater than or equal to -1",
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
Preconditions.checkArgument(
conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.
DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
"%s property value should be greater than zero",
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
maxRetries =
conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
retryInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
}
public Object retryOn(TimelineClientRetryOp op)
throws RuntimeException, IOException {
int leftRetries = maxRetries;
retried = false;
// keep trying
while (true) {
try {
// try perform the op, if fail, keep retrying
return op.run();
} catch (IOException | RuntimeException e) {
// break if there's no retries left
if (leftRetries == 0) {
break;
}
if (op.shouldRetryOn(e)) {
logException(e, leftRetries);
} else {
throw e;
}
}
if (leftRetries > 0) {
leftRetries--;
}
retried = true;
try {
// sleep for the given time interval
Thread.sleep(retryInterval);
} catch (InterruptedException ie) {
LOG.warn("Client retry sleep interrupted! ");
}
}
throw new RuntimeException("Failed to connect to timeline server. "
+ "Connection retries limit exceeded. "
+ "The posted timeline event may be missing");
};
private void logException(Exception e, int leftRetries) {
if (leftRetries > 0) {
LOG.info(
"Exception caught by TimelineClientConnectionRetry," + " will try "
+ leftRetries + " more time(s).\nMessage: " + e.getMessage());
} else {
// note that maxRetries may be -1 at the very beginning
LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
+ " will keep retrying.\nMessage: " + e.getMessage());
}
}
}
private static class TimelineJerseyRetryFilter extends ClientFilter {
private TimelineClientConnectionRetry connectionRetry;
public TimelineJerseyRetryFilter(
TimelineClientConnectionRetry connectionRetry) {
this.connectionRetry = connectionRetry;
}
@Override
public ClientResponse handle(final ClientRequest cr)
throws ClientHandlerException {
// Set up the retry operation
TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
@Override
public Object run() {
// Try pass the request, if fail, keep retrying
return getNext().handle(cr);
}
@Override
public boolean shouldRetryOn(Exception e) {
// Only retry on connection exceptions
return (e instanceof ClientHandlerException)
&& (e.getCause() instanceof ConnectException
|| e.getCause() instanceof SocketTimeoutException);
}
};
try {
return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
} catch (IOException e) {
throw new ClientHandlerException(
"Jersey retry failed!\nMessage: " + e.getMessage());
}
}
}
@Private
@VisibleForTesting
public static class TimelineClientRetryOpForOperateDelegationToken
extends TimelineClientRetryOp {
private final UserGroupInformation authUgi;
private final PrivilegedExceptionAction<?> action;
public TimelineClientRetryOpForOperateDelegationToken(
UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
this.authUgi = authUgi;
this.action = action;
}
@Override
public Object run() throws IOException {
// Try pass the request, if fail, keep retrying
authUgi.checkTGTAndReloginFromKeytab();
try {
return authUgi.doAs(action);
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public boolean shouldRetryOn(Exception e) {
// retry on connection exceptions
// and SocketTimeoutException
return (e instanceof ConnectException
|| e instanceof SocketTimeoutException);
}
}
}

View File

@ -0,0 +1,459 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.core.util.MultivaluedMapImpl;
/**
* Implementation of timeline v2 client interface.
*
*/
public class TimelineV2ClientImpl extends TimelineV2Client {
private static final Log LOG = LogFactory.getLog(TimelineV2ClientImpl.class);
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
private TimelineEntityDispatcher entityDispatcher;
private volatile String timelineServiceAddress;
// Retry parameters for identifying new timeline service
// TODO consider to merge with connection retry
private int maxServiceRetries;
private long serviceRetryInterval;
private TimelineConnector connector;
private ApplicationId contextAppId;
public TimelineV2ClientImpl(ApplicationId appId) {
super(TimelineV2ClientImpl.class.getName());
this.contextAppId = appId;
}
public ApplicationId getContextAppId() {
return contextAppId;
}
protected void serviceInit(Configuration conf) throws Exception {
if (!YarnConfiguration.timelineServiceEnabled(conf)
|| (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) {
throw new IOException("Timeline V2 client is not properly configured. "
+ "Either timeline service is not enabled or version is not set to"
+ " 2");
}
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
String doAsUser = null;
UserGroupInformation authUgi = null;
if (realUgi != null) {
authUgi = realUgi;
doAsUser = ugi.getShortUserName();
} else {
authUgi = ugi;
doAsUser = null;
}
// TODO need to add/cleanup filter retry later for ATSV2. similar to V1
DelegationTokenAuthenticatedURL.Token token =
new DelegationTokenAuthenticatedURL.Token();
connector = new TimelineConnector(false, authUgi, doAsUser, token);
addIfService(connector);
// new version need to auto discovery (with retry till ATS v2 address is
// got).
maxServiceRetries =
conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
serviceRetryInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
entityDispatcher = new TimelineEntityDispatcher(conf);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
entityDispatcher.start();
}
@Override
protected void serviceStop() throws Exception {
entityDispatcher.stop();
super.serviceStop();
}
@Override
public void putEntities(TimelineEntity... entities)
throws IOException, YarnException {
entityDispatcher.dispatchEntities(true, entities);
}
@Override
public void putEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException {
entityDispatcher.dispatchEntities(false, entities);
}
@Override
public void setTimelineServiceAddress(String address) {
this.timelineServiceAddress = address;
}
@Private
protected void putObjects(String path, MultivaluedMap<String, String> params,
Object obj) throws IOException, YarnException {
int retries = verifyRestEndPointAvailable();
// timelineServiceAddress could be stale, add retry logic here.
boolean needRetry = true;
while (needRetry) {
try {
URI uri = TimelineConnector.constructResURI(getConfig(),
timelineServiceAddress, RESOURCE_URI_STR_V2);
putObjects(uri, path, params, obj);
needRetry = false;
} catch (IOException e) {
// handle exception for timelineServiceAddress being updated.
checkRetryWithSleep(retries, e);
retries--;
}
}
}
/**
* Check if reaching to maximum of retries.
*
* @param retries
* @param e
*/
private void checkRetryWithSleep(int retries, IOException e)
throws YarnException, IOException {
if (retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while retrying to connect to ATS");
}
} else {
StringBuilder msg =
new StringBuilder("TimelineClient has reached to max retry times : ");
msg.append(this.maxServiceRetries);
msg.append(" for service address: ");
msg.append(timelineServiceAddress);
LOG.error(msg.toString());
throw new IOException(msg.toString(), e);
}
}
protected void putObjects(URI base, String path,
MultivaluedMap<String, String> params, Object obj)
throws IOException, YarnException {
ClientResponse resp;
try {
resp = connector.getClient().resource(base).path(path).queryParams(params)
.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
.put(ClientResponse.class, obj);
} catch (RuntimeException re) {
// runtime exception is expected if the client cannot connect the server
String msg = "Failed to get the response from the timeline server.";
LOG.error(msg, re);
throw new IOException(re);
}
if (resp == null || resp.getStatusInfo()
.getStatusCode() != ClientResponse.Status.OK.getStatusCode()) {
String msg =
"Response from the timeline server is " + ((resp == null) ? "null"
: "not successful," + " HTTP error code: " + resp.getStatus()
+ ", Server response:\n" + resp.getEntity(String.class));
LOG.error(msg);
throw new YarnException(msg);
}
}
private int verifyRestEndPointAvailable() throws YarnException {
// timelineServiceAddress could haven't be initialized yet
// or stale (only for new timeline service)
int retries = pollTimelineServiceAddress(this.maxServiceRetries);
if (timelineServiceAddress == null) {
String errMessage = "TimelineClient has reached to max retry times : "
+ this.maxServiceRetries
+ ", but failed to fetch timeline service address. Please verify"
+ " Timeline Auxiliary Service is configured in all the NMs";
LOG.error(errMessage);
throw new YarnException(errMessage);
}
return retries;
}
/**
* Poll TimelineServiceAddress for maximum of retries times if it is null.
*
* @param retries
* @return the left retry times
* @throws IOException
*/
private int pollTimelineServiceAddress(int retries) throws YarnException {
while (timelineServiceAddress == null && retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while trying to connect ATS");
}
retries--;
}
return retries;
}
private final class EntitiesHolder extends FutureTask<Void> {
private final TimelineEntities entities;
private final boolean isSync;
EntitiesHolder(final TimelineEntities entities, final boolean isSync) {
super(new Callable<Void>() {
// publishEntities()
public Void call() throws Exception {
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
params.add("appid", getContextAppId().toString());
params.add("async", Boolean.toString(!isSync));
putObjects("entities", params, entities);
return null;
}
});
this.entities = entities;
this.isSync = isSync;
}
public boolean isSync() {
return isSync;
}
public TimelineEntities getEntities() {
return entities;
}
}
/**
* This class is responsible for collecting the timeline entities and
* publishing them in async.
*/
private class TimelineEntityDispatcher {
/**
* Time period for which the timelineclient will wait for draining after
* stop.
*/
private static final long DRAIN_TIME_PERIOD = 2000L;
private int numberOfAsyncsToMerge;
private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
private ExecutorService executor;
TimelineEntityDispatcher(Configuration conf) {
timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
numberOfAsyncsToMerge =
conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
}
Runnable createRunnable() {
return new Runnable() {
@Override
public void run() {
try {
EntitiesHolder entitiesHolder;
while (!Thread.currentThread().isInterrupted()) {
// Merge all the async calls and make one push, but if its sync
// call push immediately
try {
entitiesHolder = timelineEntityQueue.take();
} catch (InterruptedException ie) {
LOG.info("Timeline dispatcher thread was interrupted ");
Thread.currentThread().interrupt();
return;
}
if (entitiesHolder != null) {
publishWithoutBlockingOnQueue(entitiesHolder);
}
}
} finally {
if (!timelineEntityQueue.isEmpty()) {
LOG.info("Yet to publish " + timelineEntityQueue.size()
+ " timelineEntities, draining them now. ");
}
// Try to drain the remaining entities to be published @ the max for
// 2 seconds
long timeTillweDrain =
System.currentTimeMillis() + DRAIN_TIME_PERIOD;
while (!timelineEntityQueue.isEmpty()) {
publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
if (System.currentTimeMillis() > timeTillweDrain) {
// time elapsed stop publishing further....
if (!timelineEntityQueue.isEmpty()) {
LOG.warn("Time to drain elapsed! Remaining "
+ timelineEntityQueue.size() + "timelineEntities will not"
+ " be published");
// if some entities were not drained then we need interrupt
// the threads which had put sync EntityHolders to the queue.
EntitiesHolder nextEntityInTheQueue = null;
while ((nextEntityInTheQueue =
timelineEntityQueue.poll()) != null) {
nextEntityInTheQueue.cancel(true);
}
}
break;
}
}
}
}
/**
* Publishes the given EntitiesHolder and return immediately if sync
* call, else tries to fetch the EntitiesHolder from the queue in non
* blocking fashion and collate the Entities if possible before
* publishing through REST.
*
* @param entitiesHolder
*/
private void publishWithoutBlockingOnQueue(
EntitiesHolder entitiesHolder) {
if (entitiesHolder.isSync()) {
entitiesHolder.run();
return;
}
int count = 1;
while (true) {
// loop till we find a sync put Entities or there is nothing
// to take
EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
if (nextEntityInTheQueue == null) {
// Nothing in the queue just publish and get back to the
// blocked wait state
entitiesHolder.run();
break;
} else if (nextEntityInTheQueue.isSync()) {
// flush all the prev async entities first
entitiesHolder.run();
// and then flush the sync entity
nextEntityInTheQueue.run();
break;
} else {
// append all async entities together and then flush
entitiesHolder.getEntities().addEntities(
nextEntityInTheQueue.getEntities().getEntities());
count++;
if (count == numberOfAsyncsToMerge) {
// Flush the entities if the number of the async
// putEntites merged reaches the desired limit. To avoid
// collecting multiple entities and delaying for a long
// time.
entitiesHolder.run();
break;
}
}
}
}
};
}
public void dispatchEntities(boolean sync,
TimelineEntity[] entitiesTobePublished) throws YarnException {
if (executor.isShutdown()) {
throw new YarnException("Timeline client is in the process of stopping,"
+ " not accepting any more TimelineEntities");
}
// wrap all TimelineEntity into TimelineEntities object
TimelineEntities entities = new TimelineEntities();
for (TimelineEntity entity : entitiesTobePublished) {
entities.addEntity(entity);
}
// created a holder and place it in queue
EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
try {
timelineEntityQueue.put(entitiesHolder);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException(
"Failed while adding entity to the queue for publishing", e);
}
if (sync) {
// In sync call we need to wait till its published and if any error then
// throw it back
try {
entitiesHolder.get();
} catch (ExecutionException e) {
throw new YarnException("Failed while publishing entity",
e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while publishing entity", e);
}
}
}
public void start() {
executor = Executors.newSingleThreadExecutor();
executor.execute(createRunnable());
}
public void stop() {
LOG.info("Stopping TimelineClient.");
executor.shutdownNow();
try {
executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}
}

View File

@ -215,11 +215,11 @@ public class TestTimelineClient {
+ "Timeline server should be off to run this test. ");
} catch (RuntimeException ce) {
Assert.assertTrue(
"Handler exception for reason other than retry: " + ce.getMessage(),
ce.getMessage().contains("Connection retries limit exceeded"));
"Handler exception for reason other than retry: " + ce.getMessage(),
ce.getMessage().contains("Connection retries limit exceeded"));
// we would expect this exception here, check if the client has retried
Assert.assertTrue("Retry filter didn't perform any retries! ", client
.connectionRetry.getRetired());
Assert.assertTrue("Retry filter didn't perform any retries! ",
client.connector.connectionRetry.getRetired());
}
}
@ -318,7 +318,7 @@ public class TestTimelineClient {
.getMessage().contains("Connection retries limit exceeded"));
// we would expect this exception here, check if the client has retried
Assert.assertTrue("Retry filter didn't perform any retries! ",
client.connectionRetry.getRetired());
client.connector.connectionRetry.getRetired());
}
public static ClientResponse mockEntityClientResponse(
@ -419,17 +419,26 @@ public class TestTimelineClient {
private TimelineClientImpl createTimelineClientFakeTimelineClientRetryOp(
YarnConfiguration conf) {
TimelineClientImpl client = new TimelineClientImpl() {
@Override
public TimelineClientRetryOp
createTimelineClientRetryOpForOperateDelegationToken(
final PrivilegedExceptionAction<?> action) throws IOException {
TimelineClientRetryOpForOperateDelegationToken op =
spy(new TimelineClientRetryOpForOperateDelegationToken(
UserGroupInformation.getCurrentUser(), action));
doThrow(new SocketTimeoutException("Test socketTimeoutException"))
.when(op).run();
return op;
protected TimelineConnector createTimelineConnector() {
TimelineConnector connector =
new TimelineConnector(true, authUgi, doAsUser, token) {
@Override
public TimelineClientRetryOp
createRetryOpForOperateDelegationToken(
final PrivilegedExceptionAction<?> action)
throws IOException {
TimelineClientRetryOpForOperateDelegationToken op =
spy(new TimelineClientRetryOpForOperateDelegationToken(
UserGroupInformation.getCurrentUser(), action));
doThrow(
new SocketTimeoutException("Test socketTimeoutException"))
.when(op).run();
return op;
}
};
addIfService(connector);
return connector;
}
};
client.init(conf);

View File

@ -50,7 +50,7 @@ public class TestTimelineClientV2Impl {
public void setup() {
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
if (!currTestName.getMethodName()
.contains("testRetryOnConnectionFailure")) {
@ -71,7 +71,7 @@ public class TestTimelineClientV2Impl {
}
private class TestV2TimelineClientForExceptionHandling
extends TimelineClientImpl {
extends TimelineV2ClientImpl {
public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
super(id);
}

View File

@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -76,7 +76,7 @@ public class NMTimelinePublisher extends CompositeService {
private String httpAddress;
private final Map<ApplicationId, TimelineClient> appToClientMap;
private final Map<ApplicationId, TimelineV2Client> appToClientMap;
public NMTimelinePublisher(Context context) {
super(NMTimelinePublisher.class.getName());
@ -102,7 +102,7 @@ public class NMTimelinePublisher extends CompositeService {
}
@VisibleForTesting
Map<ApplicationId, TimelineClient> getAppToClientMap() {
Map<ApplicationId, TimelineV2Client> getAppToClientMap() {
return appToClientMap;
}
@ -145,7 +145,7 @@ public class NMTimelinePublisher extends CompositeService {
try {
// no need to put it as part of publisher as timeline client already has
// Queuing concept
TimelineClient timelineClient = getTimelineClient(appId);
TimelineV2Client timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntitiesAsync(entity);
} else {
@ -234,7 +234,7 @@ public class NMTimelinePublisher extends CompositeService {
try {
// no need to put it as part of publisher as timeline client already has
// Queuing concept
TimelineClient timelineClient = getTimelineClient(appId);
TimelineV2Client timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntitiesAsync(entity);
} else {
@ -265,7 +265,7 @@ public class NMTimelinePublisher extends CompositeService {
LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
TimelineClient timelineClient = getTimelineClient(appId);
TimelineV2Client timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntities(entity);
} else {
@ -382,8 +382,8 @@ public class NMTimelinePublisher extends CompositeService {
public void createTimelineClient(ApplicationId appId) {
if (!appToClientMap.containsKey(appId)) {
TimelineClient timelineClient =
TimelineClient.createTimelineClient(appId);
TimelineV2Client timelineClient =
TimelineV2Client.createTimelineClient(appId);
timelineClient.init(getConfig());
timelineClient.start();
appToClientMap.put(appId, timelineClient);
@ -391,7 +391,7 @@ public class NMTimelinePublisher extends CompositeService {
}
public void stopTimelineClient(ApplicationId appId) {
TimelineClient client = appToClientMap.remove(appId);
TimelineV2Client client = appToClientMap.remove(appId);
if (client != null) {
client.stop();
}
@ -399,13 +399,13 @@ public class NMTimelinePublisher extends CompositeService {
public void setTimelineServiceAddress(ApplicationId appId,
String collectorAddr) {
TimelineClient client = appToClientMap.get(appId);
TimelineV2Client client = appToClientMap.get(appId);
if (client != null) {
client.setTimelineServiceAddress(collectorAddr);
}
}
private TimelineClient getTimelineClient(ApplicationId appId) {
private TimelineV2Client getTimelineClient(ApplicationId appId) {
return appToClientMap.get(appId);
}
}

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -50,7 +50,7 @@ public class TestNMTimelinePublisher {
public void testContainerResourceUsage() {
Context context = mock(Context.class);
@SuppressWarnings("unchecked")
final DummyTimelineClient timelineClient = new DummyTimelineClient();
final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
when(context.getHttpPort()).thenReturn(0);
NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
@ -137,7 +137,11 @@ public class TestNMTimelinePublisher {
}
}
protected static class DummyTimelineClient extends TimelineClientImpl {
protected static class DummyTimelineClient extends TimelineV2ClientImpl {
public DummyTimelineClient(ApplicationId appId) {
super(appId);
}
private TimelineEntity[] lastPublishedEntities;
@Override

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
@ -96,8 +96,8 @@ public class TestTimelineServiceClientIntegration {
@Test
public void testPutEntities() throws Exception {
TimelineClient client =
TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
TimelineV2Client client =
TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
try {
// set the timeline service address manually
client.setTimelineServiceAddress(
@ -123,8 +123,8 @@ public class TestTimelineServiceClientIntegration {
@Test
public void testPutExtendedEntities() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
TimelineClient client =
TimelineClient.createTimelineClient(appId);
TimelineV2Client client =
TimelineV2Client.createTimelineClient(appId);
try {
// set the timeline service address manually
client.setTimelineServiceAddress(