YARN-8219. Add application launch time to ATSV2. Contributed by Abhishek Modi.

This commit is contained in:
Rohith Sharma K S 2019-02-06 09:31:20 +05:30
parent 1e5e08d83b
commit 7fa62e150c
9 changed files with 47 additions and 4 deletions

View File

@ -34,6 +34,9 @@ public class ApplicationMetricsConstants {
public static final String FINISHED_EVENT_TYPE = public static final String FINISHED_EVENT_TYPE =
"YARN_APPLICATION_FINISHED"; "YARN_APPLICATION_FINISHED";
public static final String LAUNCHED_EVENT_TYPE =
"YARN_APPLICATION_LAUNCHED";
public static final String ACLS_UPDATED_EVENT_TYPE = public static final String ACLS_UPDATED_EVENT_TYPE =
"YARN_APPLICATION_ACLS_UPDATED"; "YARN_APPLICATION_ACLS_UPDATED";

View File

@ -258,6 +258,7 @@ public static ApplicationReport convertToApplicationReport(
String type = null; String type = null;
boolean unmanagedApplication = false; boolean unmanagedApplication = false;
long createdTime = 0; long createdTime = 0;
long launchTime = 0;
long finishedTime = 0; long finishedTime = 0;
float progress = 0.0f; float progress = 0.0f;
int applicationPriority = 0; int applicationPriority = 0;
@ -416,6 +417,9 @@ public static ApplicationReport convertToApplicationReport(
if (event.getId().equals( if (event.getId().equals(
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
createdTime = event.getTimestamp(); createdTime = event.getTimestamp();
} else if (event.getId().equals(
ApplicationMetricsConstants.LAUNCHED_EVENT_TYPE)) {
launchTime = event.getTimestamp();
} else if (event.getId().equals( } else if (event.getId().equals(
ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) { ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) {
// This type of events are parsed in time-stamp descending order // This type of events are parsed in time-stamp descending order
@ -449,8 +453,9 @@ public static ApplicationReport convertToApplicationReport(
return ApplicationReport.newInstance( return ApplicationReport.newInstance(
ApplicationId.fromString(entity.getId()), ApplicationId.fromString(entity.getId()),
latestApplicationAttemptId, user, queue, name, null, -1, null, state, latestApplicationAttemptId, user, queue, name, null, -1, null, state,
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, diagnosticsInfo, null, createdTime, launchTime,
appResources, null, progress, type, null, appTags, unmanagedApplication, finishedTime, finalStatus, appResources, null,
progress, type, null, appTags, unmanagedApplication,
Priority.newInstance(applicationPriority), appNodeLabelExpression, Priority.newInstance(applicationPriority), appNodeLabelExpression,
amNodeLabelExpression); amNodeLabelExpression);
} }

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/** /**
* Abstract implementation of SystemMetricsPublisher which is then extended by * Abstract implementation of SystemMetricsPublisher which is then extended by
@ -122,6 +123,10 @@ protected enum SystemMetricsEventType {
PUBLISH_ENTITY, PUBLISH_APPLICATION_FINISHED_ENTITY PUBLISH_ENTITY, PUBLISH_APPLICATION_FINISHED_ENTITY
} }
@Override
public void appLaunched(RMApp app, long launchTime) {
}
/** /**
* TimelinePublishEvent's hash code should be based on application's id this * TimelinePublishEvent's hash code should be based on application's id this
* will ensure all the events related to a particular app goes to particular * will ensure all the events related to a particular app goes to particular

View File

@ -45,6 +45,12 @@ public void appCreated(RMApp app, long createdTime) {
} }
} }
@Override public void appLaunched(RMApp app, long launchTime) {
for (SystemMetricsPublisher publisher : this.publishers) {
publisher.appLaunched(app, launchTime);
}
}
@Override @Override
public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
for (SystemMetricsPublisher publisher : this.publishers) { for (SystemMetricsPublisher publisher : this.publishers) {

View File

@ -67,4 +67,8 @@ public void appUpdated(RMApp app, long currentTimeMillis) {
public void appStateUpdated(RMApp app, YarnApplicationState appState, public void appStateUpdated(RMApp app, YarnApplicationState appState,
long updatedTime) { long updatedTime) {
} }
@Override
public void appLaunched(RMApp app, long launchTime) {
}
} }

View File

@ -32,6 +32,8 @@ public interface SystemMetricsPublisher {
void appCreated(RMApp app, long createdTime); void appCreated(RMApp app, long createdTime);
void appLaunched(RMApp app, long launchTime);
void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime); void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime);
void appUpdated(RMApp app, long updatedTime); void appUpdated(RMApp app, long updatedTime);

View File

@ -153,6 +153,20 @@ public void appCreated(RMApp app, long createdTime) {
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
} }
@SuppressWarnings("unchecked")
@Override
public void appLaunched(RMApp app, long launchTime) {
ApplicationEntity entity =
createApplicationEntity(app.getApplicationId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.LAUNCHED_EVENT_TYPE);
tEvent.setTimestamp(launchTime);
entity.addEvent(tEvent);
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void appFinished(RMApp app, RMAppState state, long finishedTime) { public void appFinished(RMApp app, RMAppState state, long finishedTime) {

View File

@ -293,7 +293,7 @@ RMAppEventType.KILL, new KillAttemptTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.APP_RUNNING_ON_NODE, RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition()) new AppRunningOnNodeTransition())
// Handle AppAttemptLaunch to upate the launchTime and publish to ATS // Handle AppAttemptLaunch to update the launchTime and publish to ATS
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.ATTEMPT_LAUNCHED, RMAppEventType.ATTEMPT_LAUNCHED,
new AttemptLaunchedTransition()) new AttemptLaunchedTransition())
@ -1067,6 +1067,8 @@ public void transition(RMAppImpl app, RMAppEvent event) {
app.getCurrentAppAttempt().getAppAttemptId()+ app.getCurrentAppAttempt().getAppAttemptId()+
"launchTime: "+event.getTimestamp()); "launchTime: "+event.getTimestamp());
app.launchTime = event.getTimestamp(); app.launchTime = event.getTimestamp();
app.rmContext.getSystemMetricsPublisher().appLaunched(
app, app.launchTime);
} }
} }
} }

View File

@ -203,6 +203,7 @@ public void testPublishApplicationMetrics() throws Exception {
RMApp app = createAppAndRegister(appId); RMApp app = createAppAndRegister(appId);
metricsPublisher.appCreated(app, app.getStartTime()); metricsPublisher.appCreated(app, app.getStartTime());
metricsPublisher.appLaunched(app, app.getLaunchTime());
metricsPublisher.appACLsUpdated(app, "user1,user2", 4L); metricsPublisher.appACLsUpdated(app, "user1,user2", 4L);
metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime()); metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
dispatcher.await(); dispatcher.await();
@ -221,7 +222,7 @@ public void testPublishApplicationMetrics() throws Exception {
File appFile = new File(outputDirApp, timelineServiceFileName); File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists()); Assert.assertTrue(appFile.exists());
verifyEntity( verifyEntity(
appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 8, 0); appFile, 4, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 8, 0);
} }
@Test(timeout = 10000) @Test(timeout = 10000)
@ -356,6 +357,7 @@ private static RMApp createRMApp(ApplicationId appId) {
when(app.getQueue()).thenReturn("test queue"); when(app.getQueue()).thenReturn("test queue");
when(app.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L); when(app.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L);
when(app.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L); when(app.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L);
when(app.getLaunchTime()).thenReturn(Integer.MAX_VALUE + 2L);
when(app.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L); when(app.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L);
when(app.getDiagnostics()).thenReturn( when(app.getDiagnostics()).thenReturn(
new StringBuilder("test diagnostics info")); new StringBuilder("test diagnostics info"));