YARN-3240. Implement client API to put generic entities. Contributed by Zhijie Shen

(cherry picked from commit 4487da249f448d5c67b712cd0aa723e764eed77d)
This commit is contained in:
Junping Du 2015-02-25 02:40:55 -08:00 committed by Sangjin Lee
parent c77f86bf63
commit 4f0c7eaff3
12 changed files with 341 additions and 46 deletions

View File

@ -305,6 +305,13 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-applications-distributedshell</artifactId>

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.HashSet;
import java.util.Set;
@XmlRootElement(name = "entities")
@XmlAccessorType(XmlAccessType.NONE)
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class TimelineEntities {
private Set<TimelineEntity> entities = new HashSet<>();
public TimelineEntities() {
}
@XmlElement(name = "entities")
public Set<TimelineEntity> getEntities() {
return entities;
}
public void setEntities(Set<TimelineEntity> entities) {
this.entities = entities;
}
public void addEntities(Set<TimelineEntity> entities) {
this.entities.addAll(entities);
}
public void addEntity(TimelineEntity entity) {
entities.add(entity);
}
}

View File

@ -215,6 +215,7 @@
<exclude>src/main/resources/webapps/jobhistory/.keep</exclude>
<exclude>src/main/resources/webapps/yarn/.keep</exclude>
<exclude>src/main/resources/webapps/applicationhistory/.keep</exclude>
<exclude>src/main/resources/webapps/timeline/.keep</exclude>
<exclude>src/main/resources/webapps/cluster/.keep</exclude>
<exclude>src/main/resources/webapps/test/.keep</exclude>
<exclude>src/main/resources/webapps/proxy/.keep</exclude>

View File

@ -31,6 +31,9 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -54,15 +57,25 @@ public abstract class TimelineClient extends AbstractService implements
*
* @return a timeline client
*/
protected ApplicationId contextAppId;
protected String timelineServiceAddress;
@Public
public static TimelineClient createTimelineClient() {
TimelineClient client = new TimelineClientImpl();
return client;
}
@Public
public static TimelineClient createTimelineClient(ApplicationId appId) {
TimelineClient client = new TimelineClientImpl(appId);
return client;
}
@Private
protected TimelineClient(String name) {
protected TimelineClient(String name, ApplicationId appId) {
super(name);
contextAppId = appId;
}
/**
@ -187,4 +200,49 @@ public abstract long renewDelegationToken(
public abstract void cancelDelegationToken(
Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException;
/**
* <p>
* Send the information of a number of conceptual entities to the timeline
* aggregator. It is a blocking API. The method will not return until all the
* put entities have been persisted.
* </p>
*
* @param entities
* the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
* @throws IOException
* @throws YarnException
*/
@Public
public abstract void putEntities(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
throws IOException, YarnException;
/**
* <p>
* Send the information of a number of conceptual entities to the timeline
* aggregator. It is an asynchronous API. The method will return once all the
* entities are received.
* </p>
*
* @param entities
* the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
* @throws IOException
* @throws YarnException
*/
@Public
public abstract void putEntitiesAsync(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
throws IOException, YarnException;
/**
* <p>
* Update the timeline service address where the request will be sent to
* </p>
* @param address
* the timeline service address
*/
public void setTimelineServiceAddress(String address) {
timelineServiceAddress = address;
}
}

View File

@ -34,6 +34,9 @@
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@ -43,8 +46,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
@ -54,6 +57,7 @@
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@ -78,13 +82,15 @@
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import com.sun.jersey.core.util.MultivaluedMapImpl;
@Private
@Evolving
public class TimelineClientImpl extends TimelineClient {
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
private static final Joiner JOINER = Joiner.on("");
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
@ -253,7 +259,11 @@ public boolean shouldRetryOn(Exception e) {
}
public TimelineClientImpl() {
super(TimelineClientImpl.class.getName());
super(TimelineClientImpl.class.getName(), null);
}
public TimelineClientImpl(ApplicationId applicationId) {
super(TimelineClientImpl.class.getName(), applicationId);
}
protected void serviceInit(Configuration conf) throws Exception {
@ -285,21 +295,19 @@ protected void serviceInit(Configuration conf) throws Exception {
client.addFilter(retryFilter);
if (YarnConfiguration.useHttps(conf)) {
resURI = URI
.create(JOINER.join("https://", conf.get(
timelineServiceAddress = conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
RESOURCE_URI_STR));
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
} else {
resURI = URI.create(JOINER.join("http://", conf.get(
timelineServiceAddress = conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
RESOURCE_URI_STR));
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
}
LOG.info("Timeline service address: " + resURI);
timelineServiceVersion =
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
LOG.info("Timeline service address: " + timelineServiceAddress);
super.serviceInit(conf);
}
@ -341,6 +349,39 @@ public TimelinePutResponse putEntities(
return timelineWriter.putEntities(entities);
}
@Override
public void putEntities(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
throws IOException, YarnException {
putEntities(false, entities);
}
@Override
public void putEntitiesAsync(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
throws IOException, YarnException {
putEntities(true, entities);
}
private void putEntities(boolean async,
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
throws IOException, YarnException {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
entitiesContainer =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) {
entitiesContainer.addEntity(entity);
}
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
if (contextAppId != null) {
params.add("appid", contextAppId.toString());
}
if (async) {
params.add("async", Boolean.TRUE.toString());
}
putObjects(constructResURI(getConfig(), timelineServiceAddress, true),
"entities", params, entitiesContainer);
}
@Override
public void putDomain(TimelineDomain domain) throws IOException,
@ -348,6 +389,36 @@ public void putDomain(TimelineDomain domain) throws IOException,
timelineWriter.putDomain(domain);
}
private void putObjects(
URI base, String path, MultivaluedMap<String, String> params, Object obj)
throws IOException, YarnException {
ClientResponse resp;
try {
resp = client.resource(base).path(path).queryParams(params)
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.put(ClientResponse.class, obj);
} catch (RuntimeException re) {
// runtime exception is expected if the client cannot connect the server
String msg =
"Failed to get the response from the timeline server.";
LOG.error(msg, re);
throw new IOException(re);
}
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
String msg =
"Failed to get the response from the timeline server.";
LOG.error(msg);
if (LOG.isDebugEnabled() && resp != null) {
String output = resp.getEntity(String.class);
LOG.debug("HTTP error code: " + resp.getStatus()
+ " Server response:\n" + output);
}
throw new YarnException(msg);
}
}
@SuppressWarnings("unchecked")
@Override
public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
@ -362,7 +433,8 @@ public Token<TimelineDelegationTokenIdentifier> run()
new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator);
return (Token) authUrl.getDelegationToken(
resURI.toURL(), token, renewer, doAsUser);
constructResURI(getConfig(), timelineServiceAddress, false).toURL(),
token, renewer, doAsUser);
}
};
return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
@ -397,7 +469,7 @@ public Long run() throws Exception {
// the configured service address.
final URI serviceURI = isTokenServiceAddrEmpty ? resURI
: new URI(scheme, null, address.getHostName(),
address.getPort(), RESOURCE_URI_STR, null, null);
address.getPort(), RESOURCE_URI_STR_V1, null, null);
return authUrl
.renewDelegationToken(serviceURI.toURL(), token, doAsUser);
}
@ -434,7 +506,7 @@ public Void run() throws Exception {
// the configured service address.
final URI serviceURI = isTokenServiceAddrEmpty ? resURI
: new URI(scheme, null, address.getHostName(),
address.getPort(), RESOURCE_URI_STR, null, null);
address.getPort(), RESOURCE_URI_STR_V1, null, null);
authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
return null;
}
@ -528,6 +600,13 @@ private static void setTimeouts(URLConnection connection, int socketTimeout) {
connection.setReadTimeout(socketTimeout);
}
private static URI constructResURI(
Configuration conf, String address, boolean v2) {
return URI.create(
JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
}
public static void main(String[] argv) throws Exception {
CommandLine cliParser = new GnuParser().parse(opts, argv);
if (cliParser.hasOption("put")) {

View File

@ -76,6 +76,13 @@ public void testTimelineEntities() throws Exception {
entity.addIsRelatedToEntity("test type 4", "test id 4");
entity.addIsRelatedToEntity("test type 5", "test id 5");
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entity, true));
TimelineEntities entities = new TimelineEntities();
TimelineEntity entity1 = new TimelineEntity();
entities.addEntity(entity1);
TimelineEntity entity2 = new TimelineEntity();
entities.addEntity(entity2);
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true));
}
@Test

View File

@ -86,6 +86,18 @@
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>

View File

@ -0,0 +1,54 @@
package org.apache.hadoop.yarn.server.timelineservice;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.fail;
public class TestTimelineServiceClientIntegration {
private static PerNodeAggregatorServer server;
@BeforeClass
public static void setupClass() throws Exception {
try {
server = PerNodeAggregatorServer.launchServer(new String[0]);
server.addApplication(ApplicationId.newInstance(0, 1));
} catch (ExitUtil.ExitException e) {
fail();
}
}
@AfterClass
public static void tearDownClass() throws Exception {
if (server != null) {
server.stop();
}
}
@Test
public void testPutEntities() throws Exception {
TimelineClient client =
TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
try {
client.init(new YarnConfiguration());
client.start();
TimelineEntity entity = new TimelineEntity();
entity.setType("test entity type");
entity.setId("test entity id");
client.putEntities(entity);
client.putEntitiesAsync(entity);
} catch(Exception e) {
fail();
} finally {
client.stop();
}
}
}

View File

@ -114,6 +114,17 @@
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -25,8 +25,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
/**
* Service that handles writes to the timeline service and writes them to the
@ -70,16 +69,14 @@ protected void serviceStop() throws Exception {
*
* @param entities entities to post
* @param callerUgi the caller UGI
* @return the response that contains the result of the post.
*/
public TimelinePutResponse postEntities(TimelineEntities entities,
public void postEntities(TimelineEntities entities,
UserGroupInformation callerUgi) {
// TODO implement
if (LOG.isDebugEnabled()) {
LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
callerUgi + ")");
}
return null;
}
/**

View File

@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import com.google.inject.Inject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -39,9 +40,7 @@
import org.apache.hadoop.yarn.server.api.ContainerContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.*;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
@ -120,6 +119,8 @@ private static class TimelineServiceWebApp
extends WebApp implements YarnWebParams {
@Override
public void setup() {
bind(YarnJacksonJaxbJsonProvider.class);
bind(GenericExceptionHandler.class);
bind(PerNodeAggregatorWebService.class);
// bind to the global singleton
bind(AppLevelServiceManager.class).
@ -214,7 +215,7 @@ public ByteBuffer getMetaData() {
}
@VisibleForTesting
static PerNodeAggregatorServer launchServer(String[] args) {
public static PerNodeAggregatorServer launchServer(String[] args) {
Thread
.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args,

View File

@ -20,12 +20,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@ -40,14 +35,17 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.net.URI;
/**
* The main per-node REST end point for timeline service writes. It is
* essentially a container service that routes requests to the appropriate
@ -112,11 +110,14 @@ public AboutInfo about(
* the request to the app level aggregator. It expects an application as a
* context.
*/
@POST
@PUT
@Path("/entities")
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
public TimelinePutResponse postEntities(
public Response putEntities(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@QueryParam("async") String async,
@QueryParam("appid") String appId,
TimelineEntities entities) {
init(res);
UserGroupInformation callerUgi = getUser(req);
@ -127,13 +128,20 @@ public TimelinePutResponse postEntities(
}
// TODO how to express async posts and handle them
boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
try {
AppLevelAggregatorService service = getAggregatorService(req);
appId = parseApplicationId(appId);
if (appId == null) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
AppLevelAggregatorService service = serviceManager.getService(appId);
if (service == null) {
LOG.error("Application not found");
throw new NotFoundException(); // different exception?
}
return service.postEntities(entities, callerUgi);
service.postEntities(entities, callerUgi);
return Response.ok().build();
} catch (Exception e) {
LOG.error("Error putting entities", e);
throw new WebApplicationException(e,
@ -141,17 +149,19 @@ public TimelinePutResponse postEntities(
}
}
private AppLevelAggregatorService
getAggregatorService(HttpServletRequest req) {
String appIdString = getApplicationId(req);
return serviceManager.getService(appIdString);
}
private String getApplicationId(HttpServletRequest req) {
// TODO the application id from the request
// (most likely from the URI)
private String parseApplicationId(String appId) {
// Make sure the appId is not null and is valid
ApplicationId appID;
try {
if (appId != null) {
return ConverterUtils.toApplicationId(appId.trim()).toString();
} else {
return null;
}
} catch (Exception e) {
return null;
}
}
private void init(HttpServletResponse response) {
response.setContentType(null);