YARN-563. Add the concept of an application-type for each application. Contributed by Mayank Bansal.
MAPREDUCE-5246. Specify application-type at the time of job submission after YARN-563. Contributed by Mayank Bansal. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1485790 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d1ff045c41
commit
43876770d9
@ -254,6 +254,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||
MAPREDUCE-5235. Bring back old fields and exceptions in Counters for
|
||||
binary compatibility with mapred in 1.x. (Mayank Bansal via vinodkv)
|
||||
|
||||
MAPREDUCE-5246. Specify application-type at the time of job submission after
|
||||
YARN-563. (Mayank Bansal via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
|
||||
|
@ -684,4 +684,6 @@ public interface MRJobConfig {
|
||||
|
||||
public static final int DEFAULT_MR_AM_MAX_ATTEMPTS = 2;
|
||||
|
||||
public static final String MR_APPLICATION_TYPE = "MAPREDUCE";
|
||||
|
||||
}
|
||||
|
@ -66,6 +66,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
@ -88,8 +89,9 @@ private ApplicationReport getUnknownApplicationReport() {
|
||||
// Setting AppState to NEW and finalStatus to UNDEFINED as they are never
|
||||
// used for a non running job
|
||||
return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
|
||||
"N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
|
||||
"N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f);
|
||||
"N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
|
||||
"N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
|
||||
YarnConfiguration.DEFAULT_APPLICATION_TYPE);
|
||||
}
|
||||
|
||||
NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
|
||||
|
@ -497,6 +497,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
|
||||
conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
|
||||
MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
|
||||
appContext.setResource(capability);
|
||||
appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
|
||||
return appContext;
|
||||
}
|
||||
|
||||
|
@ -429,9 +429,9 @@ private ApplicationReport getFinishedApplicationReport() {
|
||||
ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId(
|
||||
appId, 0);
|
||||
return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
|
||||
"appname", "host", 124, null, YarnApplicationState.FINISHED,
|
||||
"diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
|
||||
"N/A", 0.0f);
|
||||
"appname", "host", 124, null, YarnApplicationState.FINISHED,
|
||||
"diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
|
||||
"N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE);
|
||||
}
|
||||
|
||||
private ApplicationReport getRunningApplicationReport(String host, int port) {
|
||||
@ -439,9 +439,9 @@ private ApplicationReport getRunningApplicationReport(String host, int port) {
|
||||
ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId(
|
||||
appId, 0);
|
||||
return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
|
||||
"appname", host, port, null, YarnApplicationState.RUNNING,
|
||||
"diagnostics", "url", 0, 0, FinalApplicationStatus.UNDEFINED, null,
|
||||
"N/A", 0.0f);
|
||||
"appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics",
|
||||
"url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
|
||||
YarnConfiguration.DEFAULT_APPLICATION_TYPE);
|
||||
}
|
||||
|
||||
private ResourceMgrDelegate getRMDelegate() throws IOException {
|
||||
|
@ -135,6 +135,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||
YARN-45. Add protocol for schedulers to request containers back from
|
||||
ApplicationMasters. (Carlo Curino, cdouglas)
|
||||
|
||||
YARN-563. Add the concept of an application-type for each application.
|
||||
(Mayank Bansal via vinodkv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-365. Change NM heartbeat handling to not generate a scheduler event
|
||||
|
@ -265,4 +265,16 @@ public interface ApplicationReport {
|
||||
@Private
|
||||
@Unstable
|
||||
void setProgress(float progress);
|
||||
|
||||
/**
|
||||
* Get the application's Type
|
||||
* @return application's Type
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
String getApplicationType();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setApplicationType(String applicationType);
|
||||
}
|
||||
|
@ -199,4 +199,23 @@ public interface ApplicationSubmissionContext {
|
||||
@Public
|
||||
@Stable
|
||||
public void setResource(Resource resource);
|
||||
|
||||
/**
|
||||
* Get the <em>applicationType</em> is the application type
|
||||
*
|
||||
* @return <em>applicationType</em> is the application type
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public String getApplicationType();
|
||||
|
||||
/**
|
||||
* Set the <em>applicationType</em> is the application type
|
||||
*
|
||||
* @param applicationType
|
||||
* <em>applicationType</em> is the application type
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public void setApplicationType(String applicationType);
|
||||
}
|
@ -219,6 +219,15 @@ public float getProgress() {
|
||||
return p.getProgress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getApplicationType() {
|
||||
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasApplicationType()) {
|
||||
return null;
|
||||
}
|
||||
return p.getApplicationType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationId(ApplicationId applicationId) {
|
||||
maybeInitBuilder();
|
||||
@ -318,6 +327,16 @@ public void setUser(String user) {
|
||||
}
|
||||
builder.setUser((user));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationType(String applicationType) {
|
||||
maybeInitBuilder();
|
||||
if (applicationType == null) {
|
||||
builder.clearApplicationType();
|
||||
return;
|
||||
}
|
||||
builder.setApplicationType((applicationType));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDiagnostics(String diagnostics) {
|
||||
|
@ -164,6 +164,15 @@ public String getQueue() {
|
||||
return (p.getQueue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getApplicationType() {
|
||||
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasApplicationType()) {
|
||||
return null;
|
||||
}
|
||||
return (p.getApplicationType());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setQueue(String queue) {
|
||||
maybeInitBuilder();
|
||||
@ -173,6 +182,16 @@ public void setQueue(String queue) {
|
||||
}
|
||||
builder.setQueue((queue));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationType(String applicationType) {
|
||||
maybeInitBuilder();
|
||||
if (applicationType == null) {
|
||||
builder.clearApplicationType();
|
||||
return;
|
||||
}
|
||||
builder.setApplicationType((applicationType));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerLaunchContext getAMContainerSpec() {
|
||||
|
@ -253,6 +253,12 @@ public class YarnConfiguration extends Configuration {
|
||||
/** Default application name */
|
||||
public static final String DEFAULT_APPLICATION_NAME = "N/A";
|
||||
|
||||
/** Default application type */
|
||||
public static final String DEFAULT_APPLICATION_TYPE = "YARN";
|
||||
|
||||
/** Default application type length */
|
||||
public static final int APPLICATION_TYPE_LENGTH = 20;
|
||||
|
||||
/** Default queue name */
|
||||
public static final String DEFAULT_QUEUE_NAME = "default";
|
||||
|
||||
|
@ -162,6 +162,7 @@ message ApplicationReportProto {
|
||||
optional string originalTrackingUrl = 15;
|
||||
optional ApplicationAttemptIdProto currentApplicationAttemptId = 16;
|
||||
optional float progress = 17;
|
||||
optional string applicationType = 18;
|
||||
}
|
||||
|
||||
enum NodeStateProto {
|
||||
@ -219,6 +220,7 @@ message ApplicationSubmissionContextProto {
|
||||
optional bool unmanaged_am = 7 [default = false];
|
||||
optional int32 maxAppAttempts = 8 [default = 0];
|
||||
optional ResourceProto resource = 9;
|
||||
optional string applicationType = 10 [default = "YARN"];
|
||||
}
|
||||
|
||||
enum ApplicationAccessTypeProto {
|
||||
|
@ -35,7 +35,7 @@
|
||||
|
||||
public class ApplicationCLI extends YarnCLI {
|
||||
private static final String APPLICATIONS_PATTERN =
|
||||
"%30s\t%20s\t%10s\t%10s\t%18s\t%18s\t%15s\t%35s" +
|
||||
"%30s\t%20s\t%20s\t%10s\t%10s\t%18s\t%18s\t%15s\t%35s" +
|
||||
System.getProperty("line.separator");
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
@ -99,16 +99,16 @@ private void listAllApplications() throws YarnRemoteException, IOException {
|
||||
|
||||
writer.println("Total Applications:" + appsReport.size());
|
||||
writer.printf(APPLICATIONS_PATTERN, "Application-Id",
|
||||
"Application-Name", "User", "Queue", "State", "Final-State",
|
||||
"Progress", "Tracking-URL");
|
||||
"Application-Name","Application-Type", "User", "Queue",
|
||||
"State", "Final-State","Progress", "Tracking-URL");
|
||||
for (ApplicationReport appReport : appsReport) {
|
||||
DecimalFormat formatter = new DecimalFormat("###.##%");
|
||||
String progress = formatter.format(appReport.getProgress());
|
||||
writer.printf(APPLICATIONS_PATTERN, appReport.getApplicationId(),
|
||||
appReport.getName(), appReport.getUser(), appReport.getQueue(),
|
||||
appReport.getYarnApplicationState(), appReport
|
||||
.getFinalApplicationStatus(),
|
||||
progress, appReport.getOriginalTrackingUrl());
|
||||
appReport.getName(),appReport.getApplicationType(), appReport.getUser(),
|
||||
appReport.getQueue(),appReport.getYarnApplicationState(),
|
||||
appReport.getFinalApplicationStatus(),progress,
|
||||
appReport.getOriginalTrackingUrl());
|
||||
}
|
||||
writer.flush();
|
||||
}
|
||||
@ -146,6 +146,8 @@ private void printApplicationReport(String applicationId)
|
||||
appReportStr.println(appReport.getApplicationId());
|
||||
appReportStr.print("\tApplication-Name : ");
|
||||
appReportStr.println(appReport.getName());
|
||||
appReportStr.print("\tApplication-Type : ");
|
||||
appReportStr.println(appReport.getApplicationType());
|
||||
appReportStr.print("\tUser : ");
|
||||
appReportStr.println(appReport.getUser());
|
||||
appReportStr.print("\tQueue : ");
|
||||
|
@ -25,6 +25,7 @@
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
@ -32,14 +33,20 @@
|
||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestYarnClient {
|
||||
@ -102,6 +109,36 @@ public void testSubmitApplication() {
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testApplicationType() throws Exception {
|
||||
Logger rootLogger = LogManager.getRootLogger();
|
||||
rootLogger.setLevel(Level.DEBUG);
|
||||
MockRM rm = new MockRM();
|
||||
rm.start();
|
||||
RMApp app = rm.submitApp(2000);
|
||||
RMApp app1 =
|
||||
rm.submitApp(200, "name", "user",
|
||||
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
|
||||
null, "MAPREDUCE");
|
||||
Assert.assertEquals("YARN", app.getApplicationType());
|
||||
Assert.assertEquals("MAPREDUCE", app1.getApplicationType());
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testApplicationTypeLimit() throws Exception {
|
||||
Logger rootLogger = LogManager.getRootLogger();
|
||||
rootLogger.setLevel(Level.DEBUG);
|
||||
MockRM rm = new MockRM();
|
||||
rm.start();
|
||||
RMApp app1 =
|
||||
rm.submitApp(200, "name", "user",
|
||||
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
|
||||
null, "MAPREDUCE-LENGTH-IS-20");
|
||||
Assert.assertEquals("MAPREDUCE-LENGTH-IS-", app1.getApplicationType());
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
private static class MockYarnClient extends YarnClientImpl {
|
||||
private ApplicationReport mockReport;
|
||||
|
||||
|
@ -76,7 +76,7 @@ public void testGetApplicationReport() throws Exception {
|
||||
applicationId, BuilderUtils.newApplicationAttemptId(applicationId, 1),
|
||||
"user", "queue", "appname", "host", 124, null,
|
||||
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f);
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN");
|
||||
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
|
||||
newApplicationReport);
|
||||
int result = cli.run(new String[] { "-status", applicationId.toString() });
|
||||
@ -87,6 +87,7 @@ public void testGetApplicationReport() throws Exception {
|
||||
pw.println("Application Report : ");
|
||||
pw.println("\tApplication-Id : application_1234_0005");
|
||||
pw.println("\tApplication-Name : appname");
|
||||
pw.println("\tApplication-Type : YARN");
|
||||
pw.println("\tUser : user");
|
||||
pw.println("\tQueue : queue");
|
||||
pw.println("\tStart-Time : 0");
|
||||
@ -112,7 +113,7 @@ public void testGetAllApplications() throws Exception {
|
||||
applicationId, BuilderUtils.newApplicationAttemptId(applicationId, 1),
|
||||
"user", "queue", "appname", "host", 124, null,
|
||||
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f);
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN");
|
||||
List<ApplicationReport> applicationReports = new ArrayList<ApplicationReport>();
|
||||
applicationReports.add(newApplicationReport);
|
||||
when(client.getApplicationList()).thenReturn(applicationReports);
|
||||
@ -124,11 +125,13 @@ public void testGetAllApplications() throws Exception {
|
||||
PrintWriter pw = new PrintWriter(baos);
|
||||
pw.println("Total Applications:1");
|
||||
pw.print(" Application-Id\t Application-Name");
|
||||
pw.print("\t Application-Type");
|
||||
pw.print("\t User\t Queue\t State\t ");
|
||||
pw.print("Final-State\t Progress");
|
||||
pw.println("\t Tracking-URL");
|
||||
pw.print(" application_1234_0005\t ");
|
||||
pw.print("appname\t user\t queue\t FINISHED\t ");
|
||||
pw.print("appname\t YARN\t user\t ");
|
||||
pw.print("queue\t FINISHED\t ");
|
||||
pw.print("SUCCEEDED\t 53.79%");
|
||||
pw.println("\t N/A");
|
||||
pw.close();
|
||||
|
@ -35,6 +35,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ClientToken;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
@ -336,7 +337,7 @@ public static ApplicationReport newApplicationReport(
|
||||
String url, long startTime, long finishTime,
|
||||
FinalApplicationStatus finalStatus,
|
||||
ApplicationResourceUsageReport appResources, String origTrackingUrl,
|
||||
float progress) {
|
||||
float progress, String appType) {
|
||||
ApplicationReport report = recordFactory
|
||||
.newRecordInstance(ApplicationReport.class);
|
||||
report.setApplicationId(applicationId);
|
||||
@ -356,9 +357,40 @@ public static ApplicationReport newApplicationReport(
|
||||
report.setApplicationResourceUsageReport(appResources);
|
||||
report.setOriginalTrackingUrl(origTrackingUrl);
|
||||
report.setProgress(progress);
|
||||
report.setApplicationType(appType);
|
||||
return report;
|
||||
}
|
||||
|
||||
public static ApplicationSubmissionContext newApplicationSubmissionContext(
|
||||
ApplicationId applicationId, String applicationName, String queue,
|
||||
Priority priority, ContainerLaunchContext amContainer,
|
||||
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
|
||||
int maxAppAttempts, Resource resource, String applicationType) {
|
||||
ApplicationSubmissionContext context =
|
||||
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
|
||||
context.setApplicationId(applicationId);
|
||||
context.setApplicationName(applicationName);
|
||||
context.setQueue(queue);
|
||||
context.setPriority(priority);
|
||||
context.setAMContainerSpec(amContainer);
|
||||
context.setUnmanagedAM(isUnmanagedAM);
|
||||
context.setCancelTokensWhenComplete(cancelTokensWhenComplete);
|
||||
context.setMaxAppAttempts(maxAppAttempts);
|
||||
context.setResource(resource);
|
||||
context.setApplicationType(applicationType);
|
||||
return context;
|
||||
}
|
||||
|
||||
public static ApplicationSubmissionContext newApplicationSubmissionContext(
|
||||
ApplicationId applicationId, String applicationName, String queue,
|
||||
Priority priority, ContainerLaunchContext amContainer,
|
||||
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
|
||||
int maxAppAttempts, Resource resource) {
|
||||
return newApplicationSubmissionContext(applicationId, applicationName,
|
||||
queue, priority, amContainer, isUnmanagedAM, cancelTokensWhenComplete,
|
||||
maxAppAttempts, resource, null);
|
||||
}
|
||||
|
||||
public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
|
||||
int numUsedContainers, int numReservedContainers, Resource usedResources,
|
||||
Resource reservedResources, Resource neededResources) {
|
||||
|
@ -23,6 +23,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
@ -53,10 +54,11 @@ protected static ApplicationReport createApplicationReport(
|
||||
ApplicationId appId = BuilderUtils.newApplicationId(timestamp, appIdInt);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
BuilderUtils.newApplicationAttemptId(appId, appAttemptIdInt);
|
||||
ApplicationReport appReport = BuilderUtils.newApplicationReport(
|
||||
appId, appAttemptId, "user", "queue", "appname", "host", 124, null,
|
||||
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f);
|
||||
ApplicationReport appReport =
|
||||
BuilderUtils.newApplicationReport(appId, appAttemptId, "user", "queue",
|
||||
"appname", "host", 124, null, YarnApplicationState.FINISHED,
|
||||
"diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
|
||||
"N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE);
|
||||
return appReport;
|
||||
}
|
||||
|
||||
|
@ -298,6 +298,16 @@ public SubmitApplicationResponse submitApplication(
|
||||
submissionContext.setApplicationName(
|
||||
YarnConfiguration.DEFAULT_APPLICATION_NAME);
|
||||
}
|
||||
if (submissionContext.getApplicationType() == null) {
|
||||
submissionContext
|
||||
.setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
|
||||
} else {
|
||||
if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
|
||||
submissionContext.setApplicationType(submissionContext
|
||||
.getApplicationType().substring(0,
|
||||
YarnConfiguration.APPLICATION_TYPE_LENGTH));
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
// call RMAppManager to submit application directly
|
||||
|
@ -269,7 +269,7 @@ protected void submitApplication(
|
||||
submissionContext.getAMContainerSpec().getUser(),
|
||||
submissionContext.getQueue(),
|
||||
submissionContext, this.scheduler, this.masterService,
|
||||
submitTime);
|
||||
submitTime, submissionContext.getApplicationType());
|
||||
|
||||
// Concurrent app submissions with same applicationId will fail here
|
||||
// Concurrent app submissions with different applicationIds will not
|
||||
|
@ -179,4 +179,10 @@ public interface RMApp extends EventHandler<RMAppEvent> {
|
||||
* @return the number of max attempts of the application.
|
||||
*/
|
||||
int getMaxAppAttempts();
|
||||
|
||||
/**
|
||||
* Returns the application type
|
||||
* @return the application type.
|
||||
*/
|
||||
String getApplicationType();
|
||||
}
|
||||
|
@ -97,6 +97,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
= new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
|
||||
private final long submitTime;
|
||||
private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
|
||||
private final String applicationType;
|
||||
|
||||
// Mutable fields
|
||||
private long startTime;
|
||||
@ -230,7 +231,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
|
||||
Configuration config, String name, String user, String queue,
|
||||
ApplicationSubmissionContext submissionContext,
|
||||
YarnScheduler scheduler,
|
||||
ApplicationMasterService masterService, long submitTime) {
|
||||
ApplicationMasterService masterService, long submitTime, String applicationType) {
|
||||
|
||||
this.applicationId = applicationId;
|
||||
this.name = name;
|
||||
@ -245,6 +246,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
|
||||
this.masterService = masterService;
|
||||
this.submitTime = submitTime;
|
||||
this.startTime = System.currentTimeMillis();
|
||||
this.applicationType = applicationType;
|
||||
|
||||
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
@ -472,7 +474,7 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
|
||||
this.name, host, rpcPort, clientToken,
|
||||
createApplicationState(this.stateMachine.getCurrentState()), diags,
|
||||
trackingUrl, this.startTime, this.finishTime, finishState,
|
||||
appUsageReport, origTrackingUrl, progress);
|
||||
appUsageReport, origTrackingUrl, progress, this.applicationType);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
@ -756,4 +758,9 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getApplicationType() {
|
||||
return this.applicationType;
|
||||
}
|
||||
}
|
||||
|
@ -103,6 +103,7 @@ protected void render(Block html) {
|
||||
info("Application Overview").
|
||||
_("User:", app.getUser()).
|
||||
_("Name:", app.getName()).
|
||||
_("Application Type:", app.getApplicationType()).
|
||||
_("State:", app.getState()).
|
||||
_("FinalStatus:", app.getFinalStatus()).
|
||||
_("Started:", Times.format(app.getStartTime())).
|
||||
|
@ -56,6 +56,7 @@ class AppsBlock extends HtmlBlock {
|
||||
th(".id", "ID").
|
||||
th(".user", "User").
|
||||
th(".name", "Name").
|
||||
th(".type", "Application Type").
|
||||
th(".queue", "Queue").
|
||||
th(".starttime", "StartTime").
|
||||
th(".finishtime", "FinishTime").
|
||||
@ -88,6 +89,8 @@ class AppsBlock extends HtmlBlock {
|
||||
appInfo.getUser()))).append("\",\"")
|
||||
.append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
|
||||
appInfo.getName()))).append("\",\"")
|
||||
.append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
|
||||
appInfo.getApplicationType()))).append("\",\"")
|
||||
.append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
|
||||
appInfo.getQueue()))).append("\",\"")
|
||||
.append(appInfo.getStartTime()).append("\",\"")
|
||||
|
@ -62,7 +62,8 @@ public class AppInfo {
|
||||
protected String trackingUrl;
|
||||
protected String diagnostics;
|
||||
protected long clusterId;
|
||||
|
||||
protected String applicationType;
|
||||
|
||||
// these are only allowed if acls allow
|
||||
protected long startedTime;
|
||||
protected long finishedTime;
|
||||
@ -95,6 +96,7 @@ public AppInfo(RMApp app, Boolean hasAccess) {
|
||||
this.trackingUrlPretty = trackingUrlIsNotReady ? "UNASSIGNED" : join(
|
||||
HttpConfig.getSchemePrefix(), trackingUrl);
|
||||
this.applicationId = app.getApplicationId();
|
||||
this.applicationType = app.getApplicationType();
|
||||
this.appIdNum = String.valueOf(app.getApplicationId().getId());
|
||||
this.id = app.getApplicationId().toString();
|
||||
this.user = app.getUser().toString();
|
||||
@ -215,4 +217,8 @@ public long getClusterId() {
|
||||
return this.clusterId;
|
||||
}
|
||||
|
||||
public String getApplicationType() {
|
||||
return this.applicationType;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -156,6 +156,13 @@ public RMApp submitApp(int masterMemory, String name, String user,
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
||||
int maxAppAttempts, Credentials ts) throws Exception {
|
||||
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
|
||||
maxAppAttempts, ts, null);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
||||
int maxAppAttempts, Credentials ts, String appType) throws Exception {
|
||||
ClientRMProtocol client = getClientRMService();
|
||||
GetNewApplicationResponse resp = client.getNewApplication(Records
|
||||
.newRecord(GetNewApplicationRequest.class));
|
||||
@ -174,6 +181,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
|
||||
if (queue != null) {
|
||||
sub.setQueue(queue);
|
||||
}
|
||||
sub.setApplicationType(appType);
|
||||
ContainerLaunchContext clc = Records
|
||||
.newRecord(ContainerLaunchContext.class);
|
||||
final Resource capability = Records.newRecord(Resource.class);
|
||||
|
@ -449,7 +449,7 @@ private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
|
||||
when(asContext.getMaxAppAttempts()).thenReturn(1);
|
||||
return new RMAppImpl(applicationId3, rmContext, config, null, null,
|
||||
queueName, asContext, yarnScheduler, null , System
|
||||
.currentTimeMillis());
|
||||
.currentTimeMillis(), "YARN");
|
||||
}
|
||||
|
||||
private static YarnScheduler mockYarnScheduler() {
|
||||
|
@ -34,6 +34,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
@ -236,6 +237,11 @@ public FinalApplicationStatus getFinalApplicationStatus() {
|
||||
public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getApplicationType() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
}
|
||||
|
||||
public static RMApp newApplication(int i) {
|
||||
@ -250,6 +256,7 @@ public static RMApp newApplication(int i) {
|
||||
final String queue = newQueue();
|
||||
final long start = 123456 + i * 1000;
|
||||
final long finish = 234567 + i * 1000;
|
||||
final String type = YarnConfiguration.DEFAULT_APPLICATION_TYPE;
|
||||
RMAppState[] allStates = RMAppState.values();
|
||||
final RMAppState state = allStates[i % allStates.length];
|
||||
final int maxAppAttempts = i % 1000;
|
||||
@ -268,6 +275,11 @@ public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getApplicationType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueue() {
|
||||
return queue;
|
||||
|
@ -29,6 +29,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
@ -208,6 +209,11 @@ public FinalApplicationStatus getFinalApplicationStatus() {
|
||||
@Override
|
||||
public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getApplicationType() {
|
||||
return YarnConfiguration.DEFAULT_APPLICATION_TYPE;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -186,7 +186,7 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext)
|
||||
RMApp application =
|
||||
new RMAppImpl(applicationId, rmContext, conf, name, user, queue,
|
||||
submissionContext, scheduler, masterService,
|
||||
System.currentTimeMillis());
|
||||
System.currentTimeMillis(), "YARN");
|
||||
|
||||
testAppStartState(applicationId, user, name, queue, application);
|
||||
return application;
|
||||
|
@ -1619,7 +1619,7 @@ public void testNotAllowSubmitApplication() throws Exception {
|
||||
RMApp application =
|
||||
new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user,
|
||||
queue, submissionContext, scheduler, masterService,
|
||||
System.currentTimeMillis());
|
||||
System.currentTimeMillis(), "YARN");
|
||||
resourceManager.getRMContext().getRMApps().putIfAbsent(applicationId, application);
|
||||
application.handle(new RMAppEvent(applicationId, RMAppEventType.START));
|
||||
|
||||
|
@ -788,6 +788,7 @@ public void verifyAppsXML(NodeList nodes, RMApp app) throws JSONException,
|
||||
WebServicesTestUtils.getXmlString(element, "id"),
|
||||
WebServicesTestUtils.getXmlString(element, "user"),
|
||||
WebServicesTestUtils.getXmlString(element, "name"),
|
||||
WebServicesTestUtils.getXmlString(element, "applicationType"),
|
||||
WebServicesTestUtils.getXmlString(element, "queue"),
|
||||
WebServicesTestUtils.getXmlString(element, "state"),
|
||||
WebServicesTestUtils.getXmlString(element, "finalStatus"),
|
||||
@ -807,20 +808,20 @@ public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
|
||||
Exception {
|
||||
|
||||
// 15 because trackingUrl not assigned yet
|
||||
assertEquals("incorrect number of elements", 15, info.length());
|
||||
assertEquals("incorrect number of elements", 16, info.length());
|
||||
|
||||
verifyAppInfoGeneric(app, info.getString("id"), info.getString("user"),
|
||||
info.getString("name"), info.getString("queue"),
|
||||
info.getString("state"), info.getString("finalStatus"),
|
||||
(float) info.getDouble("progress"), info.getString("trackingUI"),
|
||||
info.getString("diagnostics"), info.getLong("clusterId"),
|
||||
info.getLong("startedTime"), info.getLong("finishedTime"),
|
||||
info.getLong("elapsedTime"), info.getString("amHostHttpAddress"),
|
||||
info.getString("amContainerLogs"));
|
||||
info.getString("name"), info.getString("applicationType"), info.getString("queue"),
|
||||
info.getString("state"), info.getString("finalStatus"),
|
||||
(float) info.getDouble("progress"), info.getString("trackingUI"),
|
||||
info.getString("diagnostics"), info.getLong("clusterId"),
|
||||
info.getLong("startedTime"), info.getLong("finishedTime"),
|
||||
info.getLong("elapsedTime"), info.getString("amHostHttpAddress"),
|
||||
info.getString("amContainerLogs"));
|
||||
}
|
||||
|
||||
public void verifyAppInfoGeneric(RMApp app, String id, String user,
|
||||
String name, String queue, String state, String finalStatus,
|
||||
String name, String applicationType, String queue, String state, String finalStatus,
|
||||
float progress, String trackingUI, String diagnostics, long clusterId,
|
||||
long startedTime, long finishedTime, long elapsedTime,
|
||||
String amHostHttpAddress, String amContainerLogs) throws JSONException,
|
||||
@ -830,6 +831,8 @@ public void verifyAppInfoGeneric(RMApp app, String id, String user,
|
||||
.toString(), id);
|
||||
WebServicesTestUtils.checkStringMatch("user", app.getUser(), user);
|
||||
WebServicesTestUtils.checkStringMatch("name", app.getName(), name);
|
||||
WebServicesTestUtils.checkStringMatch("applicationType",
|
||||
app.getApplicationType(), applicationType);
|
||||
WebServicesTestUtils.checkStringMatch("queue", app.getQueue(), queue);
|
||||
WebServicesTestUtils.checkStringMatch("state", app.getState().toString(),
|
||||
state);
|
||||
|
@ -1227,6 +1227,7 @@ ResourceManager REST API's.
|
||||
<id>application_1326815542473_0001</id>
|
||||
<user>user1</user>
|
||||
<name>word count</name>
|
||||
<applicationType>MAPREDUCE</applicationType>
|
||||
<queue>default</queue>
|
||||
<state>FINISHED</state>
|
||||
<finalStatus>SUCCEEDED</finalStatus>
|
||||
@ -1247,6 +1248,7 @@ _01_000001</amContainerLogs>
|
||||
<id>application_1326815542473_0002</id>
|
||||
<user>user1</user>
|
||||
<name>Sleep job</name>
|
||||
<applicationType>YARN</applicationType>
|
||||
<queue>default</queue>
|
||||
<state>FINISHED</state>
|
||||
<finalStatus>SUCCEEDED</finalStatus>
|
||||
@ -1302,6 +1304,8 @@ _01_000001</amContainerLogs>
|
||||
*---------------+--------------+--------------------------------+
|
||||
| name | string | The application name |
|
||||
*---------------+--------------+--------------------------------+
|
||||
| Application Type | string | The application type |
|
||||
*---------------+--------------+--------------------------------+
|
||||
| queue | string | The queue the application was submitted to|
|
||||
*---------------+--------------+--------------------------------+
|
||||
| state | string | The application state according to the ResourceManager - valid values are: NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED|
|
||||
@ -1364,6 +1368,7 @@ _01_000001</amContainerLogs>
|
||||
"amHostHttpAddress" : "host.domain.com:8042",
|
||||
"progress" : 100,
|
||||
"name" : "Sleep job",
|
||||
"applicationType" : "Yarn",
|
||||
"startedTime" : 1326824544552,
|
||||
"elapsedTime" : 446748,
|
||||
"diagnostics" : "",
|
||||
|
Loading…
Reference in New Issue
Block a user