From d45ff878c4cb8b359abb17ecf09d24b6f862874c Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Fri, 27 Feb 2015 08:46:42 -0800 Subject: [PATCH] YARN-3125. Made the distributed shell use timeline service next gen and add an integration test for it. Contributed by Junping Du and Li Lu. (cherry picked from commit bf08f7f0ed4900ce52f98137297dd1a47ba2a536) --- .../distributedshell/ApplicationMaster.java | 167 +++++++++++++++++- .../applications/distributedshell/Client.java | 20 ++- .../TestDistributedShell.java | 67 ++++++- .../aggregator/BaseAggregatorService.java | 6 + .../aggregator/PerNodeAggregatorServer.java | 4 +- 5 files changed, 245 insertions(+), 19 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 380b822deb..2e416a695a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -218,6 +218,8 @@ public static enum DSEntity { private int appMasterRpcPort = -1; // Tracking url to which app master publishes info for clients to monitor private String appMasterTrackingUrl = ""; + + private boolean newTimelineService = false; // App Master configuration // No. 
of containers to run shell command on @@ -401,7 +403,8 @@ public boolean init(String[] args) throws ParseException, IOException { opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); opts.addOption("debug", false, "Dump out debug information"); - + opts.addOption("timeline_service_version", true, + "Version for timeline service"); opts.addOption("help", false, "Print usage"); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -552,6 +555,30 @@ public boolean init(String[] args) throws ParseException, IOException { cliParser.getOptionValue("container_max_retries", "0")); containrRetryInterval = Integer.parseInt(cliParser.getOptionValue( "container_retry_interval", "0")); + + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + if (cliParser.hasOption("timeline_service_version")) { + String timelineServiceVersion = + cliParser.getOptionValue("timeline_service_version", "v1"); + if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) { + newTimelineService = false; + } else if (timelineServiceVersion.trim().equalsIgnoreCase("v2")) { + newTimelineService = true; + } else { + throw new IllegalArgumentException( + "timeline_service_version is not set properly, should be 'v1' or 'v2'"); + } + } + } else { + timelineClient = null; + LOG.warn("Timeline service is not enabled"); + if (cliParser.hasOption("timeline_service_version")) { + throw new IllegalArgumentException( + "Timeline service is not enabled"); + } + } + return true; } @@ -599,7 +626,6 @@ public void run() throws YarnException, IOException, InterruptedException { UserGroupInformation.createRemoteUser(appSubmitterUserName); appSubmitterUgi.addCredentials(credentials); - AMRMClientAsync.AbstractCallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); @@ -613,8 +639,14 @@ public void run() throws YarnException, 
IOException, InterruptedException { startTimelineClient(conf); if(timelineClient != null) { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); + if (newTimelineService) { + publishApplicationAttemptEventOnNewTimelineService(timelineClient, + appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId, + appSubmitterUgi); + } else { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); + } } // Setup local RPC Server to accept status requests directly from clients @@ -717,9 +749,15 @@ protected boolean finish() { } catch (InterruptedException ex) {} } - if(timelineClient != null) { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + if (timelineClient != null) { + if (newTimelineService) { + publishApplicationAttemptEventOnNewTimelineService(timelineClient, + appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId, + appSubmitterUgi); + } else { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); + } } // Join all launched threads @@ -825,8 +863,13 @@ public void onContainersCompleted(List completedContainers) { + containerStatus.getContainerId()); } if(timelineClient != null) { - publishContainerEndEvent( - timelineClient, containerStatus, domainId, appSubmitterUgi); + if (newTimelineService) { + publishContainerEndEventOnNewTimelineService( + timelineClient, containerStatus, domainId, appSubmitterUgi); + } else { + publishContainerEndEvent( + timelineClient, containerStatus, domainId, appSubmitterUgi); + } } } @@ -952,6 +995,16 @@ public void onContainerStarted(ContainerId containerId, applicationMaster.publishContainerStartEvent( applicationMaster.timelineClient, container, applicationMaster.domainId, applicationMaster.appSubmitterUgi); + + if (applicationMaster.newTimelineService) { + 
ApplicationMaster.publishContainerStartEventOnNewTimelineService( + applicationMaster.timelineClient, container, + applicationMaster.domainId, applicationMaster.appSubmitterUgi); + } else { + applicationMaster.publishContainerStartEvent( + applicationMaster.timelineClient, container, + applicationMaster.domainId, applicationMaster.appSubmitterUgi); + } } } @@ -1306,4 +1359,102 @@ Thread createLaunchContainerThread(Container allocatedContainer, shellId); return new Thread(runnableLaunchContainer); } + + private static void publishContainerStartEventOnNewTimelineService( + final TimelineClient timelineClient, Container container, String domainId, + UserGroupInformation ugi) { + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); + entity.setId(container.getId().toString()); + entity.setType(DSEntity.DS_CONTAINER.toString()); + //entity.setDomainId(domainId); + entity.addInfo("user", ugi.getShortUserName()); + + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setId(DSEvent.DS_CONTAINER_START.toString()); + event.addInfo("Node", container.getNodeId().toString()); + event.addInfo("Resources", container.getResource().toString()); + entity.addEvent(event); + + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + timelineClient.putEntities(entity); + return null; + } + }); + } catch (Exception e) { + LOG.error("Container start event could not be published for " + + container.getId().toString(), + e instanceof UndeclaredThrowableException ? 
e.getCause() : e); + } + } + + private static void publishContainerEndEventOnNewTimelineService( + final TimelineClient timelineClient, ContainerStatus container, + String domainId, UserGroupInformation ugi) { + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); + entity.setId(container.getContainerId().toString()); + entity.setType(DSEntity.DS_CONTAINER.toString()); + //entity.setDomainId(domainId); + entity.addInfo("user", ugi.getShortUserName()); + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setId(DSEvent.DS_CONTAINER_END.toString()); + event.addInfo("State", container.getState().name()); + event.addInfo("Exit Status", container.getExitStatus()); + entity.addEvent(event); + + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + timelineClient.putEntities(entity); + return null; + } + }); + } catch (Exception e) { + LOG.error("Container end event could not be published for " + + container.getContainerId().toString(), + e instanceof UndeclaredThrowableException ? 
e.getCause() : e); + } + } + + private static void publishApplicationAttemptEventOnNewTimelineService( + final TimelineClient timelineClient, String appAttemptId, + DSEvent appEvent, String domainId, UserGroupInformation ugi) { + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); + entity.setId(appAttemptId); + entity.setType(DSEntity.DS_APP_ATTEMPT.toString()); + //entity.setDomainId(domainId); + entity.addInfo("user", ugi.getShortUserName()); + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); + event.setId(appEvent.toString()); + event.setTimestamp(System.currentTimeMillis()); + entity.addEvent(event); + + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + timelineClient.putEntities(entity); + return null; + } + }); + } catch (Exception e) { + LOG.error("App Attempt " + + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") + + " event could not be published for " + + appAttemptId.toString(), + e instanceof UndeclaredThrowableException ? 
e.getCause() : e); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index c8c9303a0e..af424241c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -189,6 +189,8 @@ public class Client { // Command line options private Options opts; + + private String timelineServiceVersion; private static final String shellCommandPath = "shellCommands"; private static final String shellArgsPath = "shellArgs"; @@ -264,6 +266,7 @@ public Client(Configuration conf) throws Exception { opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("log_properties", true, "log4j.properties file"); + opts.addOption("timeline_service_version", true, "Version for timeline service"); opts.addOption("keep_containers_across_application_attempts", false, "Flag to indicate whether to keep containers across application attempts." + " If the flag is true, running containers will not be killed when" + @@ -370,6 +373,16 @@ public boolean init(String[] args) throws ParseException { throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting." 
+ " Specified virtual cores=" + amVCores); } + + if (cliParser.hasOption("timeline_service_version")) { + timelineServiceVersion = + cliParser.getOptionValue("timeline_service_version", "v1"); + if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") || + timelineServiceVersion.trim().equalsIgnoreCase("v2"))) { + throw new IllegalArgumentException( + "timeline_service_version is not set properly, should be 'v1' or 'v2'"); + } + } if (!cliParser.hasOption("jar")) { throw new IllegalArgumentException("No jar file specified for application master"); @@ -667,13 +680,16 @@ public boolean run() throws IOException, YarnException { for (Map.Entry entry : shellEnv.entrySet()) { vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); - } + } if (debugFlag) { vargs.add("--debug"); } vargs.addAll(containerRetryOptions); + if (timelineServiceVersion != null) { + vargs.add("--timeline_service_version " + timelineServiceVersion); + } vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); @@ -683,7 +699,7 @@ public boolean run() throws IOException, YarnException { command.append(str).append(" "); } - LOG.info("Completed setting up app master command " + command.toString()); + LOG.info("Completed setting up app master command " + command.toString()); List commands = new ArrayList(); commands.add(command.toString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 9448cf14bc..8b06c51bf0 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -85,8 +86,6 @@ import org.junit.Test; import org.junit.rules.Timeout; -import com.sun.jersey.api.client.ClientHandlerException; - public class TestDistributedShell { private static final Log LOG = @@ -99,6 +98,7 @@ public class TestDistributedShell { protected YarnConfiguration conf = null; private static final int NUM_NMS = 1; private static final float DEFAULT_TIMELINE_VERSION = 1.0f; + private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_aggregator"; protected final static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); @@ -122,11 +122,16 @@ private void setupInternal(int numNodeManager, float timelineVersion) throws Exception { LOG.info("Starting up YARN cluster"); - + conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); conf.set("yarn.log.dir", "target"); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + // mark if we need to launch the v1 timeline server + boolean enableATSServer = true; + // disable aux-service based timeline aggregators + conf.set(YarnConfiguration.NM_AUX_SERVICES, ""); + conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, 
true); conf.set("mapreduce.jobhistory.address", @@ -148,6 +153,13 @@ private void setupInternal(int numNodeManager, float timelineVersion) PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster); conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES, DistributedShellTimelinePlugin.class.getName()); + } else if (timelineVersion == 2.0f) { + // disable v1 timeline server since we no longer have a server here + enableATSServer = false; + // enable aux-service based timeline aggregators + conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); + conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME + + ".class", PerNodeAggregatorServer.class.getName()); } else { Assert.fail("Wrong timeline version number: " + timelineVersion); } @@ -155,7 +167,7 @@ private void setupInternal(int numNodeManager, float timelineVersion) if (yarnCluster == null) { yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1, - numNodeManager, 1, 1); + numNodeManager, 1, 1, enableATSServer); yarnCluster.init(conf); yarnCluster.start(); @@ -242,6 +254,12 @@ public void testDSShellWithDomainV1_5() throws Exception { testDSShell(true); } + @Test(timeout=90000) + @TimelineVersion(2.0f) + public void testDSShellWithoutDomainV2() throws Exception { + testDSShell(false); + } + public void testDSShell(boolean haveDomain) throws Exception { String[] args = { "--jar", @@ -269,9 +287,17 @@ public void testDSShell(boolean haveDomain) throws Exception { "writer_user writer_group", "--create" }; - List argsList = new ArrayList(Arrays.asList(args)); - argsList.addAll(Arrays.asList(domainArgs)); - args = argsList.toArray(new String[argsList.size()]); + args = mergeArgs(args, domainArgs); + } + boolean isTestingTimelineV2 = false; + if (timelineVersionWatcher.getTimelineVersion() == 2.0f) { + String[] timelineArgs = { + "--timeline_service_version", + "v2" + }; + isTestingTimelineV2 = true; + args = mergeArgs(args, 
timelineArgs); + LOG.info("Setup: Using timeline v2!"); } LOG.info("Initializing DS Client"); @@ -344,6 +370,15 @@ public void run() { } TimelineDomain domain = null; + if (!isTestingTimelineV2) { + checkTimelineV1(haveDomain); + } else { + checkTimelineV2(haveDomain); + } + } + + private void checkTimelineV1(boolean haveDomain) throws Exception { + TimelineDomain domain = null; if (haveDomain) { domain = yarnCluster.getApplicationHistoryServer() .getTimelineStore().getDomain("TEST_DOMAIN"); @@ -394,6 +429,24 @@ public void run() { } } + private void checkTimelineV2(boolean haveDomain) { + // TODO check timeline V2 here after we have a storage layer + } + + /** + * Utility function to merge two String arrays to form a new String array for + * our arguments. + * + * @param args + * @param newArgs + * @return a String array consisting of {args, newArgs} + */ + private String[] mergeArgs(String[] args, String[] newArgs) { + List argsList = new ArrayList(Arrays.asList(args)); + argsList.addAll(Arrays.asList(newArgs)); + return argsList.toArray(new String[argsList.size()]); + } + /* * NetUtils.getHostname() returns a string in the form "hostname/ip". * Sometimes the hostname we get is the FQDN and sometimes the short name. 
In diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java index 46e5574c07..e3621398c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java @@ -72,6 +72,12 @@ protected void serviceStop() throws Exception { */ public void postEntities(TimelineEntities entities, UserGroupInformation callerUgi) { + // Add this output temporarily for our prototype + // TODO remove this after we have an actual implementation + LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE"); + LOG.info("postEntities(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + // TODO implement if (LOG.isDebugEnabled()) { LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java index 55c6271103..deb21c7681 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java @@ -118,8 +118,8 @@ private void startWebApp() { .setConf(conf) .addEndpoint(URI.create("http://" + bindAddress)); timelineRestServer = builder.build(); - // TODO: replace this by an authentification filter in future. - HashMap options = new HashMap(); + // TODO: replace this by an authentication filter in future. + HashMap options = new HashMap<>(); String username = conf.get(HADOOP_HTTP_STATIC_USER, DEFAULT_HADOOP_HTTP_STATIC_USER); options.put(HADOOP_HTTP_STATIC_USER, username);