YARN-4545. Allow YARN distributed shell to use ATS v1.5 APIs. Li Lu via junping_du

This commit is contained in:
Junping Du 2016-03-14 08:28:38 -07:00
parent 658ee95ff3
commit f291d82cd4
12 changed files with 469 additions and 41 deletions

View File

@ -296,6 +296,19 @@
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId>

View File

@ -121,6 +121,27 @@
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -88,6 +88,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@ -99,6 +100,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -277,6 +279,9 @@ public static enum DSEntity {
// Timeline Client // Timeline Client
@VisibleForTesting @VisibleForTesting
TimelineClient timelineClient; TimelineClient timelineClient;
static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
static final String APPID_TIMELINE_FILTER_NAME = "appId";
static final String USER_TIMELINE_FILTER_NAME = "user";
private final String linux_bash_command = "bash"; private final String linux_bash_command = "bash";
private final String windows_command = "cmd /c"; private final String windows_command = "cmd /c";
@ -904,7 +909,7 @@ public void onContainerStarted(ContainerId containerId,
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
} }
if(applicationMaster.timelineClient != null) { if(applicationMaster.timelineClient != null) {
ApplicationMaster.publishContainerStartEvent( applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container, applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi); applicationMaster.domainId, applicationMaster.appSubmitterUgi);
} }
@ -1120,15 +1125,17 @@ private String readContent(String filePath) throws IOException {
org.apache.commons.io.IOUtils.closeQuietly(ds); org.apache.commons.io.IOUtils.closeQuietly(ds);
} }
} }
private static void publishContainerStartEvent( private void publishContainerStartEvent(
final TimelineClient timelineClient, Container container, String domainId, final TimelineClient timelineClient, final Container container,
UserGroupInformation ugi) { String domainId, UserGroupInformation ugi) {
final TimelineEntity entity = new TimelineEntity(); final TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getId().toString()); entity.setEntityId(container.getId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString());
entity.setDomainId(domainId); entity.setDomainId(domainId);
entity.addPrimaryFilter("user", ugi.getShortUserName()); entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId()
.getApplicationAttemptId().getApplicationId().toString());
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis()); event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_START.toString()); event.setEventType(DSEvent.DS_CONTAINER_START.toString());
@ -1137,28 +1144,27 @@ private static void publishContainerStartEvent(
entity.addEvent(event); entity.addEvent(event);
try { try {
ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() { processTimelineResponseErrors(
@Override putContainerEntity(timelineClient,
public TimelinePutResponse run() throws Exception { container.getId().getApplicationAttemptId(),
return processTimelineResponseErrors( entity));
timelineClient.putEntities(entity)); } catch (YarnException | IOException e) {
}
});
} catch (Exception e) {
LOG.error("Container start event could not be published for " LOG.error("Container start event could not be published for "
+ container.getId().toString(), + container.getId().toString(), e);
e instanceof UndeclaredThrowableException ? e.getCause() : e);
} }
} }
private static void publishContainerEndEvent( private void publishContainerEndEvent(
final TimelineClient timelineClient, ContainerStatus container, final TimelineClient timelineClient, ContainerStatus container,
String domainId, UserGroupInformation ugi) { String domainId, UserGroupInformation ugi) {
final TimelineEntity entity = new TimelineEntity(); final TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getContainerId().toString()); entity.setEntityId(container.getContainerId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString());
entity.setDomainId(domainId); entity.setDomainId(domainId);
entity.addPrimaryFilter("user", ugi.getShortUserName()); entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME,
container.getContainerId().getApplicationAttemptId()
.getApplicationId().toString());
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis()); event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_END.toString()); event.setEventType(DSEvent.DS_CONTAINER_END.toString());
@ -1166,22 +1172,38 @@ private static void publishContainerEndEvent(
event.addEventInfo("Exit Status", container.getExitStatus()); event.addEventInfo("Exit Status", container.getExitStatus());
entity.addEvent(event); entity.addEvent(event);
try { try {
TimelinePutResponse response = timelineClient.putEntities(entity); processTimelineResponseErrors(
processTimelineResponseErrors(response); putContainerEntity(timelineClient,
container.getContainerId().getApplicationAttemptId(),
entity));
} catch (YarnException | IOException e) { } catch (YarnException | IOException e) {
LOG.error("Container end event could not be published for " LOG.error("Container end event could not be published for "
+ container.getContainerId().toString(), e); + container.getContainerId().toString(), e);
} }
} }
private static void publishApplicationAttemptEvent( private TimelinePutResponse putContainerEntity(
TimelineClient timelineClient, ApplicationAttemptId currAttemptId,
TimelineEntity entity)
throws YarnException, IOException {
if (TimelineUtils.timelineServiceV1_5Enabled(conf)) {
TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
currAttemptId.getApplicationId(),
CONTAINER_ENTITY_GROUP_ID);
return timelineClient.putEntities(currAttemptId, groupId, entity);
} else {
return timelineClient.putEntities(entity);
}
}
private void publishApplicationAttemptEvent(
final TimelineClient timelineClient, String appAttemptId, final TimelineClient timelineClient, String appAttemptId,
DSEvent appEvent, String domainId, UserGroupInformation ugi) { DSEvent appEvent, String domainId, UserGroupInformation ugi) {
final TimelineEntity entity = new TimelineEntity(); final TimelineEntity entity = new TimelineEntity();
entity.setEntityId(appAttemptId); entity.setEntityId(appAttemptId);
entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
entity.setDomainId(domainId); entity.setDomainId(domainId);
entity.addPrimaryFilter("user", ugi.getShortUserName()); entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setEventType(appEvent.toString()); event.setEventType(appEvent.toString());
event.setTimestamp(System.currentTimeMillis()); event.setTimestamp(System.currentTimeMillis());
@ -1197,7 +1219,7 @@ private static void publishApplicationAttemptEvent(
} }
} }
private static TimelinePutResponse processTimelineResponseErrors( private TimelinePutResponse processTimelineResponseErrors(
TimelinePutResponse response) { TimelinePutResponse response) {
List<TimelinePutResponse.TimelinePutError> errors = response.getErrors(); List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
if (errors.size() == 0) { if (errors.size() == 0) {

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.TimelineEntityGroupPlugin;
import org.apache.hadoop.yarn.util.ConverterUtils;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedSet;
/**
* Timeline v1.5 reader plugin for YARN distributed shell. It tranlsates an
* incoming getEntity request to a set of related timeline entity groups, via
* the information provided in the primary filter or entity id field.
*/
public class DistributedShellTimelinePlugin extends TimelineEntityGroupPlugin {
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters) {
if (ApplicationMaster.DSEntity.DS_CONTAINER.toString().equals(entityType)) {
if (primaryFilter == null) {
return null;
}
return toEntityGroupId(primaryFilter.getValue().toString());
}
return null;
}
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
String entityType) {
if (ApplicationMaster.DSEntity.DS_CONTAINER.toString().equals(entityId)) {
ContainerId containerId = ConverterUtils.toContainerId(entityId);
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
return toEntityGroupId(appId.toString());
}
return null;
}
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
SortedSet<String> entityIds, Set<String> eventTypes) {
// Right now this method is not used by TimelineEntityGroupPlugin
return null;
}
private Set<TimelineEntityGroupId> toEntityGroupId(String strAppId) {
ApplicationId appId = ConverterUtils.toApplicationId(strAppId);
TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
appId, ApplicationMaster.CONTAINER_ENTITY_GROUP_ID);
Set<TimelineEntityGroupId> result = new HashSet<>();
result.add(groupId);
return result;
}
}

View File

@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;

View File

@ -36,12 +36,19 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
@ -50,29 +57,50 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout;
public class TestDistributedShell { public class TestDistributedShell {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(TestDistributedShell.class); LogFactory.getLog(TestDistributedShell.class);
protected MiniYARNCluster yarnCluster = null; protected MiniYARNCluster yarnCluster = null;
protected MiniDFSCluster hdfsCluster = null;
private FileSystem fs = null;
protected YarnConfiguration conf = null; protected YarnConfiguration conf = null;
private static final int NUM_NMS = 1; private static final int NUM_NMS = 1;
private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
protected final static String APPMASTER_JAR = protected final static String APPMASTER_JAR =
JarFinder.getJar(ApplicationMaster.class); JarFinder.getJar(ApplicationMaster.class);
@Rule
public TimelineVersionWatcher timelineVersionWatcher
= new TimelineVersionWatcher();
@Rule
public Timeout globalTimeout = new Timeout(90000);
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
setupInternal(NUM_NMS); setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion());
} }
protected void setupInternal(int numNodeManager) throws Exception { protected void setupInternal(int numNodeManager) throws Exception {
setupInternal(numNodeManager, DEFAULT_TIMELINE_VERSION);
}
private void setupInternal(int numNodeManager, float timelineVersion)
throws Exception {
LOG.info("Starting up YARN cluster"); LOG.info("Starting up YARN cluster");
@ -84,6 +112,26 @@ protected void setupInternal(int numNodeManager) throws Exception {
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
conf.set("mapreduce.jobhistory.address", conf.set("mapreduce.jobhistory.address",
"0.0.0.0:" + ServerSocketUtil.getPort(10021, 10)); "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
// ATS version specific settings
if (timelineVersion == 1.0f) {
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
} else if (timelineVersion == 1.5f) {
if (hdfsCluster == null) {
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
.numDataNodes(1).build();
}
fs = hdfsCluster.getFileSystem();
PluginStoreTestUtils.prepareFileSystemForPluginStore(fs);
PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster);
conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
DistributedShellTimelinePlugin.class.getName());
} else {
Assert.fail("Wrong timeline version number: " + timelineVersion);
}
if (yarnCluster == null) { if (yarnCluster == null) {
yarnCluster = yarnCluster =
@ -138,6 +186,13 @@ public void tearDown() throws IOException {
yarnCluster = null; yarnCluster = null;
} }
} }
if (hdfsCluster != null) {
try {
hdfsCluster.shutdown();
} finally {
hdfsCluster = null;
}
}
FileContext fsContext = FileContext.getLocalFSFileContext(); FileContext fsContext = FileContext.getLocalFSFileContext();
fsContext fsContext
.delete( .delete(
@ -146,16 +201,28 @@ public void tearDown() throws IOException {
true); true);
} }
@Test(timeout=90000) @Test
public void testDSShellWithDomain() throws Exception { public void testDSShellWithDomain() throws Exception {
testDSShell(true); testDSShell(true);
} }
@Test(timeout=90000) @Test
public void testDSShellWithoutDomain() throws Exception { public void testDSShellWithoutDomain() throws Exception {
testDSShell(false); testDSShell(false);
} }
@Test
@TimelineVersion(1.5f)
public void testDSShellWithoutDomainV1_5() throws Exception {
testDSShell(false);
}
@Test
@TimelineVersion(1.5f)
public void testDSShellWithDomainV1_5() throws Exception {
testDSShell(true);
}
public void testDSShell(boolean haveDomain) throws Exception { public void testDSShell(boolean haveDomain) throws Exception {
String[] args = { String[] args = {
"--jar", "--jar",
@ -239,6 +306,24 @@ public void run() {
LOG.info("Client run completed. Result=" + result); LOG.info("Client run completed. Result=" + result);
Assert.assertTrue(result.get()); Assert.assertTrue(result.get());
if (timelineVersionWatcher.getTimelineVersion() == 1.5f) {
long scanInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
);
Path doneDir = new Path(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
);
// Wait till the data is moved to done dir, or timeout and fail
while (true) {
RemoteIterator<FileStatus> iterApps = fs.listStatusIterator(doneDir);
if (iterApps.hasNext()) {
break;
}
Thread.sleep(scanInterval * 2);
}
}
TimelineDomain domain = null; TimelineDomain domain = null;
if (haveDomain) { if (haveDomain) {
domain = yarnCluster.getApplicationHistoryServer() domain = yarnCluster.getApplicationHistoryServer()
@ -265,11 +350,18 @@ public void run() {
Assert.assertEquals("DEFAULT", Assert.assertEquals("DEFAULT",
entitiesAttempts.getEntities().get(0).getDomainId()); entitiesAttempts.getEntities().get(0).getDomainId());
} }
String currAttemptEntityId
= entitiesAttempts.getEntities().get(0).getEntityId();
ApplicationAttemptId attemptId
= ConverterUtils.toApplicationAttemptId(currAttemptEntityId);
NameValuePair primaryFilter = new NameValuePair(
ApplicationMaster.APPID_TIMELINE_FILTER_NAME,
attemptId.getApplicationId().toString());
TimelineEntities entities = yarnCluster TimelineEntities entities = yarnCluster
.getApplicationHistoryServer() .getApplicationHistoryServer()
.getTimelineStore() .getTimelineStore()
.getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null, .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
null, null, null, null, null, null, null, null); null, null, null, null, primaryFilter, null, null, null);
Assert.assertNotNull(entities); Assert.assertNotNull(entities);
Assert.assertEquals(2, entities.getEntities().size()); Assert.assertEquals(2, entities.getEntities().size());
Assert.assertEquals(entities.getEntities().get(0).getEntityType() Assert.assertEquals(entities.getEntities().get(0).getEntityType()
@ -341,7 +433,7 @@ private boolean checkIPs(String hostname, String localIP, String appIP)
} }
@Test(timeout=90000) @Test
public void testDSRestartWithPreviousRunningContainers() throws Exception { public void testDSRestartWithPreviousRunningContainers() throws Exception {
String[] args = { String[] args = {
"--jar", "--jar",
@ -376,7 +468,7 @@ public void testDSRestartWithPreviousRunningContainers() throws Exception {
* how many attempt failures for previous 2.5 seconds. * how many attempt failures for previous 2.5 seconds.
* The application is expected to be successful. * The application is expected to be successful.
*/ */
@Test(timeout=90000) @Test
public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { public void testDSAttemptFailuresValidityIntervalSucess() throws Exception {
String[] args = { String[] args = {
"--jar", "--jar",
@ -414,7 +506,7 @@ public void testDSAttemptFailuresValidityIntervalSucess() throws Exception {
* how many attempt failure for previous 15 seconds. * how many attempt failure for previous 15 seconds.
* The application is expected to be fail. * The application is expected to be fail.
*/ */
@Test(timeout=90000) @Test
public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
String[] args = { String[] args = {
"--jar", "--jar",
@ -446,7 +538,7 @@ public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
Assert.assertFalse(result); Assert.assertFalse(result);
} }
@Test(timeout=90000) @Test
public void testDSShellWithCustomLogPropertyFile() throws Exception { public void testDSShellWithCustomLogPropertyFile() throws Exception {
final File basedir = final File basedir =
new File("target", TestDistributedShell.class.getName()); new File("target", TestDistributedShell.class.getName());
@ -541,7 +633,7 @@ public void testDSShellWithCommands() throws Exception {
verifyContainerLog(2, expectedContent, false, ""); verifyContainerLog(2, expectedContent, false, "");
} }
@Test(timeout=90000) @Test
public void testDSShellWithMultipleArgs() throws Exception { public void testDSShellWithMultipleArgs() throws Exception {
String[] args = { String[] args = {
"--jar", "--jar",
@ -575,7 +667,7 @@ public void testDSShellWithMultipleArgs() throws Exception {
verifyContainerLog(4, expectedContent, false, ""); verifyContainerLog(4, expectedContent, false, "");
} }
@Test(timeout=90000) @Test
public void testDSShellWithShellScript() throws Exception { public void testDSShellWithShellScript() throws Exception {
final File basedir = final File basedir =
new File("target", TestDistributedShell.class.getName()); new File("target", TestDistributedShell.class.getName());
@ -623,7 +715,7 @@ public void testDSShellWithShellScript() throws Exception {
verifyContainerLog(1, expectedContent, false, ""); verifyContainerLog(1, expectedContent, false, "");
} }
@Test(timeout=90000) @Test
public void testDSShellWithInvalidArgs() throws Exception { public void testDSShellWithInvalidArgs() throws Exception {
Client client = new Client(new Configuration(yarnCluster.getConfig())); Client client = new Client(new Configuration(yarnCluster.getConfig()));
@ -785,7 +877,7 @@ protected void waitForNMsToRegister() throws Exception {
} }
} }
@Test(timeout=90000) @Test
public void testContainerLaunchFailureHandling() throws Exception { public void testContainerLaunchFailureHandling() throws Exception {
String[] args = { String[] args = {
"--jar", "--jar",
@ -813,7 +905,7 @@ public void testContainerLaunchFailureHandling() throws Exception {
} }
@Test(timeout=90000) @Test
public void testDebugFlag() throws Exception { public void testDebugFlag() throws Exception {
String[] args = { String[] args = {
"--jar", "--jar",

View File

@ -86,6 +86,41 @@ public static String dumpTimelineRecordtoJSON(Object o, boolean pretty)
} }
} }
/**
* Returns whether the timeline service is enabled via configuration.
*
* @param conf the configuration
* @return whether the timeline service is enabled.
*/
public static boolean timelineServiceEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
}
/**
* Returns the timeline service version. It does not check whether the
* timeline service itself is enabled.
*
* @param conf the configuration
* @return the timeline service version as a float.
*/
public static float getTimelineServiceVersion(Configuration conf) {
return conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
}
/**
* Returns whether the timeline service v.1.5 is enabled via configuration.
*
* @param conf the configuration
* @return whether the timeline service v.1.5 is enabled. V.1.5 refers to a
* version equal to 1.5.
*/
public static boolean timelineServiceV1_5Enabled(Configuration conf) {
return timelineServiceEnabled(conf) &&
Math.abs(getTimelineServiceVersion(conf) - 1.5) < 0.00001;
}
public static TimelineAbout createTimelineAbout(String about) { public static TimelineAbout createTimelineAbout(String about) {
TimelineAbout tsInfo = new TimelineAbout(about); TimelineAbout tsInfo = new TimelineAbout(about);
tsInfo.setHadoopBuildVersion(VersionInfo.getBuildVersion()); tsInfo.setHadoopBuildVersion(VersionInfo.getBuildVersion());

View File

@ -37,7 +37,6 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
@ -76,6 +75,7 @@
import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -749,8 +749,12 @@ protected synchronized void serviceInit(Configuration conf)
appHistoryServer = new ApplicationHistoryServer(); appHistoryServer = new ApplicationHistoryServer();
conf.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE, conf.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class); MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, // Only set memory timeline store if timeline v1.5 is not enabled.
MemoryTimelineStore.class, TimelineStore.class); // Otherwise, caller has the freedom to choose storage impl.
if (!TimelineUtils.timelineServiceV1_5Enabled(conf)) {
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class);
}
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
MemoryTimelineStateStore.class, TimelineStateStore.class); MemoryTimelineStateStore.class, TimelineStateStore.class);
if (!useFixedPorts) { if (!useFixedPorts) {

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = {ElementType.METHOD})
public @interface TimelineVersion {
float value() default TimelineVersionWatcher.DEFAULT_TIMELINE_VERSION;
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TimelineVersionWatcher extends TestWatcher {
static final float DEFAULT_TIMELINE_VERSION = 1.0f;
private TimelineVersion version;
@Override
protected void starting(Description description) {
version = description.getAnnotation(TimelineVersion.class);
}
/**
* @return the version number of timeline server for the current test (using
* timeline server v1.0 by default)
*/
public float getTimelineVersion() {
if(version == null) {
return DEFAULT_TIMELINE_VERSION;
}
return version.value();
}
}

View File

@ -137,4 +137,20 @@
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -18,13 +18,16 @@
package org.apache.hadoop.yarn.server.timeline; package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonFactory;
@ -48,7 +51,53 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
class PluginStoreTestUtils { /**
* Utility methods related to the ATS v1.5 plugin storage tests.
*/
public class PluginStoreTestUtils {
/**
* For a given file system, setup directories ready to test the plugin storage.
*
* @param fs a {@link FileSystem} object that the plugin storage will work with
* @return the dfsCluster ready to start plugin storage tests.
* @throws IOException
*/
public static FileSystem prepareFileSystemForPluginStore(FileSystem fs)
throws IOException {
Path activeDir = new Path(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT
);
Path doneDir = new Path(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
);
fs.mkdirs(activeDir);
fs.mkdirs(doneDir);
return fs;
}
/**
* Prepare configuration for plugin tests. This method will also add the mini
* DFS cluster's info to the configuration.
* Note: the test program needs to setup the reader plugin by itself.
*
* @param conf
* @param dfsCluster
* @return the modified configuration
*/
public static YarnConfiguration prepareConfiguration(YarnConfiguration conf,
MiniDFSCluster dfsCluster) {
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
dfsCluster.getURI().toString());
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
conf.setLong(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
1);
conf.set(YarnConfiguration.TIMELINE_SERVICE_STORE,
EntityGroupFSTimelineStore.class.getName());
return conf;
}
static FSDataOutputStream createLogFile(Path logPath, FileSystem fs) static FSDataOutputStream createLogFile(Path logPath, FileSystem fs)
throws IOException { throws IOException {