From 4fa1afdb883dab8786d2fb5c72a195dd2e87d711 Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Thu, 16 Feb 2017 11:41:04 -0800 Subject: [PATCH] YARN-4675. Reorganize TimelineClient and TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed by Naganarasimha G R. --- .../jobhistory/JobHistoryEventHandler.java | 57 +- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 14 +- .../v2/app/rm/RMContainerAllocator.java | 4 +- .../TestJobHistoryEventHandler.java | 8 +- .../distributedshell/ApplicationMaster.java | 98 ++- .../hadoop/yarn/client/api/AMRMClient.java | 40 +- .../client/api/async/AMRMClientAsync.java | 21 +- .../api/async/impl/AMRMClientAsyncImpl.java | 5 +- .../yarn/client/api/impl/YarnClientImpl.java | 15 +- .../yarn/client/api/TimelineClient.java | 94 +- .../yarn/client/api/TimelineV2Client.java | 92 ++ .../client/api/impl/TimelineClientImpl.java | 825 ++---------------- .../client/api/impl/TimelineConnector.java | 440 ++++++++++ .../client/api/impl/TimelineV2ClientImpl.java | 459 ++++++++++ .../client/api/impl/TestTimelineClient.java | 39 +- .../api/impl/TestTimelineClientV2Impl.java | 4 +- .../timelineservice/NMTimelinePublisher.java | 22 +- .../TestNMTimelinePublisher.java | 10 +- .../TestTimelineServiceClientIntegration.java | 10 +- 19 files changed, 1272 insertions(+), 985 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 0cc605c528..285d36e318 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -72,13 +72,12 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientHandlerException; @@ -90,8 +89,6 @@ */ public class JobHistoryEventHandler extends AbstractService implements EventHandler { - private static final JsonNodeFactory FACTORY = - new ObjectMapper().getNodeFactory(); private final AppContext context; private final int startCount; @@ -133,9 +130,10 @@ public class JobHistoryEventHandler extends AbstractService // should 
job completion be force when the AM shuts down? protected volatile boolean forceJobCompletion = false; + @VisibleForTesting protected TimelineClient timelineClient; - - private boolean timelineServiceV2Enabled = false; + @VisibleForTesting + protected TimelineV2Client timelineV2Client; private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB"; private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK"; @@ -268,12 +266,17 @@ protected void serviceInit(Configuration conf) throws Exception { MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) { LOG.info("Emitting job history data to the timeline service is enabled"); if (YarnConfiguration.timelineServiceEnabled(conf)) { - - timelineClient = - ((MRAppMaster.RunningAppContext)context).getTimelineClient(); - timelineClient.init(conf); - timelineServiceV2Enabled = - YarnConfiguration.timelineServiceV2Enabled(conf); + boolean timelineServiceV2Enabled = + ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2); + if(timelineServiceV2Enabled) { + timelineV2Client = + ((MRAppMaster.RunningAppContext)context).getTimelineV2Client(); + timelineV2Client.init(conf); + } else { + timelineClient = + ((MRAppMaster.RunningAppContext) context).getTimelineClient(); + timelineClient.init(conf); + } LOG.info("Timeline service is enabled; version: " + YarnConfiguration.getTimelineServiceVersion(conf)); } else { @@ -324,6 +327,8 @@ private void mkdir(FileSystem fs, Path path, FsPermission fsp) protected void serviceStart() throws Exception { if (timelineClient != null) { timelineClient.start(); + } else if (timelineV2Client != null) { + timelineV2Client.start(); } eventHandlingThread = new Thread(new Runnable() { @Override @@ -448,6 +453,8 @@ protected void serviceStop() throws Exception { } if (timelineClient != null) { timelineClient.stop(); + } else if (timelineV2Client != null) { + timelineV2Client.stop(); } LOG.info("Stopped JobHistoryEventHandler. 
super.stop()"); super.serviceStop(); @@ -605,14 +612,12 @@ public void handleEvent(JobHistoryEvent event) { } processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); - if (timelineClient != null) { - if (timelineServiceV2Enabled) { - processEventForNewTimelineService(historyEvent, event.getJobID(), - event.getTimestamp()); - } else { - processEventForTimelineServer(historyEvent, event.getJobID(), - event.getTimestamp()); - } + if (timelineV2Client != null) { + processEventForNewTimelineService(historyEvent, event.getJobID(), + event.getTimestamp()); + } else if (timelineClient != null) { + processEventForTimelineServer(historyEvent, event.getJobID(), + event.getTimestamp()); } if (LOG.isDebugEnabled()) { LOG.debug("In HistoryEventHandler " @@ -1162,8 +1167,8 @@ private void publishConfigsOnJobSubmittedEvent(JobSubmittedEvent event, configSize += size; if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) { if (jobEntityForConfigs.getConfigs().size() > 0) { - timelineClient.putEntities(jobEntityForConfigs); - timelineClient.putEntities(appEntityForConfigs); + timelineV2Client.putEntities(jobEntityForConfigs); + timelineV2Client.putEntities(appEntityForConfigs); jobEntityForConfigs = createJobEntity(jobId); appEntityForConfigs = new ApplicationEntity(); appEntityForConfigs.setId(appId); @@ -1174,8 +1179,8 @@ private void publishConfigsOnJobSubmittedEvent(JobSubmittedEvent event, appEntityForConfigs.addConfig(entry.getKey(), entry.getValue()); } if (configSize > 0) { - timelineClient.putEntities(jobEntityForConfigs); - timelineClient.putEntities(appEntityForConfigs); + timelineV2Client.putEntities(jobEntityForConfigs); + timelineV2Client.putEntities(appEntityForConfigs); } } catch (IOException | YarnException e) { LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " + @@ -1295,9 +1300,9 @@ private void processEventForNewTimelineService(HistoryEvent event, } try { if (appEntityWithJobMetrics == null) { - timelineClient.putEntitiesAsync(tEntity); + timelineV2Client.putEntitiesAsync(tEntity); } else { - timelineClient.putEntities(tEntity, appEntityWithJobMetrics); + timelineV2Client.putEntities(tEntity, appEntityWithJobMetrics); } } catch (IOException | YarnException e) { LOG.error("Failed to process Event " + event.getEventType() diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 835c0aa87a..12df83dbea 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -38,6 +38,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.crypto.KeyGenerator; + import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -141,6 +143,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import 
org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -154,8 +157,6 @@ import com.google.common.annotations.VisibleForTesting; -import javax.crypto.KeyGenerator; - /** * The Map-Reduce Application Master. * The state machine is encapsulated in the implementation of Job interface. @@ -1066,6 +1067,7 @@ public class RunningAppContext implements AppContext { private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; private TimelineClient timelineClient = null; + private TimelineV2Client timelineV2Client = null; private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor; @@ -1081,7 +1083,7 @@ public RunningAppContext(Configuration config, if (YarnConfiguration.timelineServiceV2Enabled(conf)) { // create new version TimelineClient - timelineClient = TimelineClient.createTimelineClient( + timelineV2Client = TimelineV2Client.createTimelineClient( appAttemptID.getApplicationId()); } else { timelineClient = TimelineClient.createTimelineClient(); @@ -1177,10 +1179,14 @@ public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { return taskAttemptFinishingMonitor; } - // Get Timeline Collector's address (get sync from RM) public TimelineClient getTimelineClient() { return timelineClient; } + + // Get Timeline Collector's address (get sync from RM) + public TimelineV2Client getTimelineV2Client() { + return timelineV2Client; + } } @SuppressWarnings("unchecked") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 31bc380b41..1f88a2c70e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -882,8 +882,8 @@ private List getResources() throws Exception { MRAppMaster.RunningAppContext appContext = (MRAppMaster.RunningAppContext)this.getContext(); if (collectorAddr != null && !collectorAddr.isEmpty() - && appContext.getTimelineClient() != null) { - appContext.getTimelineClient().setTimelineServiceAddress( + && appContext.getTimelineV2Client() != null) { + appContext.getTimelineV2Client().setTimelineServiceAddress( response.getCollectorAddr()); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index 0b33d6be67..6c5e604388 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -29,8 +29,8 @@ import java.io.File; import java.io.FileOutputStream; -import java.io.InputStream; import java.io.IOException; +import java.io.InputStream; import java.util.HashMap; 
import org.apache.commons.logging.Log; @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -829,6 +830,9 @@ private AppContext mockAppContext(Class contextClass, if (mockContext instanceof RunningAppContext) { when(((RunningAppContext)mockContext).getTimelineClient()). thenReturn(TimelineClient.createTimelineClient()); + when(((RunningAppContext) mockContext).getTimelineV2Client()) + .thenReturn(TimelineV2Client + .createTimelineClient(ApplicationId.newInstance(0, 1))); } return mockContext; } @@ -937,6 +941,8 @@ public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHist protected void serviceStart() { if (timelineClient != null) { timelineClient.start(); + } else if (timelineV2Client != null) { + timelineV2Client.start(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 5a06ef629a..4daebb53e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; @@ -219,7 +220,9 @@ public static enum DSEntity { // Tracking url to which app master publishes info for clients to monitor private String appMasterTrackingUrl = ""; - private boolean timelineServiceV2 = false; + private boolean timelineServiceV2Enabled = false; + + private boolean timelineServiceV1Enabled = false; // App Master configuration // No. 
of containers to run shell command on @@ -293,6 +296,10 @@ public static enum DSEntity { // Timeline Client @VisibleForTesting TimelineClient timelineClient; + + // Timeline v2 Client + private TimelineV2Client timelineV2Client; + static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS"; static final String APPID_TIMELINE_FILTER_NAME = "appId"; static final String USER_TIMELINE_FILTER_NAME = "user"; @@ -556,9 +563,12 @@ public boolean init(String[] args) throws ParseException, IOException { "container_retry_interval", "0")); if (YarnConfiguration.timelineServiceEnabled(conf)) { - timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf); + timelineServiceV2Enabled = + ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2); + timelineServiceV1Enabled = !timelineServiceV2Enabled; } else { timelineClient = null; + timelineV2Client = null; LOG.warn("Timeline service is not enabled"); } @@ -621,18 +631,17 @@ public void run() throws YarnException, IOException, InterruptedException { nmClientAsync.start(); startTimelineClient(conf); - if (timelineServiceV2) { + if (timelineServiceV2Enabled) { // need to bind timelineClient - amRMClient.registerTimelineClient(timelineClient); + amRMClient.registerTimelineV2Client(timelineV2Client); } - if(timelineClient != null) { - if (timelineServiceV2) { - publishApplicationAttemptEventOnTimelineServiceV2( - DSEvent.DS_APP_ATTEMPT_START); - } else { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); - } + + if (timelineServiceV2Enabled) { + publishApplicationAttemptEventOnTimelineServiceV2( + DSEvent.DS_APP_ATTEMPT_START); + } else if (timelineServiceV1Enabled) { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); } // Setup local RPC Server to accept status requests directly from clients @@ -704,18 +713,21 @@ void startTimelineClient(final Configuration conf) public Void run() throws Exception { if (YarnConfiguration.timelineServiceEnabled(conf)) { // Creating the Timeline Client - if (timelineServiceV2) { - timelineClient = TimelineClient.createTimelineClient( + if (timelineServiceV2Enabled) { + timelineV2Client = TimelineV2Client.createTimelineClient( appAttemptID.getApplicationId()); + timelineV2Client.init(conf); + timelineV2Client.start(); LOG.info("Timeline service V2 client is enabled"); } else { timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + timelineClient.start(); LOG.info("Timeline service V1 client is enabled"); } - timelineClient.init(conf); - timelineClient.start(); } else { timelineClient = null; + timelineV2Client = null; LOG.warn("Timeline service is not enabled"); } return null; @@ -741,14 +753,12 @@ protected boolean finish() { } catch (InterruptedException ex) {} } - if (timelineClient != null) { - if (timelineServiceV2) { - publishApplicationAttemptEventOnTimelineServiceV2( - DSEvent.DS_APP_ATTEMPT_END); - } else { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); - } + if (timelineServiceV2Enabled) { + publishApplicationAttemptEventOnTimelineServiceV2( + DSEvent.DS_APP_ATTEMPT_END); + } else if (timelineServiceV1Enabled) { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); } // Join all launched threads @@ -797,8 +807,10 @@ protected boolean finish() { 
amRMClient.stop(); // Stop Timeline Client - if(timelineClient != null) { + if(timelineServiceV1Enabled) { timelineClient.stop(); + } else if (timelineServiceV2Enabled) { + timelineV2Client.stop(); } return success; @@ -853,16 +865,14 @@ public void onContainersCompleted(List completedContainers) { LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); } - if(timelineClient != null) { - if (timelineServiceV2) { - publishContainerEndEventOnTimelineServiceV2(containerStatus); - } else { - publishContainerEndEvent( - timelineClient, containerStatus, domainId, appSubmitterUgi); - } + if (timelineServiceV2Enabled) { + publishContainerEndEventOnTimelineServiceV2(containerStatus); + } else if (timelineServiceV1Enabled) { + publishContainerEndEvent(timelineClient, containerStatus, domainId, + appSubmitterUgi); } } - + // ask for more containers if any failed int askCount = numTotalContainers - numRequestedContainers.get(); numRequestedContainers.addAndGet(askCount); @@ -983,15 +993,13 @@ public void onContainerStarted(ContainerId containerId, applicationMaster.nmClientAsync.getContainerStatusAsync( containerId, container.getNodeId()); } - if(applicationMaster.timelineClient != null) { - if (applicationMaster.timelineServiceV2) { - applicationMaster.publishContainerStartEventOnTimelineServiceV2( - container); - } else { - applicationMaster.publishContainerStartEvent( - applicationMaster.timelineClient, container, - applicationMaster.domainId, applicationMaster.appSubmitterUgi); - } + if (applicationMaster.timelineServiceV2Enabled) { + applicationMaster + .publishContainerStartEventOnTimelineServiceV2(container); + } else if (applicationMaster.timelineServiceV1Enabled) { + applicationMaster.publishContainerStartEvent( + applicationMaster.timelineClient, container, + applicationMaster.domainId, applicationMaster.appSubmitterUgi); } } @@ -1371,7 +1379,7 @@ private void publishContainerStartEventOnTimelineServiceV2( appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @Override public TimelinePutResponse run() throws Exception { - timelineClient.putEntities(entity); + timelineV2Client.putEntities(entity); return null; } }); @@ -1404,7 +1412,7 @@ private void publishContainerEndEventOnTimelineServiceV2( appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @Override public TimelinePutResponse run() throws Exception { - timelineClient.putEntities(entity); + timelineV2Client.putEntities(entity); return null; } }); @@ -1438,7 +1446,7 @@ private void publishApplicationAttemptEventOnTimelineServiceV2( appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @Override public TimelinePutResponse run() throws Exception { - timelineClient.putEntitiesAsync(entity); + timelineV2Client.putEntitiesAsync(entity); return null; } }); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 15d0065c79..69f37771c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability; +import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -41,12 +42,13 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; -import org.apache.hadoop.yarn.util.resource.Resources; @InterfaceAudience.Public @InterfaceStability.Stable @@ -54,7 +56,8 @@ public abstract class AMRMClient extends AbstractService { private static final Log LOG = LogFactory.getLog(AMRMClient.class); - private TimelineClient timelineClient; + private TimelineV2Client timelineV2Client; + private boolean timelineServiceV2Enabled; /** * Create a new instance of AMRMClient. @@ -79,6 +82,12 @@ protected AMRMClient(String name) { nmTokenCache = NMTokenCache.getSingleton(); } + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + timelineServiceV2Enabled = YarnConfiguration.timelineServiceV2Enabled(conf); + } + /** * Object to represent a single container request for resources. Scheduler * documentation should be consulted for the specifics of how the parameters @@ -682,19 +691,30 @@ public NMTokenCache getNMTokenCache() { } /** - * Register TimelineClient to AMRMClient. - * @param client the timeline client to register + * Register TimelineV2Client to AMRMClient. Writer's address for the timeline + * V2 client will be updated dynamically if registered. + * + * @param client the timeline v2 client to register + * @throws YarnException when this method is invoked even when ATS V2 is not + * configured. */ - public void registerTimelineClient(TimelineClient client) { - this.timelineClient = client; + public void registerTimelineV2Client(TimelineV2Client client) + throws YarnException { + if (timelineServiceV2Enabled) { + timelineV2Client = client; + } else { + LOG.error("Trying to register timeline v2 client when not configured."); + throw new YarnException( + "register timeline v2 client when not configured."); + } } /** - * Get registered timeline client. - * @return the registered timeline client + * Get registered timeline v2 client. 
+ * @return the registered timeline v2 client */ - public TimelineClient getRegisteredTimelineClient() { - return this.timelineClient; + public TimelineV2Client getRegisteredTimelineV2Client() { + return this.timelineV2Client; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 4cb27cde21..1ecfe1f588 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.client.api.async; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; import java.io.IOException; import java.util.Collection; import java.util.List; @@ -29,8 +27,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.Container; @@ -46,13 +44,15 @@ import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; /** * AMRMClientAsync handles communication with the ResourceManager @@ -346,17 +346,20 @@ public abstract void requestContainerUpdate( /** * Register TimelineClient to AMRMClient. * @param timelineClient + * @throws YarnException when this method is invoked even when ATS V2 is not + * configured. */ - public void registerTimelineClient(TimelineClient timelineClient) { - client.registerTimelineClient(timelineClient); + public void registerTimelineV2Client(TimelineV2Client timelineClient) + throws YarnException { + client.registerTimelineV2Client(timelineClient); } /** * Get registered timeline client. 
* @return the registered timeline client */ - public TimelineClient getRegisteredTimelineClient() { - return client.getRegisteredTimelineClient(); + public TimelineV2Client getRegisteredTimelineV2Client() { + return client.getRegisteredTimelineV2Client(); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 9e2c0e5901..6711da28cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; -import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; @@ -326,7 +326,8 @@ public void run() { AllocateResponse response = (AllocateResponse) object; String collectorAddress = response.getCollectorAddr(); - TimelineClient timelineClient = client.getRegisteredTimelineClient(); + TimelineV2Client timelineClient = + client.getRegisteredTimelineV2Client(); if (timelineClient != null && collectorAddress != null && !collectorAddress.isEmpty()) { if (collectorAddr == null diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index e406862a4b..4a27fee1aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -141,7 +141,7 @@ public class YarnClientImpl extends YarnClient { Text timelineService; @VisibleForTesting String timelineDTRenewer; - protected boolean timelineServiceEnabled; + private boolean timelineV1ServiceEnabled; protected boolean timelineServiceBestEffort; private static final String ROOT = "root"; @@ -167,9 +167,14 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); } + float timelineServiceVersion = + conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - timelineServiceEnabled = true; + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) + && ((Float.compare(timelineServiceVersion, 1.0f) == 0) + || (Float.compare(timelineServiceVersion, 1.5f) == 0))) { + timelineV1ServiceEnabled = true; timelineDTRenewer = getTimelineDelegationTokenRenewer(conf); timelineService = TimelineUtils.buildTimelineTokenService(conf); } @@ -178,7 +183,7 @@ protected void serviceInit(Configuration conf) 
throws Exception { // TimelineServer which means we are able to get history information // for applications/applicationAttempts/containers by using ahsClient // when the TimelineServer is running. - if (timelineServiceEnabled || conf.getBoolean( + if (timelineV1ServiceEnabled || conf.getBoolean( YarnConfiguration.APPLICATION_HISTORY_ENABLED, YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { historyServiceEnabled = true; @@ -257,7 +262,7 @@ public YarnClientApplication createApplication() // Automatically add the timeline DT into the CLC // Only when the security and the timeline service are both enabled - if (isSecurityEnabled() && timelineServiceEnabled) { + if (isSecurityEnabled() && timelineV1ServiceEnabled) { addTimelineDelegationToken(appContext.getAMContainerSpec()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index cc76718dd0..4835239a92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -21,14 +21,12 @@ import java.io.Flushable; import java.io.IOException; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; @@ -39,24 +37,22 @@ /** * A client library that can be used to post some information in terms of a - * number of conceptual entities. + * number of conceptual entities. This client library needs to be used along + * with Timeline V.1.x server versions. + * Refer {@link TimelineV2Client} for ATS V2 interface. */ @Public @Evolving -public abstract class TimelineClient extends AbstractService implements +public abstract class TimelineClient extends CompositeService implements Flushable { - /** - * Create a timeline client. The current UGI when the user initialize the - * client will be used to do the put and the delegation token operations. The - * current user may use {@link UserGroupInformation#doAs} another user to - * construct and initialize a timeline client if the following operations are - * supposed to be conducted by that user. - */ - private ApplicationId contextAppId; - /** * Creates an instance of the timeline v.1.x client. + * The current UGI when the user initialize the client will be used to do the + * put and the delegation token operations. The current user may use + * {@link UserGroupInformation#doAs} another user to construct and initialize + * a timeline client if the following operations are supposed to be conducted + * by that user. 
* * @return the created timeline client instance */ @@ -66,23 +62,8 @@ public static TimelineClient createTimelineClient() { return client; } - /** - * Creates an instance of the timeline v.2 client. - * - * @param appId the application id with which the timeline client is - * associated - * @return the created timeline client instance - */ - @Public - public static TimelineClient createTimelineClient(ApplicationId appId) { - TimelineClient client = new TimelineClientImpl(appId); - return client; - } - - @Private - protected TimelineClient(String name, ApplicationId appId) { + protected TimelineClient(String name) { super(name); - setContextAppId(appId); } /** @@ -207,57 +188,4 @@ public abstract long renewDelegationToken( public abstract void cancelDelegationToken( Token timelineDT) throws IOException, YarnException; - - /** - *
<p>
- * Send the information of a number of conceptual entities to the timeline - * service v.2 collector. It is a blocking API. The method will not return - * until all the put entities have been persisted. If this method is invoked - * for a non-v.2 timeline client instance, a YarnException is thrown. - *
</p>
- * - * @param entities the collection of {@link - * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} - * @throws IOException - * @throws YarnException - */ - @Public - public abstract void putEntities( - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... - entities) throws IOException, YarnException; - - /** - *
<p>
- * Send the information of a number of conceptual entities to the timeline - * service v.2 collector. It is an asynchronous API. The method will return - * once all the entities are received. If this method is invoked for a - * non-v.2 timeline client instance, a YarnException is thrown. - *
</p>
- * - * @param entities the collection of {@link - * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} - * @throws IOException - * @throws YarnException - */ - @Public - public abstract void putEntitiesAsync( - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... - entities) throws IOException, YarnException; - - /** - *
<p>
- * Update the timeline service address where the request will be sent to. - *
</p>
- * @param address - * the timeline service address - */ - public abstract void setTimelineServiceAddress(String address); - - protected ApplicationId getContextAppId() { - return contextAppId; - } - - protected void setContextAppId(ApplicationId appId) { - this.contextAppId = appId; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java new file mode 100644 index 0000000000..32cf1e94ba --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * A client library that can be used to post some information in terms of a + * number of conceptual entities. This client library needs to be used along + * with time line v.2 server version. + * Refer {@link TimelineClient} for ATS V1 interface. + */ +public abstract class TimelineV2Client extends CompositeService { + /** + * Creates an instance of the timeline v.2 client. + * + * @param appId the application id with which the timeline client is + * associated + * @return the created timeline client instance + */ + @Public + public static TimelineV2Client createTimelineClient(ApplicationId appId) { + TimelineV2Client client = new TimelineV2ClientImpl(appId); + return client; + } + + protected TimelineV2Client(String name) { + super(name); + } + + /** + *
<p>
+ * Send the information of a number of conceptual entities to the timeline + * service v.2 collector. It is a blocking API. The method will not return + * until all the put entities have been persisted. + *
</p>
+ * + * @param entities the collection of {@link TimelineEntity} + * @throws IOException if there are I/O errors + * @throws YarnException if entities are incomplete/invalid + */ + @Public + public abstract void putEntities(TimelineEntity... entities) + throws IOException, YarnException; + + /** + *
<p>
+ * Send the information of a number of conceptual entities to the timeline + * service v.2 collector. It is an asynchronous API. The method will return + * once all the entities are received. + *
</p>
+ * + * @param entities the collection of {@link TimelineEntity} + * @throws IOException if there are I/O errors + * @throws YarnException if entities are incomplete/invalid + */ + @Public + public abstract void putEntitiesAsync(TimelineEntity... entities) + throws IOException, YarnException; + + /** + *
<p>
+ * Update the timeline service address where the request will be sent to. + *
</p>
+ * + * @param address the timeline service address + */ + public abstract void setTimelineServiceAddress(String address); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 4506c4868a..f49618b10b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -20,32 +20,10 @@ import java.io.File; import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.ConnectException; -import java.net.HttpURLConnection; import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; import java.net.URI; -import java.net.URL; -import java.net.URLConnection; -import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLSocketFactory; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; - -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -57,16 +35,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; -import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; -import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; -import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; -import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -79,19 +50,9 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientRequest; -import com.sun.jersey.api.client.ClientResponse; 
-import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.api.client.filter.ClientFilter; -import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; -import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; -import com.sun.jersey.core.util.MultivaluedMapImpl; @Private @Evolving @@ -100,9 +61,6 @@ public class TimelineClientImpl extends TimelineClient { private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class); private static final ObjectMapper MAPPER = new ObjectMapper(); private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/"; - private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; - private static final Joiner JOINER = Joiner.on(""); - public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute private static Options opts; private static final String ENTITY_DATA_TYPE = "entity"; @@ -117,179 +75,38 @@ public class TimelineClientImpl extends TimelineClient { opts.addOption("help", false, "Print usage"); } - private Client client; - private ConnectionConfigurator connConfigurator; - private DelegationTokenAuthenticator authenticator; - private DelegationTokenAuthenticatedURL.Token token; - private UserGroupInformation authUgi; - private String doAsUser; - private Configuration configuration; + @VisibleForTesting + protected DelegationTokenAuthenticatedURL.Token token; + @VisibleForTesting + protected UserGroupInformation authUgi; + @VisibleForTesting + protected String doAsUser; + private float timelineServiceVersion; private TimelineWriter timelineWriter; - private SSLFactory sslFactory; - private volatile String timelineServiceAddress; - - // Retry parameters for identifying new timeline service - // TODO consider to merge with connection retry - private int maxServiceRetries; - private long serviceRetryInterval; - private boolean timelineServiceV2 = false; + private String timelineServiceAddress; @Private @VisibleForTesting - TimelineClientConnectionRetry connectionRetry; - - private TimelineEntityDispatcher entityDispatcher; - - // Abstract class for an operation that should be retried by timeline client - @Private - @VisibleForTesting - public static abstract class TimelineClientRetryOp { - // The operation that should be retried - public abstract Object run() throws IOException; - // The method to indicate if we should retry given the incoming exception - public abstract boolean shouldRetryOn(Exception e); - } - - // Class to handle retry - // Outside this class, only visible to tests - @Private - @VisibleForTesting - static class TimelineClientConnectionRetry { - - // maxRetries < 0 means keep trying - @Private - @VisibleForTesting - public int maxRetries; - - @Private - @VisibleForTesting - public long retryInterval; - - // Indicates if retries happened last time. Only tests should read it. - // In unit tests, retryOn() calls should _not_ be concurrent. 
- private boolean retried = false; - - @Private - @VisibleForTesting - boolean getRetired() { - return retried; - } - - // Constructor with default retry settings - public TimelineClientConnectionRetry(Configuration conf) { - Preconditions.checkArgument(conf.getInt( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES) >= -1, - "%s property value should be greater than or equal to -1", - YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES); - Preconditions - .checkArgument( - conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0, - "%s property value should be greater than zero", - YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); - maxRetries = conf.getInt( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); - retryInterval = conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); - } - - public Object retryOn(TimelineClientRetryOp op) - throws RuntimeException, IOException { - int leftRetries = maxRetries; - retried = false; - - // keep trying - while (true) { - try { - // try perform the op, if fail, keep retrying - return op.run(); - } catch (IOException | RuntimeException e) { - // break if there's no retries left - if (leftRetries == 0) { - break; - } - if (op.shouldRetryOn(e)) { - logException(e, leftRetries); - } else { - throw e; - } - } - if (leftRetries > 0) { - leftRetries--; - } - retried = true; - try { - // sleep for the given time interval - Thread.sleep(retryInterval); - } catch (InterruptedException ie) { - LOG.warn("Client retry sleep interrupted! "); - } - } - throw new RuntimeException("Failed to connect to timeline server. " - + "Connection retries limit exceeded. 
" - + "The posted timeline event may be missing"); - }; - - private void logException(Exception e, int leftRetries) { - if (leftRetries > 0) { - LOG.info("Exception caught by TimelineClientConnectionRetry," - + " will try " + leftRetries + " more time(s).\nMessage: " - + e.getMessage()); - } else { - // note that maxRetries may be -1 at the very beginning - LOG.info("ConnectionException caught by TimelineClientConnectionRetry," - + " will keep retrying.\nMessage: " - + e.getMessage()); - } - } - } - - private class TimelineJerseyRetryFilter extends ClientFilter { - @Override - public ClientResponse handle(final ClientRequest cr) - throws ClientHandlerException { - // Set up the retry operation - TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() { - @Override - public Object run() { - // Try pass the request, if fail, keep retrying - return getNext().handle(cr); - } - - @Override - public boolean shouldRetryOn(Exception e) { - // Only retry on connection exceptions - return (e instanceof ClientHandlerException) - && (e.getCause() instanceof ConnectException || - e.getCause() instanceof SocketTimeoutException); - } - }; - try { - return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp); - } catch (IOException e) { - throw new ClientHandlerException("Jersey retry failed!\nMessage: " - + e.getMessage()); - } - } - } + TimelineConnector connector; public TimelineClientImpl() { - super(TimelineClientImpl.class.getName(), null); - } - - public TimelineClientImpl(ApplicationId applicationId) { - super(TimelineClientImpl.class.getName(), applicationId); - this.timelineServiceV2 = true; + super(TimelineClientImpl.class.getName()); } protected void serviceInit(Configuration conf) throws Exception { - this.configuration = conf; + timelineServiceVersion = + conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); + LOG.info("Timeline service address: " + getTimelineServiceAddress()); + if (!YarnConfiguration.timelineServiceEnabled(conf) + || !((Float.compare(this.timelineServiceVersion, 1.0f) == 0) + || (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) { + throw new IOException("Timeline V1 client is not properly configured. " + + "Either timeline service is not enabled or version is not set to" + + " 1.x"); + } UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); UserGroupInformation realUgi = ugi.getRealUser(); if (realUgi != null) { @@ -299,62 +116,34 @@ protected void serviceInit(Configuration conf) throws Exception { authUgi = ugi; doAsUser = null; } - ClientConfig cc = new DefaultClientConfig(); - cc.getClasses().add(YarnJacksonJaxbJsonProvider.class); - connConfigurator = initConnConfigurator(conf); - if (UserGroupInformation.isSecurityEnabled()) { - authenticator = new KerberosDelegationTokenAuthenticator(); - } else { - authenticator = new PseudoDelegationTokenAuthenticator(); - } - authenticator.setConnectionConfigurator(connConfigurator); token = new DelegationTokenAuthenticatedURL.Token(); + connector = createTimelineConnector(); - connectionRetry = new TimelineClientConnectionRetry(conf); - client = new Client(new URLConnectionClientHandler( - new TimelineURLConnectionFactory()), cc); - TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter(); - // TODO need to cleanup filter retry later. 
- if (!timelineServiceV2) { - client.addFilter(retryFilter); - } - - // old version timeline service need to get address from configuration - // while new version need to auto discovery (with retry). - if (timelineServiceV2) { - maxServiceRetries = conf.getInt( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); - serviceRetryInterval = conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); - entityDispatcher = new TimelineEntityDispatcher(conf); + if (YarnConfiguration.useHttps(conf)) { + timelineServiceAddress = + conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS); } else { - if (YarnConfiguration.useHttps(conf)) { - setTimelineServiceAddress(conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS)); - } else { - setTimelineServiceAddress(conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS)); - } - timelineServiceVersion = - conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); - LOG.info("Timeline service address: " + getTimelineServiceAddress()); + timelineServiceAddress = + conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); } super.serviceInit(conf); } + @VisibleForTesting + protected TimelineConnector createTimelineConnector() { + TimelineConnector newConnector = + new TimelineConnector(true, authUgi, doAsUser, token); + addIfService(newConnector); + return newConnector; + } + @Override protected void serviceStart() throws Exception { - if (timelineServiceV2) { - entityDispatcher.start(); - } else { - timelineWriter = createTimelineWriter(configuration, authUgi, client, - constructResURI(getConfig(), timelineServiceAddress, false)); - } + timelineWriter = createTimelineWriter(getConfig(), authUgi, + connector.getClient(), TimelineConnector.constructResURI(getConfig(), + timelineServiceAddress, RESOURCE_URI_STR_V1)); } protected TimelineWriter createTimelineWriter(Configuration conf, @@ -373,12 +162,6 @@ protected void serviceStop() throws Exception { if (this.timelineWriter != null) { this.timelineWriter.close(); } - if (timelineServiceV2) { - entityDispatcher.stop(); - } - if (this.sslFactory != null) { - this.sslFactory.destroy(); - } super.serviceStop(); } @@ -390,132 +173,17 @@ public void flush() throws IOException { } @Override - public TimelinePutResponse putEntities( - TimelineEntity... entities) throws IOException, YarnException { + public TimelinePutResponse putEntities(TimelineEntity... entities) + throws IOException, YarnException { return timelineWriter.putEntities(entities); } - @Override - public void putEntities( - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... - entities) throws IOException, YarnException { - if (!timelineServiceV2) { - throw new YarnException("v.2 method is invoked on a v.1.x client"); - } - entityDispatcher.dispatchEntities(true, entities); - } - - @Override - public void putEntitiesAsync( - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... 
- entities) throws IOException, YarnException { - if (!timelineServiceV2) { - throw new YarnException("v.2 method is invoked on a v.1.x client"); - } - entityDispatcher.dispatchEntities(false, entities); - } - @Override public void putDomain(TimelineDomain domain) throws IOException, YarnException { timelineWriter.putDomain(domain); } - // Used for new timeline service only - @Private - protected void putObjects(String path, MultivaluedMap params, - Object obj) throws IOException, YarnException { - - int retries = verifyRestEndPointAvailable(); - - // timelineServiceAddress could be stale, add retry logic here. - boolean needRetry = true; - while (needRetry) { - try { - URI uri = constructResURI(getConfig(), timelineServiceAddress, true); - putObjects(uri, path, params, obj); - needRetry = false; - } catch (IOException e) { - // handle exception for timelineServiceAddress being updated. - checkRetryWithSleep(retries, e); - retries--; - } - } - } - - private int verifyRestEndPointAvailable() throws YarnException { - // timelineServiceAddress could haven't be initialized yet - // or stale (only for new timeline service) - int retries = pollTimelineServiceAddress(this.maxServiceRetries); - if (timelineServiceAddress == null) { - String errMessage = "TimelineClient has reached to max retry times : " - + this.maxServiceRetries - + ", but failed to fetch timeline service address. Please verify" - + " Timeline Auxiliary Service is configured in all the NMs"; - LOG.error(errMessage); - throw new YarnException(errMessage); - } - return retries; - } - - /** - * Check if reaching to maximum of retries. - * @param retries - * @param e - */ - private void checkRetryWithSleep(int retries, IOException e) - throws YarnException, IOException { - if (retries > 0) { - try { - Thread.sleep(this.serviceRetryInterval); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new YarnException("Interrupted while retrying to connect to ATS"); - } - } else { - StringBuilder msg = - new StringBuilder("TimelineClient has reached to max retry times : "); - msg.append(this.maxServiceRetries); - msg.append(" for service address: "); - msg.append(timelineServiceAddress); - LOG.error(msg.toString()); - throw new IOException(msg.toString(), e); - } - } - - protected void putObjects( - URI base, String path, MultivaluedMap params, Object obj) - throws IOException, YarnException { - ClientResponse resp; - try { - resp = client.resource(base).path(path).queryParams(params) - .accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON) - .put(ClientResponse.class, obj); - } catch (RuntimeException re) { - // runtime exception is expected if the client cannot connect the server - String msg = - "Failed to get the response from the timeline server."; - LOG.error(msg, re); - throw new IOException(re); - } - if (resp == null || - resp.getStatusInfo().getStatusCode() != - ClientResponse.Status.OK.getStatusCode()) { - String msg = "Response from the timeline server is " + - ((resp == null) ? 
"null": - "not successful," + " HTTP error code: " + resp.getStatus() - + ", Server response:\n" + resp.getEntity(String.class)); - LOG.error(msg); - throw new YarnException(msg); - } - } - - @Override - public void setTimelineServiceAddress(String address) { - this.timelineServiceAddress = address; - } - private String getTimelineServiceAddress() { return this.timelineServiceAddress; } @@ -532,17 +200,17 @@ public Token getDelegationToken( public Token run() throws Exception { DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(authenticator, - connConfigurator); + connector.getDelegationTokenAuthenticatedURL(); // TODO we should add retry logic here if timelineServiceAddress is // not available immediately. return (Token) authUrl.getDelegationToken( - constructResURI(getConfig(), - getTimelineServiceAddress(), false).toURL(), + TimelineConnector.constructResURI(getConfig(), + getTimelineServiceAddress(), RESOURCE_URI_STR_V1).toURL(), token, renewer, doAsUser); } }; - return (Token) operateDelegationToken(getDTAction); + return (Token) connector + .operateDelegationToken(getDTAction); } @SuppressWarnings("unchecked") @@ -568,26 +236,26 @@ public Long run() throws Exception { token.setDelegationToken((Token) timelineDT); } DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(authenticator, - connConfigurator); + connector.getDelegationTokenAuthenticatedURL(); // If the token service address is not available, fall back to use // the configured service address. - final URI serviceURI = isTokenServiceAddrEmpty ? - constructResURI(getConfig(), getTimelineServiceAddress(), false) + final URI serviceURI = isTokenServiceAddrEmpty + ? TimelineConnector.constructResURI(getConfig(), + getTimelineServiceAddress(), RESOURCE_URI_STR_V1) : new URI(scheme, null, address.getHostName(), - address.getPort(), RESOURCE_URI_STR_V1, null, null); + address.getPort(), RESOURCE_URI_STR_V1, null, null); return authUrl .renewDelegationToken(serviceURI.toURL(), token, doAsUser); } }; - return (Long) operateDelegationToken(renewDTAction); + return (Long) connector.operateDelegationToken(renewDTAction); } @SuppressWarnings("unchecked") @Override public void cancelDelegationToken( final Token timelineDT) - throws IOException, YarnException { + throws IOException, YarnException { final boolean isTokenServiceAddrEmpty = timelineDT.getService().toString().isEmpty(); final String scheme = isTokenServiceAddrEmpty ? null @@ -607,134 +275,29 @@ public Void run() throws Exception { token.setDelegationToken((Token) timelineDT); } DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(authenticator, - connConfigurator); + connector.getDelegationTokenAuthenticatedURL(); // If the token service address is not available, fall back to use // the configured service address. - final URI serviceURI = isTokenServiceAddrEmpty ? - constructResURI(getConfig(), getTimelineServiceAddress(), false) + final URI serviceURI = isTokenServiceAddrEmpty + ? 
TimelineConnector.constructResURI(getConfig(), + getTimelineServiceAddress(), RESOURCE_URI_STR_V1) : new URI(scheme, null, address.getHostName(), - address.getPort(), RESOURCE_URI_STR_V1, null, null); + address.getPort(), RESOURCE_URI_STR_V1, null, null); authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser); return null; } }; - operateDelegationToken(cancelDTAction); + connector.operateDelegationToken(cancelDTAction); } @Override public String toString() { return super.toString() + " with timeline server " - + constructResURI(getConfig(), getTimelineServiceAddress(), false) + + TimelineConnector.constructResURI(getConfig(), + getTimelineServiceAddress(), RESOURCE_URI_STR_V1) + " and writer " + timelineWriter; } - private Object operateDelegationToken( - final PrivilegedExceptionAction action) - throws IOException, YarnException { - // Set up the retry operation - TimelineClientRetryOp tokenRetryOp = - createTimelineClientRetryOpForOperateDelegationToken(action); - - return connectionRetry.retryOn(tokenRetryOp); - } - - /** - * Poll TimelineServiceAddress for maximum of retries times if it is null. - * - * @param retries - * @return the left retry times - * @throws IOException - */ - private int pollTimelineServiceAddress(int retries) throws YarnException { - while (timelineServiceAddress == null && retries > 0) { - try { - Thread.sleep(this.serviceRetryInterval); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new YarnException("Interrupted while trying to connect ATS"); - } - retries--; - } - return retries; - } - - private class TimelineURLConnectionFactory - implements HttpURLConnectionFactory { - - @Override - public HttpURLConnection getHttpURLConnection(final URL url) throws IOException { - authUgi.checkTGTAndReloginFromKeytab(); - try { - return new DelegationTokenAuthenticatedURL( - authenticator, connConfigurator).openConnection(url, token, - doAsUser); - } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); - } catch (AuthenticationException ae) { - throw new IOException(ae); - } - } - - } - - private ConnectionConfigurator initConnConfigurator(Configuration conf) { - try { - return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf); - } catch (Exception e) { - LOG.debug("Cannot load customized ssl related configuration. 
" + - "Fallback to system-generic settings.", e); - return DEFAULT_TIMEOUT_CONN_CONFIGURATOR; - } - } - - private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR = - new ConnectionConfigurator() { - @Override - public HttpURLConnection configure(HttpURLConnection conn) - throws IOException { - setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT); - return conn; - } - }; - - private ConnectionConfigurator initSslConnConfigurator(final int timeout, - Configuration conf) throws IOException, GeneralSecurityException { - final SSLSocketFactory sf; - final HostnameVerifier hv; - - sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); - sslFactory.init(); - sf = sslFactory.createSSLSocketFactory(); - hv = sslFactory.getHostnameVerifier(); - - return new ConnectionConfigurator() { - @Override - public HttpURLConnection configure(HttpURLConnection conn) - throws IOException { - if (conn instanceof HttpsURLConnection) { - HttpsURLConnection c = (HttpsURLConnection) conn; - c.setSSLSocketFactory(sf); - c.setHostnameVerifier(hv); - } - setTimeouts(conn, timeout); - return conn; - } - }; - } - - private static void setTimeouts(URLConnection connection, int socketTimeout) { - connection.setConnectTimeout(socketTimeout); - connection.setReadTimeout(socketTimeout); - } - - private static URI constructResURI( - Configuration conf, String address, boolean v2) { - return URI.create( - JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://", - address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1)); - } - public static void main(String[] argv) throws Exception { CommandLine cliParser = new GnuParser().parse(opts, argv); if (cliParser.hasOption("put")) { @@ -870,266 +433,4 @@ public void putDomain(ApplicationAttemptId appAttemptId, public void setTimelineWriter(TimelineWriter writer) { this.timelineWriter = writer; } - - @Private - @VisibleForTesting - public TimelineClientRetryOp - createTimelineClientRetryOpForOperateDelegationToken( - final PrivilegedExceptionAction action) throws IOException { - return new TimelineClientRetryOpForOperateDelegationToken( - this.authUgi, action); - } - - @Private - @VisibleForTesting - public class TimelineClientRetryOpForOperateDelegationToken - extends TimelineClientRetryOp { - - private final UserGroupInformation authUgi; - private final PrivilegedExceptionAction action; - - public TimelineClientRetryOpForOperateDelegationToken( - UserGroupInformation authUgi, PrivilegedExceptionAction action) { - this.authUgi = authUgi; - this.action = action; - } - - @Override - public Object run() throws IOException { - // Try pass the request, if fail, keep retrying - authUgi.checkTGTAndReloginFromKeytab(); - try { - return authUgi.doAs(action); - } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - @Override - public boolean shouldRetryOn(Exception e) { - // retry on connection exceptions - // and SocketTimeoutException - return (e instanceof ConnectException - || e instanceof SocketTimeoutException); - } - } - - private final class EntitiesHolder extends FutureTask { - private final - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities - entities; - private final boolean isSync; - - EntitiesHolder( - final - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities - entities, - final boolean isSync) { - super(new Callable() { - // publishEntities() - public Void call() throws Exception { - MultivaluedMap params = new 
MultivaluedMapImpl(); - params.add("appid", getContextAppId().toString()); - params.add("async", Boolean.toString(!isSync)); - putObjects("entities", params, entities); - return null; - } - }); - this.entities = entities; - this.isSync = isSync; - } - - public boolean isSync() { - return isSync; - } - - public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities - getEntities() { - return entities; - } - } - - /** - * This class is responsible for collecting the timeline entities and - * publishing them in async. - */ - private class TimelineEntityDispatcher { - /** - * Time period for which the timelineclient will wait for draining after - * stop. - */ - private static final long DRAIN_TIME_PERIOD = 2000L; - - private int numberOfAsyncsToMerge; - private final BlockingQueue timelineEntityQueue; - private ExecutorService executor; - - TimelineEntityDispatcher(Configuration conf) { - timelineEntityQueue = new LinkedBlockingQueue(); - numberOfAsyncsToMerge = - conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, - YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE); - } - - Runnable createRunnable() { - return new Runnable() { - @Override - public void run() { - try { - EntitiesHolder entitiesHolder; - while (!Thread.currentThread().isInterrupted()) { - // Merge all the async calls and make one push, but if its sync - // call push immediately - try { - entitiesHolder = timelineEntityQueue.take(); - } catch (InterruptedException ie) { - LOG.info("Timeline dispatcher thread was interrupted "); - Thread.currentThread().interrupt(); - return; - } - if (entitiesHolder != null) { - publishWithoutBlockingOnQueue(entitiesHolder); - } - } - } finally { - if (!timelineEntityQueue.isEmpty()) { - LOG.info("Yet to publish " + timelineEntityQueue.size() - + " timelineEntities, draining them now. "); - } - // Try to drain the remaining entities to be published @ the max for - // 2 seconds - long timeTillweDrain = - System.currentTimeMillis() + DRAIN_TIME_PERIOD; - while (!timelineEntityQueue.isEmpty()) { - publishWithoutBlockingOnQueue(timelineEntityQueue.poll()); - if (System.currentTimeMillis() > timeTillweDrain) { - // time elapsed stop publishing further.... - if (!timelineEntityQueue.isEmpty()) { - LOG.warn("Time to drain elapsed! Remaining " - + timelineEntityQueue.size() + "timelineEntities will not" - + " be published"); - // if some entities were not drained then we need interrupt - // the threads which had put sync EntityHolders to the queue. - EntitiesHolder nextEntityInTheQueue = null; - while ((nextEntityInTheQueue = - timelineEntityQueue.poll()) != null) { - nextEntityInTheQueue.cancel(true); - } - } - break; - } - } - } - } - - /** - * Publishes the given EntitiesHolder and return immediately if sync - * call, else tries to fetch the EntitiesHolder from the queue in non - * blocking fashion and collate the Entities if possible before - * publishing through REST. 
- * - * @param entitiesHolder - */ - private void publishWithoutBlockingOnQueue( - EntitiesHolder entitiesHolder) { - if (entitiesHolder.isSync()) { - entitiesHolder.run(); - return; - } - int count = 1; - while (true) { - // loop till we find a sync put Entities or there is nothing - // to take - EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll(); - if (nextEntityInTheQueue == null) { - // Nothing in the queue just publish and get back to the - // blocked wait state - entitiesHolder.run(); - break; - } else if (nextEntityInTheQueue.isSync()) { - // flush all the prev async entities first - entitiesHolder.run(); - // and then flush the sync entity - nextEntityInTheQueue.run(); - break; - } else { - // append all async entities together and then flush - entitiesHolder.getEntities().addEntities( - nextEntityInTheQueue.getEntities().getEntities()); - count++; - if (count == numberOfAsyncsToMerge) { - // Flush the entities if the number of the async - // putEntites merged reaches the desired limit. To avoid - // collecting multiple entities and delaying for a long - // time. - entitiesHolder.run(); - break; - } - } - } - } - }; - } - - public void dispatchEntities(boolean sync, - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[] - entitiesTobePublished) throws YarnException { - if (executor.isShutdown()) { - throw new YarnException("Timeline client is in the process of stopping," - + " not accepting any more TimelineEntities"); - } - - // wrap all TimelineEntity into TimelineEntities object - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities - entities = - new org.apache.hadoop.yarn.api.records.timelineservice. - TimelineEntities(); - for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity - entity : entitiesTobePublished) { - entities.addEntity(entity); - } - - // created a holder and place it in queue - EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync); - try { - timelineEntityQueue.put(entitiesHolder); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new YarnException( - "Failed while adding entity to the queue for publishing", e); - } - - if (sync) { - // In sync call we need to wait till its published and if any error then - // throw it back - try { - entitiesHolder.get(); - } catch (ExecutionException e) { - throw new YarnException("Failed while publishing entity", - e.getCause()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new YarnException("Interrupted while publishing entity", e); - } - } - } - - public void start() { - executor = Executors.newSingleThreadExecutor(); - executor.execute(createRunnable()); - } - - public void stop() { - LOG.info("Stopping TimelineClient."); - executor.shutdownNow(); - try { - executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - e.printStackTrace(); - } - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java new file mode 100644 index 0000000000..b5b5f7759e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java @@ -0,0 +1,440 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.ConnectException; +import java.net.HttpURLConnection; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URL; +import java.net.URLConnection; +import java.security.GeneralSecurityException; +import java.security.PrivilegedExceptionAction; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; +import org.apache.hadoop.security.ssl.SSLFactory; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; +import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; +import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientRequest; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.api.client.filter.ClientFilter; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; + +/** + * Utility Connector class which is used by timeline clients to securely get + * connected to the timeline server. 
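A minimal wiring sketch, assuming a placeholder timeline web address and using only the members this new file introduces (the class name TimelineConnectorSketch and the address are illustrative, not part of the patch):

// Illustrative sketch only; mirrors how the client implementations in this
// patch create and own a TimelineConnector.
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.yarn.client.api.impl.TimelineConnector;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.sun.jersey.api.client.Client;

public class TimelineConnectorSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new YarnConfiguration();
    UserGroupInformation authUgi = UserGroupInformation.getCurrentUser();
    DelegationTokenAuthenticatedURL.Token token =
        new DelegationTokenAuthenticatedURL.Token();

    // "true" asks the connector to install the Jersey retry filter.
    TimelineConnector connector = new TimelineConnector(true, authUgi, null, token);
    connector.init(conf);   // builds the Jersey Client plus SSL/auth plumbing
    connector.start();

    Client jerseyClient = connector.getClient();      // shared Jersey client
    URI resource = TimelineConnector.constructResURI(
        conf, "localhost:8188", "/ws/v2/timeline/");  // placeholder address/path
    System.out.println("Timeline resource URI: " + resource);

    connector.stop();
  }
}

The boolean flag reflects a design split visible elsewhere in this patch: the v1 client passes true so the shared Jersey client gets the retry filter, while the v2 client passes false and instead polls for the collector address in its own serviceInit().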
+ * + */ +public class TimelineConnector extends AbstractService { + + private static final Joiner JOINER = Joiner.on(""); + private static final Log LOG = LogFactory.getLog(TimelineConnector.class); + public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute + + private SSLFactory sslFactory; + private Client client; + private ConnectionConfigurator connConfigurator; + private DelegationTokenAuthenticator authenticator; + private DelegationTokenAuthenticatedURL.Token token; + private UserGroupInformation authUgi; + private String doAsUser; + @VisibleForTesting + TimelineClientConnectionRetry connectionRetry; + private boolean requireConnectionRetry; + + public TimelineConnector(boolean requireConnectionRetry, + UserGroupInformation authUgi, String doAsUser, + DelegationTokenAuthenticatedURL.Token token) { + super("TimelineConnector"); + this.requireConnectionRetry = requireConnectionRetry; + this.authUgi = authUgi; + this.doAsUser = doAsUser; + this.token = token; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + ClientConfig cc = new DefaultClientConfig(); + cc.getClasses().add(YarnJacksonJaxbJsonProvider.class); + + sslFactory = getSSLFactory(conf); + connConfigurator = getConnConfigurator(sslFactory); + + if (UserGroupInformation.isSecurityEnabled()) { + authenticator = new KerberosDelegationTokenAuthenticator(); + } else { + authenticator = new PseudoDelegationTokenAuthenticator(); + } + authenticator.setConnectionConfigurator(connConfigurator); + + connectionRetry = new TimelineClientConnectionRetry(conf); + client = + new Client( + new URLConnectionClientHandler(new TimelineURLConnectionFactory( + authUgi, authenticator, connConfigurator, token, doAsUser)), + cc); + if (requireConnectionRetry) { + TimelineJerseyRetryFilter retryFilter = + new TimelineJerseyRetryFilter(connectionRetry); + client.addFilter(retryFilter); + } + } + + private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR + = new ConnectionConfigurator() { + @Override + public HttpURLConnection configure(HttpURLConnection conn) + throws IOException { + setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT); + return conn; + } + }; + + private ConnectionConfigurator getConnConfigurator(SSLFactory sslFactoryObj) { + try { + return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, sslFactoryObj); + } catch (Exception e) { + LOG.debug("Cannot load customized ssl related configuration. 
" + + "Fallback to system-generic settings.", e); + return DEFAULT_TIMEOUT_CONN_CONFIGURATOR; + } + } + + private static ConnectionConfigurator initSslConnConfigurator( + final int timeout, SSLFactory sslFactory) + throws IOException, GeneralSecurityException { + final SSLSocketFactory sf; + final HostnameVerifier hv; + + sf = sslFactory.createSSLSocketFactory(); + hv = sslFactory.getHostnameVerifier(); + + return new ConnectionConfigurator() { + @Override + public HttpURLConnection configure(HttpURLConnection conn) + throws IOException { + if (conn instanceof HttpsURLConnection) { + HttpsURLConnection c = (HttpsURLConnection) conn; + c.setSSLSocketFactory(sf); + c.setHostnameVerifier(hv); + } + setTimeouts(conn, timeout); + return conn; + } + }; + } + + protected SSLFactory getSSLFactory(Configuration conf) + throws GeneralSecurityException, IOException { + SSLFactory newSSLFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); + newSSLFactory.init(); + return newSSLFactory; + } + + private static void setTimeouts(URLConnection connection, int socketTimeout) { + connection.setConnectTimeout(socketTimeout); + connection.setReadTimeout(socketTimeout); + } + + public static URI constructResURI(Configuration conf, String address, + String uri) { + return URI.create( + JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://", + address, uri)); + } + + DelegationTokenAuthenticatedURL getDelegationTokenAuthenticatedURL() { + return new DelegationTokenAuthenticatedURL(authenticator, connConfigurator); + } + + protected void serviceStop() { + if (this.sslFactory != null) { + this.sslFactory.destroy(); + } + } + + public Client getClient() { + return client; + } + + public Object operateDelegationToken( + final PrivilegedExceptionAction action) + throws IOException, YarnException { + // Set up the retry operation + TimelineClientRetryOp tokenRetryOp = + createRetryOpForOperateDelegationToken(action); + + return connectionRetry.retryOn(tokenRetryOp); + } + + @Private + @VisibleForTesting + TimelineClientRetryOp createRetryOpForOperateDelegationToken( + final PrivilegedExceptionAction action) throws IOException { + return new TimelineClientRetryOpForOperateDelegationToken(this.authUgi, + action); + } + + /** + * Abstract class for an operation that should be retried by timeline client. 
+ */ + @Private + @VisibleForTesting + public static abstract class TimelineClientRetryOp { + // The operation that should be retried + public abstract Object run() throws IOException; + + // The method to indicate if we should retry given the incoming exception + public abstract boolean shouldRetryOn(Exception e); + } + + private static class TimelineURLConnectionFactory + implements HttpURLConnectionFactory { + private DelegationTokenAuthenticator authenticator; + private UserGroupInformation authUgi; + private ConnectionConfigurator connConfigurator; + private Token token; + private String doAsUser; + + public TimelineURLConnectionFactory(UserGroupInformation authUgi, + DelegationTokenAuthenticator authenticator, + ConnectionConfigurator connConfigurator, + DelegationTokenAuthenticatedURL.Token token, String doAsUser) { + this.authUgi = authUgi; + this.authenticator = authenticator; + this.connConfigurator = connConfigurator; + this.token = token; + this.doAsUser = doAsUser; + } + + @Override + public HttpURLConnection getHttpURLConnection(final URL url) + throws IOException { + authUgi.checkTGTAndReloginFromKeytab(); + try { + return new DelegationTokenAuthenticatedURL(authenticator, + connConfigurator).openConnection(url, token, doAsUser); + } catch (UndeclaredThrowableException e) { + throw new IOException(e.getCause()); + } catch (AuthenticationException ae) { + throw new IOException(ae); + } + } + + } + + // Class to handle retry + // Outside this class, only visible to tests + @Private + @VisibleForTesting + static class TimelineClientConnectionRetry { + + // maxRetries < 0 means keep trying + @Private + @VisibleForTesting + public int maxRetries; + + @Private + @VisibleForTesting + public long retryInterval; + + // Indicates if retries happened last time. Only tests should read it. + // In unit tests, retryOn() calls should _not_ be concurrent. + private boolean retried = false; + + @Private + @VisibleForTesting + boolean getRetired() { + return retried; + } + + // Constructor with default retry settings + public TimelineClientConnectionRetry(Configuration conf) { + Preconditions.checkArgument( + conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES) + >= -1, + "%s property value should be greater than or equal to -1", + YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES); + Preconditions.checkArgument( + conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + YarnConfiguration. 
+ DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0, + "%s property value should be greater than zero", + YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); + maxRetries = + conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); + retryInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); + } + + public Object retryOn(TimelineClientRetryOp op) + throws RuntimeException, IOException { + int leftRetries = maxRetries; + retried = false; + + // keep trying + while (true) { + try { + // try perform the op, if fail, keep retrying + return op.run(); + } catch (IOException | RuntimeException e) { + // break if there's no retries left + if (leftRetries == 0) { + break; + } + if (op.shouldRetryOn(e)) { + logException(e, leftRetries); + } else { + throw e; + } + } + if (leftRetries > 0) { + leftRetries--; + } + retried = true; + try { + // sleep for the given time interval + Thread.sleep(retryInterval); + } catch (InterruptedException ie) { + LOG.warn("Client retry sleep interrupted! "); + } + } + throw new RuntimeException("Failed to connect to timeline server. " + + "Connection retries limit exceeded. " + + "The posted timeline event may be missing"); + }; + + private void logException(Exception e, int leftRetries) { + if (leftRetries > 0) { + LOG.info( + "Exception caught by TimelineClientConnectionRetry," + " will try " + + leftRetries + " more time(s).\nMessage: " + e.getMessage()); + } else { + // note that maxRetries may be -1 at the very beginning + LOG.info("ConnectionException caught by TimelineClientConnectionRetry," + + " will keep retrying.\nMessage: " + e.getMessage()); + } + } + } + + private static class TimelineJerseyRetryFilter extends ClientFilter { + private TimelineClientConnectionRetry connectionRetry; + + public TimelineJerseyRetryFilter( + TimelineClientConnectionRetry connectionRetry) { + this.connectionRetry = connectionRetry; + } + + @Override + public ClientResponse handle(final ClientRequest cr) + throws ClientHandlerException { + // Set up the retry operation + TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() { + @Override + public Object run() { + // Try pass the request, if fail, keep retrying + return getNext().handle(cr); + } + + @Override + public boolean shouldRetryOn(Exception e) { + // Only retry on connection exceptions + return (e instanceof ClientHandlerException) + && (e.getCause() instanceof ConnectException + || e.getCause() instanceof SocketTimeoutException); + } + }; + try { + return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp); + } catch (IOException e) { + throw new ClientHandlerException( + "Jersey retry failed!\nMessage: " + e.getMessage()); + } + } + } + + @Private + @VisibleForTesting + public static class TimelineClientRetryOpForOperateDelegationToken + extends TimelineClientRetryOp { + + private final UserGroupInformation authUgi; + private final PrivilegedExceptionAction action; + + public TimelineClientRetryOpForOperateDelegationToken( + UserGroupInformation authUgi, PrivilegedExceptionAction action) { + this.authUgi = authUgi; + this.action = action; + } + + @Override + public Object run() throws IOException { + // Try pass the request, if fail, keep retrying + authUgi.checkTGTAndReloginFromKeytab(); + try { + return authUgi.doAs(action); + } catch (UndeclaredThrowableException e) { + throw new IOException(e.getCause()); 
+ } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + public boolean shouldRetryOn(Exception e) { + // retry on connection exceptions + // and SocketTimeoutException + return (e instanceof ConnectException + || e instanceof SocketTimeoutException); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java new file mode 100644 index 0000000000..cef7e5f579 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.core.util.MultivaluedMapImpl; + +/** + * Implementation of timeline v2 client interface. 
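Before the implementation details, a minimal end-to-end usage sketch (class name, port, and entity values are placeholders; the call sequence matches NMTimelinePublisher and TestTimelineServiceClientIntegration later in this patch):

// Illustrative sketch only; not part of the patch.
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class TimelineV2ClientSketch {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);

    TimelineV2Client client =
        TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
    client.init(conf);
    client.start();
    // The collector address is discovered at runtime and pushed to the client.
    client.setTimelineServiceAddress("localhost:12345");  // placeholder

    TimelineEntity entity = new TimelineEntity();
    entity.setType("TEST_ENTITY");
    entity.setId("entity-1");
    client.putEntitiesAsync(entity);  // queued and merged by the dispatcher
    client.putEntities(entity);       // blocking put; surfaces publish errors

    client.stop();
  }
}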
+ * + */ +public class TimelineV2ClientImpl extends TimelineV2Client { + private static final Log LOG = LogFactory.getLog(TimelineV2ClientImpl.class); + + private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; + + private TimelineEntityDispatcher entityDispatcher; + private volatile String timelineServiceAddress; + + // Retry parameters for identifying new timeline service + // TODO consider to merge with connection retry + private int maxServiceRetries; + private long serviceRetryInterval; + + private TimelineConnector connector; + + private ApplicationId contextAppId; + + public TimelineV2ClientImpl(ApplicationId appId) { + super(TimelineV2ClientImpl.class.getName()); + this.contextAppId = appId; + } + + public ApplicationId getContextAppId() { + return contextAppId; + } + + protected void serviceInit(Configuration conf) throws Exception { + if (!YarnConfiguration.timelineServiceEnabled(conf) + || (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) { + throw new IOException("Timeline V2 client is not properly configured. " + + "Either timeline service is not enabled or version is not set to" + + " 2"); + } + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + UserGroupInformation realUgi = ugi.getRealUser(); + String doAsUser = null; + UserGroupInformation authUgi = null; + if (realUgi != null) { + authUgi = realUgi; + doAsUser = ugi.getShortUserName(); + } else { + authUgi = ugi; + doAsUser = null; + } + + // TODO need to add/cleanup filter retry later for ATSV2. similar to V1 + DelegationTokenAuthenticatedURL.Token token = + new DelegationTokenAuthenticatedURL.Token(); + connector = new TimelineConnector(false, authUgi, doAsUser, token); + addIfService(connector); + + // new version need to auto discovery (with retry till ATS v2 address is + // got). + maxServiceRetries = + conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); + serviceRetryInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); + entityDispatcher = new TimelineEntityDispatcher(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + entityDispatcher.start(); + } + + @Override + protected void serviceStop() throws Exception { + entityDispatcher.stop(); + super.serviceStop(); + } + + @Override + public void putEntities(TimelineEntity... entities) + throws IOException, YarnException { + entityDispatcher.dispatchEntities(true, entities); + } + + @Override + public void putEntitiesAsync(TimelineEntity... entities) + throws IOException, YarnException { + entityDispatcher.dispatchEntities(false, entities); + } + + @Override + public void setTimelineServiceAddress(String address) { + this.timelineServiceAddress = address; + } + + @Private + protected void putObjects(String path, MultivaluedMap params, + Object obj) throws IOException, YarnException { + + int retries = verifyRestEndPointAvailable(); + + // timelineServiceAddress could be stale, add retry logic here. + boolean needRetry = true; + while (needRetry) { + try { + URI uri = TimelineConnector.constructResURI(getConfig(), + timelineServiceAddress, RESOURCE_URI_STR_V2); + putObjects(uri, path, params, obj); + needRetry = false; + } catch (IOException e) { + // handle exception for timelineServiceAddress being updated. 
+ checkRetryWithSleep(retries, e); + retries--; + } + } + } + + /** + * Check if reaching to maximum of retries. + * + * @param retries + * @param e + */ + private void checkRetryWithSleep(int retries, IOException e) + throws YarnException, IOException { + if (retries > 0) { + try { + Thread.sleep(this.serviceRetryInterval); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new YarnException("Interrupted while retrying to connect to ATS"); + } + } else { + StringBuilder msg = + new StringBuilder("TimelineClient has reached to max retry times : "); + msg.append(this.maxServiceRetries); + msg.append(" for service address: "); + msg.append(timelineServiceAddress); + LOG.error(msg.toString()); + throw new IOException(msg.toString(), e); + } + } + + protected void putObjects(URI base, String path, + MultivaluedMap params, Object obj) + throws IOException, YarnException { + ClientResponse resp; + try { + resp = connector.getClient().resource(base).path(path).queryParams(params) + .accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON) + .put(ClientResponse.class, obj); + } catch (RuntimeException re) { + // runtime exception is expected if the client cannot connect the server + String msg = "Failed to get the response from the timeline server."; + LOG.error(msg, re); + throw new IOException(re); + } + if (resp == null || resp.getStatusInfo() + .getStatusCode() != ClientResponse.Status.OK.getStatusCode()) { + String msg = + "Response from the timeline server is " + ((resp == null) ? "null" + : "not successful," + " HTTP error code: " + resp.getStatus() + + ", Server response:\n" + resp.getEntity(String.class)); + LOG.error(msg); + throw new YarnException(msg); + } + } + + private int verifyRestEndPointAvailable() throws YarnException { + // timelineServiceAddress could haven't be initialized yet + // or stale (only for new timeline service) + int retries = pollTimelineServiceAddress(this.maxServiceRetries); + if (timelineServiceAddress == null) { + String errMessage = "TimelineClient has reached to max retry times : " + + this.maxServiceRetries + + ", but failed to fetch timeline service address. Please verify" + + " Timeline Auxiliary Service is configured in all the NMs"; + LOG.error(errMessage); + throw new YarnException(errMessage); + } + return retries; + } + + /** + * Poll TimelineServiceAddress for maximum of retries times if it is null. 
+ * + * @param retries + * @return the left retry times + * @throws IOException + */ + private int pollTimelineServiceAddress(int retries) throws YarnException { + while (timelineServiceAddress == null && retries > 0) { + try { + Thread.sleep(this.serviceRetryInterval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new YarnException("Interrupted while trying to connect ATS"); + } + retries--; + } + return retries; + } + + private final class EntitiesHolder extends FutureTask { + private final TimelineEntities entities; + private final boolean isSync; + + EntitiesHolder(final TimelineEntities entities, final boolean isSync) { + super(new Callable() { + // publishEntities() + public Void call() throws Exception { + MultivaluedMap params = new MultivaluedMapImpl(); + params.add("appid", getContextAppId().toString()); + params.add("async", Boolean.toString(!isSync)); + putObjects("entities", params, entities); + return null; + } + }); + this.entities = entities; + this.isSync = isSync; + } + + public boolean isSync() { + return isSync; + } + + public TimelineEntities getEntities() { + return entities; + } + } + + /** + * This class is responsible for collecting the timeline entities and + * publishing them in async. + */ + private class TimelineEntityDispatcher { + /** + * Time period for which the timelineclient will wait for draining after + * stop. + */ + private static final long DRAIN_TIME_PERIOD = 2000L; + + private int numberOfAsyncsToMerge; + private final BlockingQueue timelineEntityQueue; + private ExecutorService executor; + + TimelineEntityDispatcher(Configuration conf) { + timelineEntityQueue = new LinkedBlockingQueue(); + numberOfAsyncsToMerge = + conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, + YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE); + } + + Runnable createRunnable() { + return new Runnable() { + @Override + public void run() { + try { + EntitiesHolder entitiesHolder; + while (!Thread.currentThread().isInterrupted()) { + // Merge all the async calls and make one push, but if its sync + // call push immediately + try { + entitiesHolder = timelineEntityQueue.take(); + } catch (InterruptedException ie) { + LOG.info("Timeline dispatcher thread was interrupted "); + Thread.currentThread().interrupt(); + return; + } + if (entitiesHolder != null) { + publishWithoutBlockingOnQueue(entitiesHolder); + } + } + } finally { + if (!timelineEntityQueue.isEmpty()) { + LOG.info("Yet to publish " + timelineEntityQueue.size() + + " timelineEntities, draining them now. "); + } + // Try to drain the remaining entities to be published @ the max for + // 2 seconds + long timeTillweDrain = + System.currentTimeMillis() + DRAIN_TIME_PERIOD; + while (!timelineEntityQueue.isEmpty()) { + publishWithoutBlockingOnQueue(timelineEntityQueue.poll()); + if (System.currentTimeMillis() > timeTillweDrain) { + // time elapsed stop publishing further.... + if (!timelineEntityQueue.isEmpty()) { + LOG.warn("Time to drain elapsed! Remaining " + + timelineEntityQueue.size() + "timelineEntities will not" + + " be published"); + // if some entities were not drained then we need interrupt + // the threads which had put sync EntityHolders to the queue. 
+ EntitiesHolder nextEntityInTheQueue = null; + while ((nextEntityInTheQueue = + timelineEntityQueue.poll()) != null) { + nextEntityInTheQueue.cancel(true); + } + } + break; + } + } + } + } + + /** + * Publishes the given EntitiesHolder and return immediately if sync + * call, else tries to fetch the EntitiesHolder from the queue in non + * blocking fashion and collate the Entities if possible before + * publishing through REST. + * + * @param entitiesHolder + */ + private void publishWithoutBlockingOnQueue( + EntitiesHolder entitiesHolder) { + if (entitiesHolder.isSync()) { + entitiesHolder.run(); + return; + } + int count = 1; + while (true) { + // loop till we find a sync put Entities or there is nothing + // to take + EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll(); + if (nextEntityInTheQueue == null) { + // Nothing in the queue just publish and get back to the + // blocked wait state + entitiesHolder.run(); + break; + } else if (nextEntityInTheQueue.isSync()) { + // flush all the prev async entities first + entitiesHolder.run(); + // and then flush the sync entity + nextEntityInTheQueue.run(); + break; + } else { + // append all async entities together and then flush + entitiesHolder.getEntities().addEntities( + nextEntityInTheQueue.getEntities().getEntities()); + count++; + if (count == numberOfAsyncsToMerge) { + // Flush the entities if the number of the async + // putEntites merged reaches the desired limit. To avoid + // collecting multiple entities and delaying for a long + // time. + entitiesHolder.run(); + break; + } + } + } + } + }; + } + + public void dispatchEntities(boolean sync, + TimelineEntity[] entitiesTobePublished) throws YarnException { + if (executor.isShutdown()) { + throw new YarnException("Timeline client is in the process of stopping," + + " not accepting any more TimelineEntities"); + } + + // wrap all TimelineEntity into TimelineEntities object + TimelineEntities entities = new TimelineEntities(); + for (TimelineEntity entity : entitiesTobePublished) { + entities.addEntity(entity); + } + + // created a holder and place it in queue + EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync); + try { + timelineEntityQueue.put(entitiesHolder); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new YarnException( + "Failed while adding entity to the queue for publishing", e); + } + + if (sync) { + // In sync call we need to wait till its published and if any error then + // throw it back + try { + entitiesHolder.get(); + } catch (ExecutionException e) { + throw new YarnException("Failed while publishing entity", + e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new YarnException("Interrupted while publishing entity", e); + } + } + } + + public void start() { + executor = Executors.newSingleThreadExecutor(); + executor.execute(createRunnable()); + } + + public void stop() { + LOG.info("Stopping TimelineClient."); + executor.shutdownNow(); + try { + executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java index bfc7cbd1e5..f42c0780d6 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java @@ -215,11 +215,11 @@ public void testCheckRetryCount() throws Exception { + "Timeline server should be off to run this test. "); } catch (RuntimeException ce) { Assert.assertTrue( - "Handler exception for reason other than retry: " + ce.getMessage(), - ce.getMessage().contains("Connection retries limit exceeded")); + "Handler exception for reason other than retry: " + ce.getMessage(), + ce.getMessage().contains("Connection retries limit exceeded")); // we would expect this exception here, check if the client has retried - Assert.assertTrue("Retry filter didn't perform any retries! ", client - .connectionRetry.getRetired()); + Assert.assertTrue("Retry filter didn't perform any retries! ", + client.connector.connectionRetry.getRetired()); } } @@ -318,7 +318,7 @@ private void assertException(TimelineClientImpl client, RuntimeException ce) { .getMessage().contains("Connection retries limit exceeded")); // we would expect this exception here, check if the client has retried Assert.assertTrue("Retry filter didn't perform any retries! ", - client.connectionRetry.getRetired()); + client.connector.connectionRetry.getRetired()); } public static ClientResponse mockEntityClientResponse( @@ -419,17 +419,26 @@ protected TimelineWriter createTimelineWriter(Configuration conf, private TimelineClientImpl createTimelineClientFakeTimelineClientRetryOp( YarnConfiguration conf) { TimelineClientImpl client = new TimelineClientImpl() { - @Override - public TimelineClientRetryOp - createTimelineClientRetryOpForOperateDelegationToken( - final PrivilegedExceptionAction action) throws IOException { - TimelineClientRetryOpForOperateDelegationToken op = - spy(new TimelineClientRetryOpForOperateDelegationToken( - UserGroupInformation.getCurrentUser(), action)); - doThrow(new SocketTimeoutException("Test socketTimeoutException")) - .when(op).run(); - return op; + protected TimelineConnector createTimelineConnector() { + TimelineConnector connector = + new TimelineConnector(true, authUgi, doAsUser, token) { + @Override + public TimelineClientRetryOp + createRetryOpForOperateDelegationToken( + final PrivilegedExceptionAction action) + throws IOException { + TimelineClientRetryOpForOperateDelegationToken op = + spy(new TimelineClientRetryOpForOperateDelegationToken( + UserGroupInformation.getCurrentUser(), action)); + doThrow( + new SocketTimeoutException("Test socketTimeoutException")) + .when(op).run(); + return op; + } + }; + addIfService(connector); + return connector; } }; client.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java index 5813340da2..c5b02fd32c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java @@ -50,7 +50,7 @@ public class TestTimelineClientV2Impl { public void setup() { conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - 
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3); if (!currTestName.getMethodName() .contains("testRetryOnConnectionFailure")) { @@ -71,7 +71,7 @@ private TestV2TimelineClient createTimelineClient(YarnConfiguration config) { } private class TestV2TimelineClientForExceptionHandling - extends TimelineClientImpl { + extends TimelineV2ClientImpl { public TestV2TimelineClientForExceptionHandling(ApplicationId id) { super(id); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 8994582ac6..ce2c6561a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; -import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -76,7 +76,7 @@ public class NMTimelinePublisher extends CompositeService { private String httpAddress; - private final Map appToClientMap; + private final Map appToClientMap; public NMTimelinePublisher(Context context) { super(NMTimelinePublisher.class.getName()); @@ -102,7 +102,7 @@ protected void serviceStart() throws Exception { } @VisibleForTesting - Map getAppToClientMap() { + Map getAppToClientMap() { return appToClientMap; } @@ -145,7 +145,7 @@ public void reportContainerResourceUsage(Container container, Long pmemUsage, try { // no need to put it as part of publisher as timeline client already has // Queuing concept - TimelineClient timelineClient = getTimelineClient(appId); + TimelineV2Client timelineClient = getTimelineClient(appId); if (timelineClient != null) { timelineClient.putEntitiesAsync(entity); } else { @@ -234,7 +234,7 @@ private void publishContainerLocalizationEvent( try { // no need to put it as part of publisher as timeline client already has // Queuing concept - TimelineClient timelineClient = getTimelineClient(appId); + TimelineV2Client timelineClient = getTimelineClient(appId); if (timelineClient != null) { timelineClient.putEntitiesAsync(entity); } else { @@ -265,7 +265,7 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) { LOG.debug("Publishing the entity " + entity + ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); } - TimelineClient timelineClient = getTimelineClient(appId); + TimelineV2Client timelineClient = getTimelineClient(appId); if (timelineClient != null) { timelineClient.putEntities(entity); } else { @@ -382,8 +382,8 @@ public TimelineEntity 
getTimelineEntityToPublish() { public void createTimelineClient(ApplicationId appId) { if (!appToClientMap.containsKey(appId)) { - TimelineClient timelineClient = - TimelineClient.createTimelineClient(appId); + TimelineV2Client timelineClient = + TimelineV2Client.createTimelineClient(appId); timelineClient.init(getConfig()); timelineClient.start(); appToClientMap.put(appId, timelineClient); @@ -391,7 +391,7 @@ public void createTimelineClient(ApplicationId appId) { } public void stopTimelineClient(ApplicationId appId) { - TimelineClient client = appToClientMap.remove(appId); + TimelineV2Client client = appToClientMap.remove(appId); if (client != null) { client.stop(); } @@ -399,13 +399,13 @@ public void stopTimelineClient(ApplicationId appId) { public void setTimelineServiceAddress(ApplicationId appId, String collectorAddr) { - TimelineClient client = appToClientMap.get(appId); + TimelineV2Client client = appToClientMap.get(appId); if (client != null) { client.setTimelineServiceAddress(collectorAddr); } } - private TimelineClient getTimelineClient(ApplicationId appId) { + private TimelineV2Client getTimelineClient(ApplicationId appId) { return appToClientMap.get(appId); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java index ae9397a78e..e116122931 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -50,7 +50,7 @@ public class TestNMTimelinePublisher { public void testContainerResourceUsage() { Context context = mock(Context.class); @SuppressWarnings("unchecked") - final DummyTimelineClient timelineClient = new DummyTimelineClient(); + final DummyTimelineClient timelineClient = new DummyTimelineClient(null); when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0)); when(context.getHttpPort()).thenReturn(0); NMTimelinePublisher publisher = new NMTimelinePublisher(context) { @@ -137,7 +137,11 @@ private void verifyPublishedResourceUsageMetrics( } } - protected static class DummyTimelineClient extends TimelineClientImpl { + protected static class DummyTimelineClient extends TimelineV2ClientImpl { + public DummyTimelineClient(ApplicationId appId) { + super(appId); + } + private TimelineEntity[] lastPublishedEntities; @Override diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 3ec222fe6a..07058f62ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity; -import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; @@ -96,8 +96,8 @@ public static void tearDownClass() throws Exception { @Test public void testPutEntities() throws Exception { - TimelineClient client = - TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1)); + TimelineV2Client client = + TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1)); try { // set the timeline service address manually client.setTimelineServiceAddress( @@ -123,8 +123,8 @@ public void testPutEntities() throws Exception { @Test public void testPutExtendedEntities() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 1); - TimelineClient client = - TimelineClient.createTimelineClient(appId); + TimelineV2Client client = + TimelineV2Client.createTimelineClient(appId); try { // set the timeline service address manually client.setTimelineServiceAddress(