YARN-4625. Make ApplicationSubmissionContext and ApplicationSubmissionContextInfo more consistent. Contributed by Xuan Gong.

This commit is contained in:
Varun Vasudev 2016-02-03 16:26:28 +05:30
parent eb2fb943fd
commit 1adb64e09b
7 changed files with 386 additions and 7 deletions

View File

@ -118,6 +118,10 @@ Release 2.9.0 - UNRELEASED
YARN-4649. Add additional logging to some NM state store operations.
(Sidharta Seethana via vvasudev)
YARN-4625. Make ApplicationSubmissionContext and
ApplicationSubmissionContextInfo more consistent.
(Xuan Gong via vvasudev)
OPTIMIZATIONS
BUG FIXES

View File

@ -25,7 +25,6 @@
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@ -86,6 +85,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -93,6 +93,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
@ -124,6 +125,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AMBlackListingRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@ -142,6 +144,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LogAggregationContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
@ -1410,6 +1413,7 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
ApplicationSubmissionContext appContext =
createAppSubmissionContext(newApp);
final SubmitApplicationRequest req =
SubmitApplicationRequest.newInstance(appContext);
@ -1495,7 +1499,22 @@ protected ApplicationSubmissionContext createAppSubmissionContext(
newApp.getAppNodeLabelExpression(),
newApp.getAMContainerNodeLabelExpression());
appContext.setApplicationTags(newApp.getApplicationTags());
appContext.setAttemptFailuresValidityInterval(
newApp.getAttemptFailuresValidityInterval());
if (newApp.getLogAggregationContextInfo() != null) {
appContext.setLogAggregationContext(createLogAggregationContext(
newApp.getLogAggregationContextInfo()));
}
String reservationIdStr = newApp.getReservationId();
if (reservationIdStr != null && !reservationIdStr.isEmpty()) {
ReservationId reservationId = ReservationId.parseReservationId(
reservationIdStr);
appContext.setReservationID(reservationId);
}
if (newApp.getAMBlackListingRequestInfo() != null) {
appContext.setAMBlackListRequest(createAMBlackListingRequest(
newApp.getAMBlackListingRequestInfo()));
}
return appContext;
}
@ -1633,6 +1652,24 @@ private UserGroupInformation createKerberosUserGroupInformation(
return callerUGI;
}
private LogAggregationContext createLogAggregationContext(
LogAggregationContextInfo logAggregationContextInfo) {
return LogAggregationContext.newInstance(
logAggregationContextInfo.getIncludePattern(),
logAggregationContextInfo.getExcludePattern(),
logAggregationContextInfo.getRolledLogsIncludePattern(),
logAggregationContextInfo.getRolledLogsExcludePattern(),
logAggregationContextInfo.getLogAggregationPolicyClassName(),
logAggregationContextInfo.getLogAggregationPolicyParameters());
}
private AMBlackListingRequest createAMBlackListingRequest(
AMBlackListingRequestInfo amBlackListingRequestInfo) {
return AMBlackListingRequest.newInstance(
amBlackListingRequestInfo.getAMBlackListingEnabled(),
amBlackListingRequestInfo.getBlackListingDisableFailureThreshold());
}
@POST
@Path("/delegation-token")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Simple class to allow users to send information required to create a
* AMBlackListingRequest which can then be used as part of the
* ApplicationSubmissionContext
*
*/
@XmlRootElement(name = "am-black-listing-requests")
@XmlAccessorType(XmlAccessType.FIELD)
public class AMBlackListingRequestInfo {
@XmlElement(name = "am-black-listing-enabled")
boolean isAMBlackListingEnabled;
@XmlElement(name = "disable-failure-threshold")
float disableFailureThreshold;
public AMBlackListingRequestInfo() {
}
public boolean getAMBlackListingEnabled() {
return isAMBlackListingEnabled;
}
public void setAMBlackListingEnabled(boolean isAMBlackListingEnabled) {
this.isAMBlackListingEnabled = isAMBlackListingEnabled;
}
public float getBlackListingDisableFailureThreshold() {
return disableFailureThreshold;
}
public void setBlackListingDisableFailureThreshold(
float disableFailureThreshold) {
this.disableFailureThreshold = disableFailureThreshold;
}
}

View File

@ -78,6 +78,18 @@ public class ApplicationSubmissionContextInfo {
@XmlElement(name = "am-container-node-label-expression")
String amContainerNodeLabelExpression;
@XmlElement(name = "log-aggregation-context")
LogAggregationContextInfo logAggregationContextInfo;
@XmlElement(name = "attempt-failures-validity-interval")
long attemptFailuresValidityInterval;
@XmlElement(name = "reservation-id")
String reservationId;
@XmlElement(name = "am-black-listing-requests")
AMBlackListingRequestInfo amBlackListingRequestInfo;
public ApplicationSubmissionContextInfo() {
applicationId = "";
applicationName = "";
@ -91,6 +103,10 @@ public ApplicationSubmissionContextInfo() {
tags = new HashSet<String>();
appNodeLabelExpression = "";
amContainerNodeLabelExpression = "";
logAggregationContextInfo = null;
attemptFailuresValidityInterval = -1;
reservationId = "";
amBlackListingRequestInfo = null;
}
public String getApplicationId() {
@ -149,6 +165,22 @@ public String getAMContainerNodeLabelExpression() {
return amContainerNodeLabelExpression;
}
public LogAggregationContextInfo getLogAggregationContextInfo() {
return logAggregationContextInfo;
}
public long getAttemptFailuresValidityInterval() {
return attemptFailuresValidityInterval;
}
public AMBlackListingRequestInfo getAMBlackListingRequestInfo() {
return amBlackListingRequestInfo;
}
public String getReservationId() {
return reservationId;
}
public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}
@ -206,4 +238,23 @@ public void setAppNodeLabelExpression(String appNodeLabelExpression) {
public void setAMContainerNodeLabelExpression(String nodeLabelExpression) {
this.amContainerNodeLabelExpression = nodeLabelExpression;
}
public void setLogAggregationContextInfo(
LogAggregationContextInfo logAggregationContextInfo) {
this.logAggregationContextInfo = logAggregationContextInfo;
}
public void setAttemptFailuresValidityInterval(
long attemptFailuresValidityInterval) {
this.attemptFailuresValidityInterval = attemptFailuresValidityInterval;
}
public void setReservationId(String reservationId) {
this.reservationId = reservationId;
}
public void setAMBlackListingRequestInfo(
AMBlackListingRequestInfo amBlackListingRequestInfo) {
this.amBlackListingRequestInfo = amBlackListingRequestInfo;
}
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Simple class to allow users to send information required to create a
* ContainerLaunchContext which can then be used as part of the
* ApplicationSubmissionContext
*
*/
@XmlRootElement(name = "log-aggregation-context")
@XmlAccessorType(XmlAccessType.FIELD)
public class LogAggregationContextInfo {
@XmlElement(name = "log-include-pattern")
String logIncludePattern;
@XmlElement(name = "log-exclude-pattern")
String logExcludePattern;
@XmlElement(name = "rolled-log-include-pattern")
String rolledLogsIncludePattern;
@XmlElement(name = "rolled-log-exclude-pattern")
String rolledLogsExcludePattern;
@XmlElement(name = "log-aggregation-policy-class-name")
String policyClassName;
@XmlElement(name = "log-aggregation-policy-parameters")
String policyParameters;
public LogAggregationContextInfo() {
}
public String getIncludePattern() {
return this.logIncludePattern;
}
public void setIncludePattern(String includePattern) {
this.logIncludePattern = includePattern;
}
public String getExcludePattern() {
return this.logExcludePattern;
}
public void setExcludePattern(String excludePattern) {
this.logExcludePattern = excludePattern;
}
public String getRolledLogsIncludePattern() {
return this.rolledLogsIncludePattern;
}
public void setRolledLogsIncludePattern(
String rolledLogsIncludePattern) {
this.rolledLogsIncludePattern = rolledLogsIncludePattern;
}
public String getRolledLogsExcludePattern() {
return this.rolledLogsExcludePattern;
}
public void setRolledLogsExcludePattern(
String rolledLogsExcludePattern) {
this.rolledLogsExcludePattern = rolledLogsExcludePattern;
}
public String getLogAggregationPolicyClassName() {
return this.policyClassName;
}
public void setLogAggregationPolicyClassName(
String className) {
this.policyClassName = className;
}
public String getLogAggregationPolicyParameters() {
return this.policyParameters;
}
public void setLogAggregationPolicyParameters(
String parameters) {
this.policyParameters = parameters;
}
}

View File

@ -44,18 +44,23 @@
import javax.xml.parsers.ParserConfigurationException;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@ -67,10 +72,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
@ -792,6 +793,47 @@ public void testAppSubmit(String acceptMedia, String contentMedia)
appInfo.getResource().setvCores(1);
appInfo.setApplicationTags(tags);
// Set LogAggregationContextInfo
String includePattern = "file1";
String excludePattern = "file2";
String rolledLogsIncludePattern = "file3";
String rolledLogsExcludePattern = "file4";
String className = "policy_class";
String parameters = "policy_parameter";
LogAggregationContextInfo logAggregationContextInfo
= new LogAggregationContextInfo();
logAggregationContextInfo.setIncludePattern(includePattern);
logAggregationContextInfo.setExcludePattern(excludePattern);
logAggregationContextInfo.setRolledLogsIncludePattern(
rolledLogsIncludePattern);
logAggregationContextInfo.setRolledLogsExcludePattern(
rolledLogsExcludePattern);
logAggregationContextInfo.setLogAggregationPolicyClassName(className);
logAggregationContextInfo.setLogAggregationPolicyParameters(parameters);
appInfo.setLogAggregationContextInfo(logAggregationContextInfo);
// Set attemptFailuresValidityInterval
long attemptFailuresValidityInterval = 5000;
appInfo.setAttemptFailuresValidityInterval(
attemptFailuresValidityInterval);
// Set ReservationId
String reservationId = ReservationId.newInstance(
System.currentTimeMillis(), 1).toString();
appInfo.setReservationId(reservationId);
// Set AMBlackListingRequestInfo
boolean isAMBlackListingEnabled = true;
float disableFailureThreshold = 0.01f;
AMBlackListingRequestInfo amBlackListingRequestInfo
= new AMBlackListingRequestInfo();
amBlackListingRequestInfo.setAMBlackListingEnabled(
isAMBlackListingEnabled);
amBlackListingRequestInfo.setBlackListingDisableFailureThreshold(
disableFailureThreshold);
appInfo.setAMBlackListingRequestInfo(amBlackListingRequestInfo);
ClientResponse response =
this.constructWebResource(urlPath).accept(acceptMedia)
.entity(appInfo, contentMedia).post(ClientResponse.class);
@ -849,6 +891,30 @@ public void testAppSubmit(String acceptMedia, String contentMedia)
.getAllSecretKeys().contains(key));
assertEquals("mysecret", new String(cs.getSecretKey(key), "UTF-8"));
// Check LogAggregationContext
ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
LogAggregationContext lac = asc.getLogAggregationContext();
assertEquals(includePattern, lac.getIncludePattern());
assertEquals(excludePattern, lac.getExcludePattern());
assertEquals(rolledLogsIncludePattern, lac.getRolledLogsIncludePattern());
assertEquals(rolledLogsExcludePattern, lac.getRolledLogsExcludePattern());
assertEquals(className, lac.getLogAggregationPolicyClassName());
assertEquals(parameters, lac.getLogAggregationPolicyParameters());
// Check attemptFailuresValidityInterval
assertEquals(attemptFailuresValidityInterval,
asc.getAttemptFailuresValidityInterval());
// Check ReservationId
assertEquals(reservationId, app.getReservationId().toString());
// Check AMBlackListingRequestInfo
AMBlackListingRequest amBlackListingRequest = asc.getAMBlackListRequest();
assertEquals(isAMBlackListingEnabled,
amBlackListingRequest.isAMBlackListingEnabled());
assertTrue(disableFailureThreshold == amBlackListingRequest
.getBlackListingDisableFailureThreshold());
response =
this.constructWebResource("apps", appId).accept(acceptMedia)
.get(ClientResponse.class);

View File

@ -2236,6 +2236,10 @@ Please note that this feature is currently in the alpha stage and may change in
| application-type | string | The application type(MapReduce, Pig, Hive, etc) |
| keep-containers-across-application-attempts | boolean | Should YARN keep the containers used by this application instead of destroying them |
| application-tags | object | List of application tags, please see the request examples on how to speciy the tags |
| log-aggregation-context| object | Represents all of the information needed by the NodeManager to handle the logs for this application |
| attempt-failures-validity-interval| long | The failure number will no take attempt failures which happen out of the validityInterval into failure count|
| reservation-id| string | Represent the unique id of the corresponding reserved resource allocation in the scheduler |
| am-black-listing-requests| object | Contains blacklisting information such as "enable/disable AM blacklisting" and "disable failure threshold" |
Elements of the *am-container-spec* object
@ -2278,6 +2282,24 @@ Elements of the POST request body *resource* object
| memory | int | Memory required for each container |
| vCores | int | Virtual cores required for each container |
Elements of the POST request body *log-aggregation-context* object
| Item | Data Type | Description |
|:---- |:---- |:---- |
| log-include-pattern | string | The log files which match the defined include pattern will be uploaded when the applicaiton finishes |
| log-exclude-pattern | string | The log files which match the defined exclude pattern will not be uploaded when the applicaiton finishes |
| rolled-log-include-pattern | string | The log files which match the defined include pattern will be aggregated in a rolling fashion |
| rolled-log-exclude-pattern | string | The log files which match the defined exclude pattern will not be aggregated in a rolling fashion |
| log-aggregation-policy-class-name | string | The policy which will be used by NodeManager to aggregate the logs |
| log-aggregation-policy-parameters | string | The parameters passed to the policy class |
Elements of the POST request body *am-black-listing-requests* object
| Item | Data Type | Description |
|:---- |:---- |:---- |
| am-black-listing-enabled | boolean | Whether AM Blacklisting is enabled |
| disable-failure-threshold | float | AM Blacklisting disable failure threshold |
**JSON response**
HTTP Request:
@ -2343,7 +2365,23 @@ HTTP Request:
"vCores":1
},
"application-type":"YARN",
"keep-containers-across-application-attempts":false
"keep-containers-across-application-attempts":false,
"log-aggregation-context":
{
"log-include-pattern":"file1",
"log-exclude-pattern":"file2",
"rolled-log-include-pattern":"file3",
"rolled-log-exclude-pattern":"file4",
"log-aggregation-policy-class-name":"org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy",
"log-aggregation-policy-parameters":""
},
"attempt-failures-validity-interval":3600000,
"reservation-id":"reservation_1454114874_1",
"am-black-listing-requests":
{
"am-black-listing-enabled":true,
"disable-failure-threshold":0.01
}
}
```
@ -2445,6 +2483,20 @@ Content-Type: application/xml
<tag>tag 2</tag>
<tag>tag1</tag>
</application-tags>
<log-aggregation-context>
<log-include-pattern>file1</log-include-pattern>
<log-exclude-pattern>file2</log-exclude-pattern>
<rolled-log-include-pattern>file3</rolled-log-include-pattern>
<rolled-log-exclude-pattern>file4</rolled-log-exclude-pattern>
<log-aggregation-policy-class-name>org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy</log-aggregation-policy-class-name>
<log-aggregation-policy-parameters></log-aggregation-policy-parameters>
</log-aggregation-context>
<attempt-failures-validity-interval>3600000</attempt-failures-validity-interval>
<reservation-id>reservation_1454114874_1</reservation-id>
<am-black-listing-requests>
<am-black-listing-enabled>true</am-black-listing-enabled>
<disable-failure-threshold>0.01</disable-failure-threshold>
</am-black-listing-requests>
</application-submission-context>
```