From f291d82cd49c04a81380bc45c97c279d791b571c Mon Sep 17 00:00:00 2001
From: Junping Du
Date: Mon, 14 Mar 2016 08:28:38 -0700
Subject: [PATCH] YARN-4545. Allow YARN distributed shell to use ATS v1.5 APIs. Li Lu via junping_du

---
 hadoop-project/pom.xml                         |  13 ++
 .../pom.xml                                    |  21 +++
 .../distributedshell/ApplicationMaster.java    |  68 ++++++----
 .../DistributedShellTimelinePlugin.java        |  79 ++++++++++++
 .../distributedshell/package-info.java         |  19 +++
 .../TestDistributedShell.java                  | 120 ++++++++++++++++--
 .../yarn/util/timeline/TimelineUtils.java      |  35 +++++
 .../hadoop/yarn/server/MiniYARNCluster.java    |  10 +-
 .../yarn/server/timeline/TimelineVersion.java  |  31 +++++
 .../timeline/TimelineVersionWatcher.java       |  47 +++++++
 .../pom.xml                                    |  16 +++
 .../server/timeline/PluginStoreTestUtils.java  |  51 +++++++-
 12 files changed, 469 insertions(+), 41 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellTimelinePlugin.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/package-info.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersion.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersionWatcher.java

diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index f23b46ed81..3362c117f8 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -296,6 +296,19 @@
         <version>${project.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+        <type>test-jar</type>
+        <version>${project.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
index 09a56ea675..c118603d5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
@@ -121,6 +121,27 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 95dbddcd74..0f82903dfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -99,6 +100,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.LogManager; import com.google.common.annotations.VisibleForTesting; @@ -277,6 +279,9 @@ public static enum DSEntity { // Timeline Client @VisibleForTesting TimelineClient timelineClient; + static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS"; + static final String APPID_TIMELINE_FILTER_NAME = "appId"; + static final String USER_TIMELINE_FILTER_NAME = "user"; private final String linux_bash_command = "bash"; private final String windows_command = "cmd /c"; @@ -904,7 +909,7 @@ public void onContainerStarted(ContainerId containerId, applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); } if(applicationMaster.timelineClient != null) { - ApplicationMaster.publishContainerStartEvent( + applicationMaster.publishContainerStartEvent( applicationMaster.timelineClient, container, applicationMaster.domainId, applicationMaster.appSubmitterUgi); } @@ -1120,15 +1125,17 @@ private String readContent(String filePath) throws IOException { org.apache.commons.io.IOUtils.closeQuietly(ds); } } - - private static void publishContainerStartEvent( - final TimelineClient timelineClient, Container container, String domainId, - UserGroupInformation ugi) { + + private void publishContainerStartEvent( + final TimelineClient timelineClient, final Container container, + String domainId, UserGroupInformation ugi) { final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); entity.setDomainId(domainId); - entity.addPrimaryFilter("user", ugi.getShortUserName()); + entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); + entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId() + .getApplicationAttemptId().getApplicationId().toString()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_START.toString()); @@ -1137,28 +1144,27 @@ private static void publishContainerStartEvent( entity.addEvent(event); try { - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public TimelinePutResponse run() throws Exception { - return processTimelineResponseErrors( - timelineClient.putEntities(entity)); - } - }); - } catch (Exception e) { + processTimelineResponseErrors( + putContainerEntity(timelineClient, + container.getId().getApplicationAttemptId(), + entity)); + } catch (YarnException | IOException e) { LOG.error("Container start event could not be published for " - + container.getId().toString(), - e instanceof UndeclaredThrowableException ? 
e.getCause() : e); + + container.getId().toString(), e); } } - private static void publishContainerEndEvent( + private void publishContainerEndEvent( final TimelineClient timelineClient, ContainerStatus container, String domainId, UserGroupInformation ugi) { final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getContainerId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); entity.setDomainId(domainId); - entity.addPrimaryFilter("user", ugi.getShortUserName()); + entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); + entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, + container.getContainerId().getApplicationAttemptId() + .getApplicationId().toString()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_END.toString()); @@ -1166,22 +1172,38 @@ private static void publishContainerEndEvent( event.addEventInfo("Exit Status", container.getExitStatus()); entity.addEvent(event); try { - TimelinePutResponse response = timelineClient.putEntities(entity); - processTimelineResponseErrors(response); + processTimelineResponseErrors( + putContainerEntity(timelineClient, + container.getContainerId().getApplicationAttemptId(), + entity)); } catch (YarnException | IOException e) { LOG.error("Container end event could not be published for " + container.getContainerId().toString(), e); } } - private static void publishApplicationAttemptEvent( + private TimelinePutResponse putContainerEntity( + TimelineClient timelineClient, ApplicationAttemptId currAttemptId, + TimelineEntity entity) + throws YarnException, IOException { + if (TimelineUtils.timelineServiceV1_5Enabled(conf)) { + TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance( + currAttemptId.getApplicationId(), + CONTAINER_ENTITY_GROUP_ID); + return timelineClient.putEntities(currAttemptId, groupId, entity); + } else { + return timelineClient.putEntities(entity); + } + } + + private void publishApplicationAttemptEvent( final TimelineClient timelineClient, String appAttemptId, DSEvent appEvent, String domainId, UserGroupInformation ugi) { final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(appAttemptId); entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); entity.setDomainId(domainId); - entity.addPrimaryFilter("user", ugi.getShortUserName()); + entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setEventType(appEvent.toString()); event.setTimestamp(System.currentTimeMillis()); @@ -1197,7 +1219,7 @@ private static void publishApplicationAttemptEvent( } } - private static TimelinePutResponse processTimelineResponseErrors( + private TimelinePutResponse processTimelineResponseErrors( TimelinePutResponse response) { List errors = response.getErrors(); if (errors.size() == 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellTimelinePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellTimelinePlugin.java new file mode 100644 index 0000000000..55fbd60b18 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellTimelinePlugin.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.timeline.TimelineEntityGroupPlugin;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * Timeline v1.5 reader plugin for YARN distributed shell. It translates an
+ * incoming getEntity request to a set of related timeline entity groups, via
+ * the information provided in the primary filter or entity id field.
+ */
+public class DistributedShellTimelinePlugin extends TimelineEntityGroupPlugin {
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters) {
+    if (ApplicationMaster.DSEntity.DS_CONTAINER.toString().equals(entityType)) {
+      if (primaryFilter == null) {
+        return null;
+      }
+      return toEntityGroupId(primaryFilter.getValue().toString());
+    }
+    return null;
+  }
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
+      String entityType) {
+    if (ApplicationMaster.DSEntity.DS_CONTAINER.toString().equals(entityType)) {
+      ContainerId containerId = ConverterUtils.toContainerId(entityId);
+      ApplicationId appId = containerId.getApplicationAttemptId()
+          .getApplicationId();
+      return toEntityGroupId(appId.toString());
+    }
+    return null;
+  }
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
+      SortedSet<String> entityIds, Set<String> eventTypes) {
+    // Right now this method is not used by TimelineEntityGroupPlugin
+    return null;
+  }
+
+  private Set<TimelineEntityGroupId> toEntityGroupId(String strAppId) {
+    ApplicationId appId = ConverterUtils.toApplicationId(strAppId);
+    TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
+        appId, ApplicationMaster.CONTAINER_ENTITY_GROUP_ID);
+    Set<TimelineEntityGroupId> result = new HashSet<>();
+    result.add(groupId);
+    return result;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/package-info.java
new file mode 100644
index 0000000000..299a286748
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.applications.distributedshell; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 3197875f3d..65360508ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -36,12 +36,19 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; @@ -50,29 +57,50 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; +import org.apache.hadoop.yarn.server.timeline.NameValuePair; +import org.apache.hadoop.yarn.server.timeline.TimelineVersion; +import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; public class TestDistributedShell { private static final Log LOG = LogFactory.getLog(TestDistributedShell.class); - protected MiniYARNCluster yarnCluster = null; + protected MiniYARNCluster yarnCluster = null; + protected MiniDFSCluster hdfsCluster = null; + private FileSystem fs = null; protected YarnConfiguration conf = null; private static final int NUM_NMS = 1; + private static final float DEFAULT_TIMELINE_VERSION = 1.0f; protected final static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); + @Rule + public TimelineVersionWatcher timelineVersionWatcher + = new TimelineVersionWatcher(); + @Rule + public Timeout globalTimeout = new Timeout(90000); + @Before public void setup() throws Exception { - setupInternal(NUM_NMS); + setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion()); } protected void setupInternal(int numNodeManager) throws Exception { + setupInternal(numNodeManager, DEFAULT_TIMELINE_VERSION); + } + + private void setupInternal(int 
numNodeManager, float timelineVersion) + throws Exception { LOG.info("Starting up YARN cluster"); @@ -84,6 +112,26 @@ protected void setupInternal(int numNodeManager) throws Exception { conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); conf.set("mapreduce.jobhistory.address", "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10)); + + // ATS version specific settings + if (timelineVersion == 1.0f) { + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT); + } else if (timelineVersion == 1.5f) { + if (hdfsCluster == null) { + HdfsConfiguration hdfsConfig = new HdfsConfiguration(); + hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig) + .numDataNodes(1).build(); + } + fs = hdfsCluster.getFileSystem(); + PluginStoreTestUtils.prepareFileSystemForPluginStore(fs); + PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster); + conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES, + DistributedShellTimelinePlugin.class.getName()); + } else { + Assert.fail("Wrong timeline version number: " + timelineVersion); + } if (yarnCluster == null) { yarnCluster = @@ -138,6 +186,13 @@ public void tearDown() throws IOException { yarnCluster = null; } } + if (hdfsCluster != null) { + try { + hdfsCluster.shutdown(); + } finally { + hdfsCluster = null; + } + } FileContext fsContext = FileContext.getLocalFSFileContext(); fsContext .delete( @@ -146,16 +201,28 @@ public void tearDown() throws IOException { true); } - @Test(timeout=90000) + @Test public void testDSShellWithDomain() throws Exception { testDSShell(true); } - @Test(timeout=90000) + @Test public void testDSShellWithoutDomain() throws Exception { testDSShell(false); } + @Test + @TimelineVersion(1.5f) + public void testDSShellWithoutDomainV1_5() throws Exception { + testDSShell(false); + } + + @Test + @TimelineVersion(1.5f) + public void testDSShellWithDomainV1_5() throws Exception { + testDSShell(true); + } + public void testDSShell(boolean haveDomain) throws Exception { String[] args = { "--jar", @@ -239,6 +306,24 @@ public void run() { LOG.info("Client run completed. 
Result=" + result); Assert.assertTrue(result.get()); + if (timelineVersionWatcher.getTimelineVersion() == 1.5f) { + long scanInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS, + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT + ); + Path doneDir = new Path( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT + ); + // Wait till the data is moved to done dir, or timeout and fail + while (true) { + RemoteIterator iterApps = fs.listStatusIterator(doneDir); + if (iterApps.hasNext()) { + break; + } + Thread.sleep(scanInterval * 2); + } + } + TimelineDomain domain = null; if (haveDomain) { domain = yarnCluster.getApplicationHistoryServer() @@ -265,11 +350,18 @@ public void run() { Assert.assertEquals("DEFAULT", entitiesAttempts.getEntities().get(0).getDomainId()); } + String currAttemptEntityId + = entitiesAttempts.getEntities().get(0).getEntityId(); + ApplicationAttemptId attemptId + = ConverterUtils.toApplicationAttemptId(currAttemptEntityId); + NameValuePair primaryFilter = new NameValuePair( + ApplicationMaster.APPID_TIMELINE_FILTER_NAME, + attemptId.getApplicationId().toString()); TimelineEntities entities = yarnCluster .getApplicationHistoryServer() .getTimelineStore() .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null, - null, null, null, null, null, null, null, null); + null, null, null, null, primaryFilter, null, null, null); Assert.assertNotNull(entities); Assert.assertEquals(2, entities.getEntities().size()); Assert.assertEquals(entities.getEntities().get(0).getEntityType() @@ -341,7 +433,7 @@ private boolean checkIPs(String hostname, String localIP, String appIP) } - @Test(timeout=90000) + @Test public void testDSRestartWithPreviousRunningContainers() throws Exception { String[] args = { "--jar", @@ -376,7 +468,7 @@ public void testDSRestartWithPreviousRunningContainers() throws Exception { * how many attempt failures for previous 2.5 seconds. * The application is expected to be successful. */ - @Test(timeout=90000) + @Test public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { String[] args = { "--jar", @@ -414,7 +506,7 @@ public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { * how many attempt failure for previous 15 seconds. * The application is expected to be fail. 
*/ - @Test(timeout=90000) + @Test public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { String[] args = { "--jar", @@ -446,7 +538,7 @@ public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { Assert.assertFalse(result); } - @Test(timeout=90000) + @Test public void testDSShellWithCustomLogPropertyFile() throws Exception { final File basedir = new File("target", TestDistributedShell.class.getName()); @@ -541,7 +633,7 @@ public void testDSShellWithCommands() throws Exception { verifyContainerLog(2, expectedContent, false, ""); } - @Test(timeout=90000) + @Test public void testDSShellWithMultipleArgs() throws Exception { String[] args = { "--jar", @@ -575,7 +667,7 @@ public void testDSShellWithMultipleArgs() throws Exception { verifyContainerLog(4, expectedContent, false, ""); } - @Test(timeout=90000) + @Test public void testDSShellWithShellScript() throws Exception { final File basedir = new File("target", TestDistributedShell.class.getName()); @@ -623,7 +715,7 @@ public void testDSShellWithShellScript() throws Exception { verifyContainerLog(1, expectedContent, false, ""); } - @Test(timeout=90000) + @Test public void testDSShellWithInvalidArgs() throws Exception { Client client = new Client(new Configuration(yarnCluster.getConfig())); @@ -785,7 +877,7 @@ protected void waitForNMsToRegister() throws Exception { } } - @Test(timeout=90000) + @Test public void testContainerLaunchFailureHandling() throws Exception { String[] args = { "--jar", @@ -813,7 +905,7 @@ public void testContainerLaunchFailureHandling() throws Exception { } - @Test(timeout=90000) + @Test public void testDebugFlag() throws Exception { String[] args = { "--jar", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java index 4f838e659e..a794e9714c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java @@ -86,6 +86,41 @@ public static String dumpTimelineRecordtoJSON(Object o, boolean pretty) } } + /** + * Returns whether the timeline service is enabled via configuration. + * + * @param conf the configuration + * @return whether the timeline service is enabled. + */ + public static boolean timelineServiceEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); + } + + /** + * Returns the timeline service version. It does not check whether the + * timeline service itself is enabled. + * + * @param conf the configuration + * @return the timeline service version as a float. + */ + public static float getTimelineServiceVersion(Configuration conf) { + return conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); + } + + /** + * Returns whether the timeline service v.1.5 is enabled via configuration. + * + * @param conf the configuration + * @return whether the timeline service v.1.5 is enabled. V.1.5 refers to a + * version equal to 1.5. 
+ */ + public static boolean timelineServiceV1_5Enabled(Configuration conf) { + return timelineServiceEnabled(conf) && + Math.abs(getTimelineServiceVersion(conf) - 1.5) < 0.00001; + } + public static TimelineAbout createTimelineAbout(String about) { TimelineAbout tsInfo = new TimelineAbout(about); tsInfo.setHadoopBuildVersion(VersionInfo.getBuildVersion()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 630b7ef677..024adc624e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -37,7 +37,6 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.service.ServiceStateException; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; @@ -76,6 +75,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; @@ -749,8 +749,12 @@ protected synchronized void serviceInit(Configuration conf) appHistoryServer = new ApplicationHistoryServer(); conf.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE, MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class); - conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, - MemoryTimelineStore.class, TimelineStore.class); + // Only set memory timeline store if timeline v1.5 is not enabled. + // Otherwise, caller has the freedom to choose storage impl. + if (!TimelineUtils.timelineServiceV1_5Enabled(conf)) { + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, + MemoryTimelineStore.class, TimelineStore.class); + } conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, MemoryTimelineStateStore.class, TimelineStateStore.class); if (!useFixedPorts) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersion.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersion.java new file mode 100644 index 0000000000..57439de078 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersion.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(value = RetentionPolicy.RUNTIME) +@Target(value = {ElementType.METHOD}) +public @interface TimelineVersion { + float value() default TimelineVersionWatcher.DEFAULT_TIMELINE_VERSION; +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersionWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersionWatcher.java new file mode 100644 index 0000000000..b00f13a0ab --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersionWatcher.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TimelineVersionWatcher extends TestWatcher { + static final float DEFAULT_TIMELINE_VERSION = 1.0f; + private TimelineVersion version; + + @Override + protected void starting(Description description) { + version = description.getAnnotation(TimelineVersion.class); + } + + /** + * @return the version number of timeline server for the current test (using + * timeline server v1.0 by default) + */ + public float getTimelineVersion() { + if(version == null) { + return DEFAULT_TIMELINE_VERSION; + } + return version.value(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml index 71f76d39dc..1dd301ab18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml @@ -137,4 +137,20 @@ jackson-databind + + + + + maven-jar-plugin + + + + test-jar + + test-compile + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java index f529b59fd6..c2c4101abb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java @@ -18,13 +18,16 @@ package org.apache.hadoop.yarn.server.timeline; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; import org.codehaus.jackson.JsonFactory; @@ -48,7 +51,53 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -class PluginStoreTestUtils { +/** + * Utility methods related to the ATS v1.5 plugin storage tests. + */ +public class PluginStoreTestUtils { + + /** + * For a given file system, setup directories ready to test the plugin storage. + * + * @param fs a {@link FileSystem} object that the plugin storage will work with + * @return the dfsCluster ready to start plugin storage tests. 
* @throws IOException if the directories cannot be created
+   */
+  public static FileSystem prepareFileSystemForPluginStore(FileSystem fs)
+      throws IOException {
+    Path activeDir = new Path(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT
+    );
+    Path doneDir = new Path(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
+    );
+
+    fs.mkdirs(activeDir);
+    fs.mkdirs(doneDir);
+    return fs;
+  }
+
+  /**
+   * Prepare configuration for plugin tests. This method will also add the mini
+   * DFS cluster's info to the configuration.
+   * Note: the test program needs to set up the reader plugin by itself.
+   *
+   * @param conf the configuration to prepare
+   * @param dfsCluster the mini DFS cluster that backs the plugin storage
+   * @return the modified configuration
+   */
+  public static YarnConfiguration prepareConfiguration(YarnConfiguration conf,
+      MiniDFSCluster dfsCluster) {
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+        dfsCluster.getURI().toString());
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
+    conf.setLong(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
+        1);
+    conf.set(YarnConfiguration.TIMELINE_SERVICE_STORE,
+        EntityGroupFSTimelineStore.class.getName());
+    return conf;
+  }
 
   static FSDataOutputStream createLogFile(Path logPath, FileSystem fs)
       throws IOException {