YARN-6113. Re-direct NM Web Service to get container logs for finished applications. Contributed by Xuan Gong.
This commit is contained in:
parent
243c0f33ec
commit
464ff479ce
@ -1078,6 +1078,9 @@ public static boolean isAclEnabled(Configuration conf) {
|
||||
public static final String YARN_LOG_SERVER_URL =
|
||||
YARN_PREFIX + "log.server.url";
|
||||
|
||||
public static final String YARN_LOG_SERVER_WEBSERVICE_URL =
|
||||
YARN_PREFIX + "log.server.web-service.url";
|
||||
|
||||
public static final String YARN_TRACKING_URL_GENERATOR =
|
||||
YARN_PREFIX + "tracking.url.generator";
|
||||
|
||||
|
@ -26,6 +26,7 @@
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
@ -428,7 +429,8 @@ public static List<String> listSupportedLogContentType() {
|
||||
return Arrays.asList("text", "octet-stream");
|
||||
}
|
||||
|
||||
private static String getURLEncodedQueryString(HttpServletRequest request) {
|
||||
private static String getURLEncodedQueryString(HttpServletRequest request,
|
||||
String parameterToRemove) {
|
||||
String queryString = request.getQueryString();
|
||||
if (queryString != null && !queryString.isEmpty()) {
|
||||
String reqEncoding = request.getCharacterEncoding();
|
||||
@ -436,12 +438,33 @@ private static String getURLEncodedQueryString(HttpServletRequest request) {
|
||||
reqEncoding = "ISO-8859-1";
|
||||
}
|
||||
Charset encoding = Charset.forName(reqEncoding);
|
||||
List<NameValuePair> params = URLEncodedUtils.parse(queryString, encoding);
|
||||
List<NameValuePair> params = URLEncodedUtils.parse(queryString,
|
||||
encoding);
|
||||
if (parameterToRemove != null && !parameterToRemove.isEmpty()) {
|
||||
Iterator<NameValuePair> paramIterator = params.iterator();
|
||||
while(paramIterator.hasNext()) {
|
||||
NameValuePair current = paramIterator.next();
|
||||
if (current.getName().equals(parameterToRemove)) {
|
||||
paramIterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
return URLEncodedUtils.format(params, encoding);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a query string which removes the passed parameter.
|
||||
* @param httpRequest HttpServletRequest with the request details
|
||||
* @param parameterName the query parameters must be removed
|
||||
* @return the query parameter string
|
||||
*/
|
||||
public static String removeQueryParams(HttpServletRequest httpRequest,
|
||||
String parameterName) {
|
||||
return getURLEncodedQueryString(httpRequest, parameterName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a HTML escaped uri with the query parameters of the request.
|
||||
* @param request HttpServletRequest with the request details
|
||||
@ -449,7 +472,7 @@ private static String getURLEncodedQueryString(HttpServletRequest request) {
|
||||
*/
|
||||
public static String getHtmlEscapedURIWithQueryString(
|
||||
HttpServletRequest request) {
|
||||
String urlEncodedQueryString = getURLEncodedQueryString(request);
|
||||
String urlEncodedQueryString = getURLEncodedQueryString(request, null);
|
||||
if (urlEncodedQueryString != null) {
|
||||
return HtmlQuoting.quoteHtmlChars(
|
||||
request.getRequestURI() + "?" + urlEncodedQueryString);
|
||||
@ -466,7 +489,7 @@ public static String getHtmlEscapedURIWithQueryString(
|
||||
public static String appendQueryParams(HttpServletRequest request,
|
||||
String targetUri) {
|
||||
String ret = targetUri;
|
||||
String urlEncodedQueryString = getURLEncodedQueryString(request);
|
||||
String urlEncodedQueryString = getURLEncodedQueryString(request, null);
|
||||
if (urlEncodedQueryString != null) {
|
||||
ret += "?" + urlEncodedQueryString;
|
||||
}
|
||||
|
@ -2648,6 +2648,14 @@
|
||||
<value></value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
URL for log aggregation server web service
|
||||
</description>
|
||||
<name>yarn.log.server.web-service.url</name>
|
||||
<value></value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
RM Application Tracking URL
|
||||
|
@ -50,6 +50,7 @@
|
||||
import org.apache.hadoop.http.JettyUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
@ -87,6 +88,7 @@ public class NMWebServices {
|
||||
private WebApp webapp;
|
||||
private static RecordFactory recordFactory = RecordFactoryProvider
|
||||
.getRecordFactory(null);
|
||||
private final String redirectWSUrl;
|
||||
|
||||
private @javax.ws.rs.core.Context
|
||||
HttpServletRequest request;
|
||||
@ -103,6 +105,8 @@ public NMWebServices(final Context nm, final ResourceView view,
|
||||
this.nmContext = nm;
|
||||
this.rview = view;
|
||||
this.webapp = webapp;
|
||||
this.redirectWSUrl = this.nmContext.getConf().get(
|
||||
YarnConfiguration.YARN_LOG_SERVER_WEBSERVICE_URL);
|
||||
}
|
||||
|
||||
private void init() {
|
||||
@ -270,6 +274,9 @@ public Response getContainerLogsInfo(
|
||||
} catch (IOException ex) {
|
||||
// Something wrong with we tries to access the remote fs for the logs.
|
||||
// Skip it and do nothing
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(ex.getMessage());
|
||||
}
|
||||
}
|
||||
GenericEntity<List<ContainerLogsInfo>> meta = new GenericEntity<List<
|
||||
ContainerLogsInfo>>(containersLogsInfo){};
|
||||
@ -280,7 +287,13 @@ public Response getContainerLogsInfo(
|
||||
resp.header("X-Content-Type-Options", "nosniff");
|
||||
return resp.build();
|
||||
} catch (Exception ex) {
|
||||
throw new WebApplicationException(ex);
|
||||
if (redirectWSUrl == null || redirectWSUrl.isEmpty()) {
|
||||
throw new WebApplicationException(ex);
|
||||
}
|
||||
// redirect the request to the configured log server
|
||||
String redirectURI = "/containers/" + containerIdStr
|
||||
+ "/logs";
|
||||
return createRedirectResponse(hsr, redirectWSUrl, redirectURI);
|
||||
}
|
||||
}
|
||||
|
||||
@ -377,7 +390,14 @@ public Response getLogs(
|
||||
logFile = ContainerLogsUtils.getContainerLogFile(
|
||||
containerId, filename, request.getRemoteUser(), nmContext);
|
||||
} catch (NotFoundException ex) {
|
||||
return Response.status(Status.NOT_FOUND).entity(ex.getMessage()).build();
|
||||
if (redirectWSUrl == null || redirectWSUrl.isEmpty()) {
|
||||
return Response.status(Status.NOT_FOUND).entity(ex.getMessage())
|
||||
.build();
|
||||
}
|
||||
// redirect the request to the configured log server
|
||||
String redirectURI = "/containers/" + containerIdStr
|
||||
+ "/logs/" + filename;
|
||||
return createRedirectResponse(request, redirectWSUrl, redirectURI);
|
||||
} catch (YarnException ex) {
|
||||
return Response.serverError().entity(ex.getMessage()).build();
|
||||
}
|
||||
@ -464,4 +484,25 @@ private long parseLongParam(String bytes) {
|
||||
}
|
||||
return Long.parseLong(bytes);
|
||||
}
|
||||
|
||||
private Response createRedirectResponse(HttpServletRequest httpRequest,
|
||||
String redirectWSUrlPrefix, String uri) {
|
||||
// redirect the request to the configured log server
|
||||
StringBuilder redirectPath = new StringBuilder();
|
||||
if (redirectWSUrlPrefix.endsWith("/")) {
|
||||
redirectWSUrlPrefix = redirectWSUrlPrefix.substring(0,
|
||||
redirectWSUrlPrefix.length() - 1);
|
||||
}
|
||||
redirectPath.append(redirectWSUrlPrefix + uri);
|
||||
// append all the request query parameters except nodeId parameter
|
||||
String requestParams = WebAppUtils.removeQueryParams(httpRequest,
|
||||
YarnWebServiceParams.NM_ID);
|
||||
if (requestParams != null && !requestParams.isEmpty()) {
|
||||
redirectPath.append("?" + requestParams);
|
||||
}
|
||||
ResponseBuilder res = Response.status(
|
||||
HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
||||
res.header("Location", redirectPath.toString());
|
||||
return res.build();
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@ -27,7 +28,11 @@
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringReader;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
@ -59,6 +64,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
@ -97,6 +103,7 @@ public class TestNMWebServices extends JerseyTestBase {
|
||||
private static ApplicationACLsManager aclsManager;
|
||||
private static LocalDirsHandlerService dirsHandler;
|
||||
private static WebApp nmWebApp;
|
||||
private static final String LOGSERVICEWSADDR = "test:1234";
|
||||
|
||||
private static final File testRootDir = new File("target",
|
||||
TestNMWebServices.class.getSimpleName());
|
||||
@ -115,6 +122,8 @@ protected void configureServlets() {
|
||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
testRemoteLogDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.YARN_LOG_SERVER_WEBSERVICE_URL,
|
||||
LOGSERVICEWSADDR);
|
||||
dirsHandler = new LocalDirsHandlerService();
|
||||
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(
|
||||
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
|
||||
@ -351,6 +360,58 @@ public void testContainerLogsWithOldAPI() throws IOException, JSONException{
|
||||
testContainerLogs(r, containerId);
|
||||
}
|
||||
|
||||
@Test (timeout = 10000)
|
||||
public void testNMRedirect() {
|
||||
ApplicationId noExistAppId = ApplicationId.newInstance(
|
||||
System.currentTimeMillis(), 2000);
|
||||
ApplicationAttemptId noExistAttemptId = ApplicationAttemptId.newInstance(
|
||||
noExistAppId, 150);
|
||||
ContainerId noExistContainerId = ContainerId.newContainerId(
|
||||
noExistAttemptId, 250);
|
||||
String fileName = "syslog";
|
||||
WebResource r = resource();
|
||||
|
||||
// check the old api
|
||||
URI requestURI = r.path("ws").path("v1").path("node")
|
||||
.path("containerlogs").path(noExistContainerId.toString())
|
||||
.path(fileName).queryParam("user.name", "user")
|
||||
.queryParam(YarnWebServiceParams.NM_ID, "localhost:1111")
|
||||
.getURI();
|
||||
String redirectURL = getRedirectURL(requestURI.toString());
|
||||
assertTrue(redirectURL != null);
|
||||
assertTrue(redirectURL.contains(LOGSERVICEWSADDR));
|
||||
assertTrue(redirectURL.contains(noExistContainerId.toString()));
|
||||
assertTrue(redirectURL.contains("/logs/" + fileName));
|
||||
assertTrue(redirectURL.contains("user.name=" + "user"));
|
||||
assertFalse(redirectURL.contains(YarnWebServiceParams.NM_ID));
|
||||
|
||||
// check the new api
|
||||
requestURI = r.path("ws").path("v1").path("node")
|
||||
.path("containers").path(noExistContainerId.toString())
|
||||
.path("logs").path(fileName).queryParam("user.name", "user")
|
||||
.queryParam(YarnWebServiceParams.NM_ID, "localhost:1111")
|
||||
.getURI();
|
||||
redirectURL = getRedirectURL(requestURI.toString());
|
||||
assertTrue(redirectURL != null);
|
||||
assertTrue(redirectURL.contains(LOGSERVICEWSADDR));
|
||||
assertTrue(redirectURL.contains(noExistContainerId.toString()));
|
||||
assertTrue(redirectURL.contains("/logs/" + fileName));
|
||||
assertTrue(redirectURL.contains("user.name=" + "user"));
|
||||
assertFalse(redirectURL.contains(YarnWebServiceParams.NM_ID));
|
||||
|
||||
requestURI = r.path("ws").path("v1").path("node")
|
||||
.path("containers").path(noExistContainerId.toString())
|
||||
.path("logs").queryParam("user.name", "user")
|
||||
.queryParam(YarnWebServiceParams.NM_ID, "localhost:1111")
|
||||
.getURI();
|
||||
redirectURL = getRedirectURL(requestURI.toString());
|
||||
assertTrue(redirectURL != null);
|
||||
assertTrue(redirectURL.contains(LOGSERVICEWSADDR));
|
||||
assertTrue(redirectURL.contains(noExistContainerId.toString()));
|
||||
assertTrue(redirectURL.contains("user.name=" + "user"));
|
||||
assertFalse(redirectURL.contains(YarnWebServiceParams.NM_ID));
|
||||
}
|
||||
|
||||
private void testContainerLogs(WebResource r, ContainerId containerId)
|
||||
throws IOException {
|
||||
final String containerIdStr = containerId.toString();
|
||||
@ -451,13 +512,12 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
|
||||
+ WebAppUtils.listSupportedLogContentType(), responseText);
|
||||
assertEquals(400, response.getStatus());
|
||||
|
||||
// ask for file that doesn't exist
|
||||
response = r.path("uhhh")
|
||||
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
|
||||
assertEquals(Status.NOT_FOUND.getStatusCode(),
|
||||
response.getStatus());
|
||||
responseText = response.getEntity(String.class);
|
||||
assertTrue(responseText.contains("Cannot find this log on the local disk."));
|
||||
// ask for file that doesn't exist and it will re-direct to
|
||||
// the log server
|
||||
URI requestURI = r.path("uhhh").getURI();
|
||||
String redirectURL = getRedirectURL(requestURI.toString());
|
||||
assertTrue(redirectURL != null);
|
||||
assertTrue(redirectURL.contains(LOGSERVICEWSADDR));
|
||||
|
||||
// Get container log files' name
|
||||
WebResource r1 = resource();
|
||||
@ -630,4 +690,21 @@ private String getLogContext(String fullMessage) {
|
||||
int postfixIndex = fullMessage.indexOf(postfix);
|
||||
return fullMessage.substring(prefixIndex, postfixIndex);
|
||||
}
|
||||
|
||||
private static String getRedirectURL(String url) {
|
||||
String redirectUrl = null;
|
||||
try {
|
||||
HttpURLConnection conn = (HttpURLConnection) new URL(url)
|
||||
.openConnection();
|
||||
// do not automatically follow the redirection
|
||||
// otherwise we get too many redirections exception
|
||||
conn.setInstanceFollowRedirects(false);
|
||||
if(conn.getResponseCode() == HttpServletResponse.SC_TEMPORARY_REDIRECT) {
|
||||
redirectUrl = conn.getHeaderField("Location");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// throw new RuntimeException(e);
|
||||
}
|
||||
return redirectUrl;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user