YARN-3047. [Data Serving] Set up ATS reader with basic request serving structure and lifecycle (Varun Saxena via sjlee)

(cherry picked from commit 4c5f88fb0f04b7919738d07598b0f006a9ff91f2)
This commit is contained in:
Sangjin Lee 2015-07-08 17:10:10 -07:00
parent 2d59bc4458
commit e27642abf4
9 changed files with 457 additions and 2 deletions

View File

@ -49,8 +49,10 @@ function hadoop_usage
hadoop_add_subcommand "scmadmin" "SharedCacheManager admin tools" hadoop_add_subcommand "scmadmin" "SharedCacheManager admin tools"
hadoop_add_subcommand "sharedcachemanager" "run the SharedCacheManager daemon" hadoop_add_subcommand "sharedcachemanager" "run the SharedCacheManager daemon"
hadoop_add_subcommand "timelineserver" "run the timeline server" hadoop_add_subcommand "timelineserver" "run the timeline server"
hadoop_add_subcommand "timelinereader" "run the timeline reader server"
hadoop_add_subcommand "top" "view cluster information" hadoop_add_subcommand "top" "view cluster information"
hadoop_add_subcommand "version" "print the version" hadoop_add_subcommand "version" "print the version"
hadoop_generate_usage "${MYNAME}" true
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
} }
@ -182,6 +184,10 @@ function yarncmd_case
HADOOP_HEAPSIZE_MAX="${YARN_TIMELINESERVER_HEAPSIZE}" HADOOP_HEAPSIZE_MAX="${YARN_TIMELINESERVER_HEAPSIZE}"
fi fi
;; ;;
timelinereader)
supportdaemonization="true"
CLASS='org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer'
;;
version) version)
HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
hadoop_debug "Append YARN_CLIENT_OPTS onto HADOOP_OPTS" hadoop_debug "Append YARN_CLIENT_OPTS onto HADOOP_OPTS"

View File

@ -151,7 +151,7 @@ if "%1" == "--loglevel" (
set yarncommands=resourcemanager nodemanager proxyserver rmadmin version jar ^ set yarncommands=resourcemanager nodemanager proxyserver rmadmin version jar ^
application applicationattempt container node queue logs daemonlog historyserver ^ application applicationattempt container node queue logs daemonlog historyserver ^
timelineserver classpath timelineserver timelinereader classpath
for %%i in ( %yarncommands% ) do ( for %%i in ( %yarncommands% ) do (
if %yarn-command% == %%i set yarncommand=true if %yarn-command% == %%i set yarncommand=true
) )
@ -242,6 +242,11 @@ goto :eof
) )
goto :eof goto :eof
:timelinereader
set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%\timelineserver-config\log4j.properties
set CLASS=org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer
goto :eof
:nodemanager :nodemanager
set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%\nm-config\log4j.properties set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%\nm-config\log4j.properties
set CLASS=org.apache.hadoop.yarn.server.nodemanager.NodeManager set CLASS=org.apache.hadoop.yarn.server.nodemanager.NodeManager
@ -312,6 +317,7 @@ goto :eof
@echo resourcemanager run the ResourceManager @echo resourcemanager run the ResourceManager
@echo nodemanager run a nodemanager on each slave @echo nodemanager run a nodemanager on each slave
@echo timelineserver run the timeline server @echo timelineserver run the timeline server
@echo timelinereader run the timeline reader server
@echo rmadmin admin tools @echo rmadmin admin tools
@echo version print the version @echo version print the version
@echo jar ^<jar^> run a jar file @echo jar ^<jar^> run a jar file

View File

@ -1964,6 +1964,9 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String TIMELINE_SERVICE_WRITER_CLASS = public static final String TIMELINE_SERVICE_WRITER_CLASS =
TIMELINE_SERVICE_PREFIX + "writer.class"; TIMELINE_SERVICE_PREFIX + "writer.class";
public static final String TIMELINE_SERVICE_READER_CLASS =
TIMELINE_SERVICE_PREFIX + "reader.class";
// mark app-history related configs @Private as application history is going // mark app-history related configs @Private as application history is going
// to be integrated into the timeline service // to be integrated into the timeline service
@Private @Private

View File

@ -275,6 +275,10 @@ public static String getNMWebAppURLWithoutScheme(Configuration conf) {
} }
public static String getAHSWebAppURLWithoutScheme(Configuration conf) { public static String getAHSWebAppURLWithoutScheme(Configuration conf) {
return getTimelineReaderWebAppURL(conf);
}
public static String getTimelineReaderWebAppURL(Configuration conf) {
if (YarnConfiguration.useHttps(conf)) { if (YarnConfiguration.useHttps(conf)) {
return conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, return conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS); YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
@ -283,7 +287,7 @@ public static String getAHSWebAppURLWithoutScheme(Configuration conf) {
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
} }
} }
/** /**
* if url has scheme then it will be returned as it is else it will return * if url has scheme then it will be returned as it is else it will return
* url with scheme. * url with scheme.

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
@Private
@Unstable
public class TimelineReaderManager extends AbstractService {
private TimelineReader reader;
public TimelineReaderManager(TimelineReader timelineReader) {
super(TimelineReaderManager.class.getName());
this.reader = timelineReader;
}
}

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader;
import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
/** Main class for Timeline Reader */
@Private
@Unstable
public class TimelineReaderServer extends CompositeService {
private static final Log LOG = LogFactory.getLog(TimelineReaderServer.class);
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final String TIMELINE_READER_MANAGER_ATTR =
"timeline.reader.manager";
private HttpServer2 readerWebServer;
private TimelineReaderManager timelineReaderManager;
public TimelineReaderServer() {
super(TimelineReaderServer.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
TimelineReader timelineReaderStore = createTimelineReaderStore(conf);
addService(timelineReaderStore);
timelineReaderManager = createTimelineReaderManager(timelineReaderStore);
addService(timelineReaderManager);
super.serviceInit(conf);
}
private TimelineReader createTimelineReaderStore(Configuration conf) {
TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
FileSystemTimelineReaderImpl.class, TimelineReader.class), conf);
readerStore.init(conf);
return readerStore;
}
private TimelineReaderManager createTimelineReaderManager(
TimelineReader timelineReaderStore) {
return new TimelineReaderManager(timelineReaderStore);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
startTimelineReaderWebApp();
}
@Override
protected void serviceStop() throws Exception {
if (readerWebServer != null) {
readerWebServer.stop();
}
super.serviceStop();
}
private void startTimelineReaderWebApp() {
Configuration conf = getConfig();
String bindAddress = WebAppUtils.getWebAppBindURL(conf,
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
WebAppUtils.getTimelineReaderWebAppURL(conf));
LOG.info("Instantiating TimelineReaderWebApp at " + bindAddress);
try {
HttpServer2.Builder builder = new HttpServer2.Builder()
.setName("timeline")
.setConf(conf)
.addEndpoint(URI.create("http://" + bindAddress));
readerWebServer = builder.build();
Map<String, String> options = new HashMap<>();
String username = conf.get(HADOOP_HTTP_STATIC_USER,
DEFAULT_HADOOP_HTTP_STATIC_USER);
options.put(HADOOP_HTTP_STATIC_USER, username);
HttpServer2.defineFilter(readerWebServer.getWebAppContext(),
"static_user_filter_timeline",
StaticUserWebFilter.StaticUserFilter.class.getName(),
options, new String[] {"/*"});
readerWebServer.addJerseyResourcePackage(
TimelineReaderWebServices.class.getPackage().getName() + ";"
+ GenericExceptionHandler.class.getPackage().getName() + ";"
+ YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
"/*");
readerWebServer.setAttribute(TIMELINE_READER_MANAGER_ATTR,
timelineReaderManager);
readerWebServer.start();
} catch (Exception e) {
String msg = "TimelineReaderWebApp failed to start.";
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
}
@VisibleForTesting
int getWebServerPort() {
return readerWebServer.getConnectorAddress(0).getPort();
}
static TimelineReaderServer startTimelineReaderServer(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(
new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(TimelineReaderServer.class,
args, LOG);
TimelineReaderServer timelineReaderServer = null;
try {
timelineReaderServer = new TimelineReaderServer();
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(timelineReaderServer),
SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration();
timelineReaderServer.init(conf);
timelineReaderServer.start();
} catch (Throwable t) {
LOG.fatal("Error starting TimelineReaderWebServer", t);
ExitUtil.terminate(-1, "Error starting TimelineReaderWebServer");
}
return timelineReaderServer;
}
public static void main(String[] args) {
startTimelineReaderServer(args);
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.inject.Singleton;
/** REST end point for Timeline Reader */
@Private
@Unstable
@Singleton
@Path("/ws/v2/timeline")
public class TimelineReaderWebServices {
private void init(HttpServletResponse response) {
response.setContentType(null);
}
/**
* Return the description of the timeline reader web services.
*/
@GET
@Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
public TimelineAbout about(
@Context HttpServletRequest req,
@Context HttpServletResponse res) {
init(res);
return TimelineUtils.createTimelineAbout("Timeline Reader API");
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer;
import org.junit.Test;
public class TestTimelineReaderServer {
@Test(timeout = 60000)
public void testStartStopServer() throws Exception {
TimelineReaderServer server = new TimelineReaderServer();
Configuration config = new YarnConfiguration();
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
"localhost:0");
try {
server.init(config);
assertEquals(STATE.INITED, server.getServiceState());
assertEquals(2, server.getServices().size());
server.start();
assertEquals(STATE.STARTED, server.getServiceState());
server.stop();
assertEquals(STATE.STOPPED, server.getServiceState());
} finally {
server.stop();
}
}
}

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
public class TestTimelineReaderWebServices {
private int serverPort;
private TimelineReaderServer server;
@Before
public void init() throws Exception {
try {
Configuration config = new YarnConfiguration();
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
"localhost:0");
server = new TimelineReaderServer();
server.init(config);
server.start();
serverPort = server.getWebServerPort();
} catch (Exception e) {
Assert.fail("Web server failed to start");
}
}
@After
public void stop() throws Exception {
if (server != null) {
server.stop();
server = null;
}
}
private static Client createClient() {
ClientConfig cfg = new DefaultClientConfig();
cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
return new Client(new URLConnectionClientHandler(
new DummyURLConnectionFactory()), cfg);
}
private static ClientResponse getResponse(Client client, URI uri) throws Exception {
ClientResponse resp =
client.resource(uri).accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
System.out.println(resp.getClientResponseStatus());
throw new IOException("Incorrect response from timeline reader.");
}
return resp;
}
private static class DummyURLConnectionFactory
implements HttpURLConnectionFactory {
@Override
public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
try {
return (HttpURLConnection)url.openConnection();
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
}
}
}
@Test
public void testAbout()
throws IOException {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/");
Client client = createClient();
try {
ClientResponse resp = getResponse(client, uri);
TimelineAbout about = resp.getEntity(TimelineAbout.class);
Assert.assertNotNull(about);
Assert.assertEquals("Timeline Reader API", about.getAbout());
} catch (Exception re) {
throw new IOException(
"Failed to get the response from timeline reader.", re);
} finally {
client.destroy();
}
}
}