From e74e117ad3e0b6c0572913f602a28934f87bba70 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Mon, 10 Feb 2014 21:31:34 +0000 Subject: [PATCH] YARN-1637. Implemented a client library for Java users to post timeline entities and events. Contributed by Zhijie Shen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1566752 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop-yarn/hadoop-yarn-client/pom.xml | 4 + .../yarn/client/api/TimelineClient.java | 70 +++++++++ .../client/api/impl/TimelineClientImpl.java | 106 ++++++++++++++ .../client/api/impl/TestTimelineClient.java | 137 ++++++++++++++++++ 5 files changed, 320 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3039c6f9a0..59afe84998 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -119,6 +119,9 @@ Release 2.4.0 - UNRELEASED YARN-1635. Implemented a Leveldb based ApplicationTimelineStore. (Billie Rinaldi via zjshen) + YARN-1637. Implemented a client library for Java users to post timeline + entities and events. (zjshen) + IMPROVEMENTS YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml index 54da659fee..6091686a03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -79,6 +79,10 @@ org.mortbay.jetty jetty-util + + com.sun.jersey + jersey-client + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java new file mode 100644 index 0000000000..8be00ac6ff --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors; +import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * A client library that can be used to post some information in terms of a + * number of conceptual entities. + * + * @See ATSEntity + */ +@Public +@Unstable +public abstract class TimelineClient extends AbstractService { + + @Public + public static TimelineClient createTimelineClient() { + TimelineClient client = new TimelineClientImpl(); + return client; + } + + @Private + protected TimelineClient(String name) { + super(name); + } + + /** + *

+ * Post the information of a number of conceptual entities of an application + * to the timeline server. It is a blocking API. The method will not return + * until it gets the response from the timeline server. + *

+ * + * @param entities + * the collection of {@link ATSEntity} + * @return the error information if the post entities are not correctly stored + * @throws IOException + * @throws YarnException + */ + @Public + public abstract ATSPutErrors postEntities( + ATSEntity... entities) throws IOException, YarnException; + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java new file mode 100644 index 0000000000..9fcc2bd6e3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; + +import javax.ws.rs.core.MediaType; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; + +@Private +@Unstable +public class TimelineClientImpl extends TimelineClient { + + private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class); + private static final String RESOURCE_URI_STR = "/ws/v1/apptimeline/"; + private static final Joiner JOINER = Joiner.on(""); + + private Client client; + private URI resURI; + + public TimelineClientImpl() { + super(TimelineClientImpl.class.getName()); + ClientConfig cc = new DefaultClientConfig(); + cc.getClasses().add(YarnJacksonJaxbJsonProvider.class); + client = Client.create(cc); + } + + protected void serviceInit(Configuration conf) throws Exception { + resURI = new URI(JOINER.join(HttpConfig.getSchemePrefix(), + HttpConfig.isSecure() ? conf.get( + YarnConfiguration.AHS_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_AHS_WEBAPP_HTTPS_ADDRESS) : conf.get( + YarnConfiguration.AHS_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_AHS_WEBAPP_ADDRESS), RESOURCE_URI_STR)); + super.serviceInit(conf); + } + + @Override + public ATSPutErrors postEntities( + ATSEntity... entities) throws IOException, YarnException { + ATSEntities entitiesContainer = new ATSEntities(); + entitiesContainer.addEntities(Arrays.asList(entities)); + ClientResponse resp = doPostingEntities(entitiesContainer); + if (resp.getClientResponseStatus() != ClientResponse.Status.OK) { + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg); + if (LOG.isDebugEnabled()) { + String output = resp.getEntity(String.class); + LOG.debug("HTTP error code: " + resp.getStatus() + + " Server response : \n" + output); + } + throw new YarnException(msg); + } + return resp.getEntity(ATSPutErrors.class); + } + + @Private + @VisibleForTesting + public ClientResponse doPostingEntities(ATSEntities entities) { + WebResource webResource = client.resource(resURI); + return webResource.accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON) + .post(ClientResponse.class, entities); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java new file mode 100644 index 0000000000..a3917a2da5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.sun.jersey.api.client.ClientResponse; + +public class TestTimelineClient { + + private TimelineClientImpl client; + + @Before + public void setup() { + client = spy((TimelineClientImpl) TimelineClient.createTimelineClient()); + client.init(new YarnConfiguration()); + client.start(); + } + + @After + public void tearDown() { + client.stop(); + } + + @Test + public void testPostEntities() throws Exception { + mockClientResponse(ClientResponse.Status.OK, false); + try { + ATSPutErrors errors = client.postEntities(generateATSEntity()); + Assert.assertEquals(0, errors.getErrors().size()); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + } + + @Test + public void testPostEntitiesWithError() throws Exception { + mockClientResponse(ClientResponse.Status.OK, true); + try { + ATSPutErrors errors = client.postEntities(generateATSEntity()); + Assert.assertEquals(1, errors.getErrors().size()); + Assert.assertEquals("test entity id", errors.getErrors().get(0) + .getEntityId()); + Assert.assertEquals("test entity type", errors.getErrors().get(0) + .getEntityType()); + Assert.assertEquals(ATSPutErrors.ATSPutError.IO_EXCEPTION, + errors.getErrors().get(0).getErrorCode()); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + } + + @Test + public void testPostEntitiesNoResponse() throws Exception { + mockClientResponse(ClientResponse.Status.INTERNAL_SERVER_ERROR, false); + try { + client.postEntities(generateATSEntity()); + Assert.fail("Exception is expected"); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains( + "Failed to get the response from the timeline server.")); + } + } + + private ClientResponse mockClientResponse(ClientResponse.Status status, + boolean hasError) { + ClientResponse response = mock(ClientResponse.class); + doReturn(response).when(client) + .doPostingEntities(any(ATSEntities.class)); + when(response.getClientResponseStatus()).thenReturn(status); + ATSPutErrors.ATSPutError error = new ATSPutErrors.ATSPutError(); + error.setEntityId("test entity id"); + error.setEntityType("test entity type"); + error.setErrorCode(ATSPutErrors.ATSPutError.IO_EXCEPTION); + ATSPutErrors errors = new ATSPutErrors(); + if (hasError) { + errors.addError(error); + } + when(response.getEntity(ATSPutErrors.class)).thenReturn(errors); + return response; + } + + private static ATSEntity generateATSEntity() { + ATSEntity entity = new ATSEntity(); + entity.setEntityId("entity id"); + entity.setEntityType("entity type"); + entity.setStartTime(System.currentTimeMillis()); + for (int i = 0; i < 2; ++i) { + ATSEvent event = new ATSEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType("test event type " + i); + event.addEventInfo("key1", "val1"); + event.addEventInfo("key2", "val2"); + entity.addEvent(event); + } + entity.addRelatedEntity("test ref type 1", "test ref id 1"); + entity.addRelatedEntity("test ref type 2", "test ref id 2"); + entity.addPrimaryFilter("pkey1", "pval1"); + entity.addPrimaryFilter("pkey2", "pval2"); + entity.addOtherInfo("okey1", "oval1"); + entity.addOtherInfo("okey2", "oval2"); + return entity; + } + +}