YARN-1462. AHS API and other AHS changes to handle tags for completed MR jobs. Contributed by Xuan Gong

This commit is contained in:
Xuan 2015-06-05 12:48:52 -07:00
parent 75885852cc
commit 3e000a919f
9 changed files with 124 additions and 7 deletions

View File

@ -292,6 +292,8 @@ Release 2.8.0 - UNRELEASED
YARN-2392. Add more diags about app retry limits on AM failures. (Steve YARN-2392. Add more diags about app retry limits on AM failures. (Steve
Loughran via jianhe) Loughran via jianhe)
YARN-1462. AHS API and other AHS changes to handle tags for completed MR jobs. (xgong)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -83,6 +83,25 @@ public static ApplicationReport newInstance(ApplicationId applicationId,
return report; return report;
} }
@Private
@Unstable
public static ApplicationReport newInstance(ApplicationId applicationId,
ApplicationAttemptId applicationAttemptId, String user, String queue,
String name, String host, int rpcPort, Token clientToAMToken,
YarnApplicationState state, String diagnostics, String url,
long startTime, long finishTime, FinalApplicationStatus finalStatus,
ApplicationResourceUsageReport appResources, String origTrackingUrl,
float progress, String applicationType, Token amRmToken,
Set<String> tags) {
ApplicationReport report =
newInstance(applicationId, applicationAttemptId, user, queue, name,
host, rpcPort, clientToAMToken, state, diagnostics, url, startTime,
finishTime, finalStatus, appResources, origTrackingUrl, progress,
applicationType, amRmToken);
report.setApplicationTags(tags);
return report;
}
/** /**
* Get the <code>ApplicationId</code> of the application. * Get the <code>ApplicationId</code> of the application.
* @return <code>ApplicationId</code> of the application * @return <code>ApplicationId</code> of the application

View File

@ -19,11 +19,14 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice; package org.apache.hadoop.yarn.server.applicationhistoryservice;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -249,6 +252,7 @@ private static ApplicationReportExt convertToApplicationReport(
FinalApplicationStatus finalStatus = FinalApplicationStatus.UNDEFINED; FinalApplicationStatus finalStatus = FinalApplicationStatus.UNDEFINED;
YarnApplicationState state = YarnApplicationState.ACCEPTED; YarnApplicationState state = YarnApplicationState.ACCEPTED;
ApplicationResourceUsageReport appResources = null; ApplicationResourceUsageReport appResources = null;
Set<String> appTags = null;
Map<ApplicationAccessType, String> appViewACLs = Map<ApplicationAccessType, String> appViewACLs =
new HashMap<ApplicationAccessType, String>(); new HashMap<ApplicationAccessType, String>();
Map<String, Object> entityInfo = entity.getOtherInfo(); Map<String, Object> entityInfo = entity.getOtherInfo();
@ -270,7 +274,7 @@ private static ApplicationReportExt convertToApplicationReport(
ConverterUtils.toApplicationId(entity.getEntityId()), ConverterUtils.toApplicationId(entity.getEntityId()),
latestApplicationAttemptId, user, queue, name, null, -1, null, state, latestApplicationAttemptId, user, queue, name, null, -1, null, state,
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null, diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null,
null, progress, type, null), appViewACLs); null, progress, type, null, appTags), appViewACLs);
} }
if (entityInfo.containsKey(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)) { if (entityInfo.containsKey(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)) {
queue = queue =
@ -295,6 +299,17 @@ private static ApplicationReportExt convertToApplicationReport(
appResources=ApplicationResourceUsageReport appResources=ApplicationResourceUsageReport
.newInstance(0, 0, null, null, null, memorySeconds, vcoreSeconds); .newInstance(0, 0, null, null, null, memorySeconds, vcoreSeconds);
} }
if (entityInfo.containsKey(ApplicationMetricsConstants.APP_TAGS_INFO)) {
appTags = new HashSet<String>();
Object obj = entityInfo.get(ApplicationMetricsConstants.APP_TAGS_INFO);
if (obj != null && obj instanceof Collection<?>) {
for(Object o : (Collection<?>)obj) {
if (o != null) {
appTags.add(o.toString());
}
}
}
}
} }
List<TimelineEvent> events = entity.getEvents(); List<TimelineEvent> events = entity.getEvents();
if (events != null) { if (events != null) {
@ -347,7 +362,7 @@ private static ApplicationReportExt convertToApplicationReport(
ConverterUtils.toApplicationId(entity.getEntityId()), ConverterUtils.toApplicationId(entity.getEntityId()),
latestApplicationAttemptId, user, queue, name, null, -1, null, state, latestApplicationAttemptId, user, queue, name, null, -1, null, state,
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, appResources, diagnosticsInfo, null, createdTime, finishedTime, finalStatus, appResources,
null, progress, type, null), appViewACLs); null, progress, type, null, appTags), appViewACLs);
} }
private static ApplicationAttemptReport convertToApplicationAttemptReport( private static ApplicationAttemptReport convertToApplicationAttemptReport(

View File

@ -22,7 +22,9 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@ -185,6 +187,9 @@ public ApplicationReport run() throws Exception {
Assert.assertEquals(Integer.MAX_VALUE + 3L Assert.assertEquals(Integer.MAX_VALUE + 3L
+ +app.getApplicationId().getId(), app.getFinishTime()); + +app.getApplicationId().getId(), app.getFinishTime());
Assert.assertTrue(Math.abs(app.getProgress() - 1.0F) < 0.0001); Assert.assertTrue(Math.abs(app.getProgress() - 1.0F) < 0.0001);
Assert.assertEquals(2, app.getApplicationTags().size());
Assert.assertTrue(app.getApplicationTags().contains("Test_APP_TAGS_1"));
Assert.assertTrue(app.getApplicationTags().contains("Test_APP_TAGS_2"));
// App 2 doesn't have the ACLs, such that the default ACLs " " will be used. // App 2 doesn't have the ACLs, such that the default ACLs " " will be used.
// Nobody except admin and owner has access to the details of the app. // Nobody except admin and owner has access to the details of the app.
if ((i == 1 && callerUGI != null && if ((i == 1 && callerUGI != null &&
@ -471,6 +476,10 @@ private static TimelineEntity createApplicationTimelineEntity(
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
"user2"); "user2");
} }
Set<String> appTags = new HashSet<String>();
appTags.add("Test_APP_TAGS_1");
appTags.add("Test_APP_TAGS_2");
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, appTags);
entity.setOtherInfo(entityInfo); entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent(); TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE); tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);

View File

@ -73,4 +73,5 @@ public class ApplicationMetricsConstants {
public static final String LATEST_APP_ATTEMPT_EVENT_INFO = public static final String LATEST_APP_ATTEMPT_EVENT_INFO =
"YARN_APPLICATION_LATEST_APP_ATTEMPT"; "YARN_APPLICATION_LATEST_APP_ATTEMPT";
public static final String APP_TAGS_INFO = "YARN_APPLICATION_TAGS";
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.metrics; package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
public class ApplicationCreatedEvent extends public class ApplicationCreatedEvent extends
@ -29,6 +31,7 @@ public class ApplicationCreatedEvent extends
private String user; private String user;
private String queue; private String queue;
private long submittedTime; private long submittedTime;
private Set<String> appTags;
public ApplicationCreatedEvent(ApplicationId appId, public ApplicationCreatedEvent(ApplicationId appId,
String name, String name,
@ -36,7 +39,8 @@ public ApplicationCreatedEvent(ApplicationId appId,
String user, String user,
String queue, String queue,
long submittedTime, long submittedTime,
long createdTime) { long createdTime,
Set<String> appTags) {
super(SystemMetricsEventType.APP_CREATED, createdTime); super(SystemMetricsEventType.APP_CREATED, createdTime);
this.appId = appId; this.appId = appId;
this.name = name; this.name = name;
@ -44,6 +48,7 @@ public ApplicationCreatedEvent(ApplicationId appId,
this.user = user; this.user = user;
this.queue = queue; this.queue = queue;
this.submittedTime = submittedTime; this.submittedTime = submittedTime;
this.appTags = appTags;
} }
@Override @Override
@ -75,4 +80,7 @@ public long getSubmittedTime() {
return submittedTime; return submittedTime;
} }
public Set<String> getAppTags() {
return appTags;
}
} }

View File

@ -106,7 +106,7 @@ public void appCreated(RMApp app, long createdTime) {
app.getUser(), app.getUser(),
app.getQueue(), app.getQueue(),
app.getSubmitTime(), app.getSubmitTime(),
createdTime)); createdTime, app.getApplicationTags()));
} }
} }
@ -251,6 +251,8 @@ private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
event.getQueue()); event.getQueue());
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
event.getSubmittedTime()); event.getSubmittedTime());
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
event.getAppTags());
entity.setOtherInfo(entityInfo); entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent(); TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType( tEvent.setEventType(

View File

@ -21,7 +21,11 @@
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -143,6 +147,8 @@ public void testPublishApplicationMetrics() throws Exception {
Assert.assertEquals(app.getSubmitTime(), Assert.assertEquals(app.getSubmitTime(),
entity.getOtherInfo().get( entity.getOtherInfo().get(
ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO)); ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO));
Assert.assertTrue(verifyAppTags(app.getApplicationTags(),
entity.getOtherInfo()));
if (i == 1) { if (i == 1) {
Assert.assertEquals("uers1,user2", Assert.assertEquals("uers1,user2",
entity.getOtherInfo().get( entity.getOtherInfo().get(
@ -352,6 +358,10 @@ private static RMApp createRMApp(ApplicationId appId) {
FinalApplicationStatus.UNDEFINED); FinalApplicationStatus.UNDEFINED);
when(app.getRMAppMetrics()).thenReturn( when(app.getRMAppMetrics()).thenReturn(
new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE)); new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE));
Set<String> appTags = new HashSet<String>();
appTags.add("test");
appTags.add("tags");
when(app.getApplicationTags()).thenReturn(appTags);
return app; return app;
} }
@ -392,4 +402,31 @@ private static RMContainer createRMContainer(ContainerId containerId) {
return container; return container;
} }
private static boolean verifyAppTags(Set<String> appTags,
Map<String, Object> entityInfo) {
if (!entityInfo.containsKey(ApplicationMetricsConstants.APP_TAGS_INFO)) {
return false;
}
Object obj = entityInfo.get(ApplicationMetricsConstants.APP_TAGS_INFO);
if (obj instanceof Collection<?>) {
Collection<?> collection = (Collection<?>) obj;
if (collection.size() != appTags.size()) {
return false;
}
for (String appTag : appTags) {
boolean match = false;
for (Object o : collection) {
if (o.toString().equals(appTag)) {
match = true;
break;
}
}
if (!match) {
return false;
}
}
return true;
}
return false;
}
} }

View File

@ -1125,7 +1125,27 @@ Response Body:
"submittedTime":1430424769395, "submittedTime":1430424769395,
"startedTime":1430424769395, "startedTime":1430424769395,
"finishedTime":1430424776594, "finishedTime":1430424776594,
"elapsedTime":7199 "elapsedTime":7199},
{
"appId":"application_1430424020775_0001",
"currentAppAttemptId":"appattempt_1430424020775_0001_000001",
"user":"zshen",
"name":"QuasiMonteCarlo",
"queue":"default",
"type":"MAPREDUCE",
"host":"localhost",
"rpcPort":56264,
"appState":"FINISHED",
"progress":100.0,
"diagnosticsInfo":"",
"originalTrackingUrl":"http://d-69-91-129-173.dhcp4.washington.edu:19888/jobhistory/job/job_1430424020775_0001",
"trackingUrl":"http://d-69-91-129-173.dhcp4.washington.edu:8088/proxy/application_1430424020775_0001/",
"finalAppStatus":"SUCCEEDED",
"submittedTime":1430424053809,
"startedTime":1430424072153,
"finishedTime":1430424776594,
"elapsedTime":18344,
"applicationTags":"mrapplication,ta-example"
} }
] ]
} }
@ -1227,6 +1247,7 @@ Response Body:
<startedTime>1430424053809</startedTime> <startedTime>1430424053809</startedTime>
<finishedTime>1430424072153</finishedTime> <finishedTime>1430424072153</finishedTime>
<elapsedTime>18344</elapsedTime> <elapsedTime>18344</elapsedTime>
<applicationTags>mrapplication,ta-example</applicationTags>
</app> </app>
</apps> </apps>
@ -1275,7 +1296,8 @@ None
| `allocatedVCores` | int | The sum of virtual cores allocated to the application's running containers | | `allocatedVCores` | int | The sum of virtual cores allocated to the application's running containers |
| `currentAppAttemptId` | string | The latest application attempt ID | | `currentAppAttemptId` | string | The latest application attempt ID |
| `host` | string | The host of the ApplicationMaster | | `host` | string | The host of the ApplicationMaster |
| `rpcPort` | int | The RPC port of the ApplicationMaster; zero if no IPC service declared. | | `rpcPort` | int | The RPC port of the ApplicationMaster; zero if no IPC service declared |
| `applicationTags` | string | The application tags. |
### Response Examples: ### Response Examples:
@ -1311,7 +1333,8 @@ Response Body:
"submittedTime": 1430424053809, "submittedTime": 1430424053809,
"startedTime": 1430424053809, "startedTime": 1430424053809,
"finishedTime": 1430424072153, "finishedTime": 1430424072153,
"elapsedTime": 18344 "elapsedTime": 18344,
"applicationTags": mrapplication,tag-example
} }
#### XML response #### XML response
@ -1349,6 +1372,7 @@ Response Body:
<startedTime>1430424053809</startedTime> <startedTime>1430424053809</startedTime>
<finishedTime>1430424072153</finishedTime> <finishedTime>1430424072153</finishedTime>
<elapsedTime>18344</elapsedTime> <elapsedTime>18344</elapsedTime>
<applicationTags>mrapplication,ta-example</applicationTags>
</app> </app>
## <a name="REST_API_APPLICATION_ATTEMPT_LIST"></a>Application Attempt List ## <a name="REST_API_APPLICATION_ATTEMPT_LIST"></a>Application Attempt List