diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java index 19afb253e0..51e2356058 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java @@ -368,6 +368,17 @@ private static ApplicationReportExt convertToApplicationReport( .toString()); queue = eventInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO) .toString(); + } else if (event.getEventType().equals( + ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE)) { + Map eventInfo = event.getEventInfo(); + if (eventInfo == null) { + continue; + } + if (eventInfo.containsKey( + ApplicationMetricsConstants.STATE_EVENT_INFO)) { + state = YarnApplicationState.valueOf(eventInfo.get( + ApplicationMetricsConstants.STATE_EVENT_INFO).toString()); + } } else if (event.getEventType().equals( ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { progress=1.0F; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java index 75dce070f5..deae8943ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java @@ -274,12 +274,8 @@ public Response getLogs(@Context HttpServletRequest req, } } - // TODO: YARN-5029. RM would send the update event. We could get - // the consistent YarnApplicationState. - // Will remove YarnApplicationState.ACCEPTED. private boolean isRunningState(YarnApplicationState appState) { - return appState == YarnApplicationState.ACCEPTED - || appState == YarnApplicationState.RUNNING; + return appState == YarnApplicationState.RUNNING; } private boolean isFinishedState(YarnApplicationState appState) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java index c456e54d52..d06b7cb68c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java @@ -40,6 +40,9 @@ public class ApplicationMetricsConstants { public static final String UPDATED_EVENT_TYPE = "YARN_APPLICATION_UPDATED"; + public static final String STATE_UPDATED_EVENT_TYPE = + "YARN_APPLICATION_STATE_UPDATED"; + public static final String NAME_ENTITY_INFO = "YARN_APPLICATION_NAME"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicaitonStateUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicaitonStateUpdatedEvent.java new file mode 100644 index 0000000000..599e8d6710 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicaitonStateUpdatedEvent.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; + +/** + * When the state of this application has been changed, RM would sent + * this event to inform Timeline Server for keeping the Application state + * consistent. + */ +public class ApplicaitonStateUpdatedEvent extends SystemMetricsEvent{ + private ApplicationId appId; + private YarnApplicationState appState; + + public ApplicaitonStateUpdatedEvent(ApplicationId appliocationId, + YarnApplicationState state, long updatedTime) { + super(SystemMetricsEventType.APP_STATE_UPDATED, updatedTime); + this.appId = appliocationId; + this.appState = state; + } + + public ApplicationId getApplicationId() { + return appId; + } + + public YarnApplicationState getAppState() { + return appState; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java index c11034ed7a..fcda4b4e47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java @@ -25,6 +25,7 @@ public enum SystemMetricsEventType { APP_FINISHED, APP_ACLS_UPDATED, APP_UPDATED, + APP_STATE_UPDATED, // app attempt events APP_ATTEMPT_REGISTERED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java index cba87903fb..0e3802b574 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; @@ -54,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import com.google.common.annotations.VisibleForTesting; + /** * The class that helps RM publish metrics to the timeline server. RM will * always invoke the methods of this class regardless the service is enabled or @@ -157,6 +160,18 @@ public void appACLsUpdated(RMApp app, String appViewACLs, } } + @SuppressWarnings("unchecked") + public void appStateUpdated(RMApp app, YarnApplicationState appState, + long updatedTime) { + if (publishSystemMetrics) { + dispatcher.getEventHandler().handle( + new ApplicaitonStateUpdatedEvent( + app.getApplicationId(), + appState, + updatedTime)); + } + } + @SuppressWarnings("unchecked") public void appAttemptRegistered(RMAppAttempt appAttempt, long registeredTime) { @@ -235,32 +250,36 @@ protected Dispatcher createDispatcher(Configuration conf) { protected void handleSystemMetricsEvent( SystemMetricsEvent event) { switch (event.getType()) { - case APP_CREATED: - publishApplicationCreatedEvent((ApplicationCreatedEvent) event); - break; - case APP_FINISHED: - publishApplicationFinishedEvent((ApplicationFinishedEvent) event); - break; - case APP_ACLS_UPDATED: - publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); - break; - case APP_UPDATED: - publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event); - break; - case APP_ATTEMPT_REGISTERED: - publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); - break; - case APP_ATTEMPT_FINISHED: - publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); - break; - case CONTAINER_CREATED: - publishContainerCreatedEvent((ContainerCreatedEvent) event); - break; - case CONTAINER_FINISHED: - publishContainerFinishedEvent((ContainerFinishedEvent) event); - break; - default: - LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); + case APP_CREATED: + publishApplicationCreatedEvent((ApplicationCreatedEvent) event); + break; + case APP_FINISHED: + publishApplicationFinishedEvent((ApplicationFinishedEvent) event); + break; + case APP_ACLS_UPDATED: + publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); + break; + case APP_UPDATED: + publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event); + break; + case APP_STATE_UPDATED: + publishApplicationStateUpdatedEvent( + (ApplicaitonStateUpdatedEvent)event); + break; + case APP_ATTEMPT_REGISTERED: + publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); + break; + case APP_ATTEMPT_FINISHED: + publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); + break; + case CONTAINER_CREATED: + publishContainerCreatedEvent((ContainerCreatedEvent) event); + break; + case CONTAINER_FINISHED: + publishContainerFinishedEvent((ContainerFinishedEvent) event); + break; + default: + LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); } } @@ -352,6 +371,20 @@ private void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { putEntity(entity); } + private void publishApplicationStateUpdatedEvent( + ApplicaitonStateUpdatedEvent event) { + TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + event.getAppState()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + private void publishApplicationACLsUpdatedEvent( ApplicationACLsUpdatedEvent event) { TimelineEntity entity = @@ -501,7 +534,9 @@ private static TimelineEntity createContainerEntity( return entity; } - private void putEntity(TimelineEntity entity) { + @Private + @VisibleForTesting + public void putEntity(TimelineEntity entity) { try { if (LOG.isDebugEnabled()) { LOG.debug("Publishing the entity " + entity.getEntityId() + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 10c9edc2b4..07d5a7459a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -250,7 +250,8 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.MOVE, new RMAppMoveTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, - RMAppEventType.ATTEMPT_REGISTERED) + RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition( + YarnApplicationState.RUNNING)) .addTransition(RMAppState.ACCEPTED, EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), // ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED @@ -928,7 +929,21 @@ public void transition(RMAppImpl app, RMAppEvent event) { nodeUpdateEvent.getNode()); }; } - + + private static final class RMAppStateUpdateTransition + extends RMAppTransition { + private YarnApplicationState stateToATS; + + public RMAppStateUpdateTransition(YarnApplicationState state) { + stateToATS = state; + } + + public void transition(RMAppImpl app, RMAppEvent event) { + app.rmContext.getSystemMetricsPublisher().appStateUpdated( + app, stateToATS, app.systemClock.getTime()); + }; + } + private static final class AppRunningOnNodeTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java index 0738a2b77c..3c46f1a2ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java @@ -106,13 +106,15 @@ public static void tearDown() throws Exception { @Test(timeout = 10000) public void testPublishApplicationMetrics() throws Exception { + long stateUpdateTimeStamp = System.currentTimeMillis(); for (int i = 1; i <= 2; ++i) { ApplicationId appId = ApplicationId.newInstance(0, i); RMApp app = createRMApp(appId); metricsPublisher.appCreated(app, app.getStartTime()); if (i == 1) { when(app.getQueue()).thenReturn("new test queue"); - ApplicationSubmissionContext asc = mock(ApplicationSubmissionContext.class); + ApplicationSubmissionContext asc = mock( + ApplicationSubmissionContext.class); when(asc.getUnmanagedAM()).thenReturn(false); when(asc.getPriority()).thenReturn(Priority.newInstance(1)); when(asc.getNodeLabelExpression()).thenReturn("high-cpu"); @@ -121,7 +123,10 @@ public void testPublishApplicationMetrics() throws Exception { } else { metricsPublisher.appUpdated(app, 4L); } - metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime()); + metricsPublisher.appStateUpdated(app, YarnApplicationState.RUNNING, + stateUpdateTimeStamp); + metricsPublisher.appFinished(app, RMAppState.FINISHED, + app.getFinishTime()); if (i == 1) { metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L); } else { @@ -134,8 +139,8 @@ public void testPublishApplicationMetrics() throws Exception { store.getEntity(appId.toString(), ApplicationMetricsConstants.ENTITY_TYPE, EnumSet.allOf(Field.class)); - // ensure three events are both published before leaving the loop - } while (entity == null || entity.getEvents().size() < 4); + // ensure Five events are both published before leaving the loop + } while (entity == null || entity.getEvents().size() < 5); // verify all the fields Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE, entity.getEntityType()); @@ -212,6 +217,7 @@ public void testPublishApplicationMetrics() throws Exception { boolean hasUpdatedEvent = false; boolean hasFinishedEvent = false; boolean hasACLsUpdatedEvent = false; + boolean hasStateUpdateEvent = false; for (TimelineEvent event : entity.getEvents()) { if (event.getEventType().equals( ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { @@ -249,10 +255,21 @@ public void testPublishApplicationMetrics() throws Exception { ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) { hasACLsUpdatedEvent = true; Assert.assertEquals(4L, event.getTimestamp()); + } else if (event.getEventType().equals( + ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE)) { + hasStateUpdateEvent = true; + Assert.assertEquals(event.getTimestamp(), stateUpdateTimeStamp); + Assert.assertEquals(YarnApplicationState.RUNNING.toString(), event + .getEventInfo().get( + ApplicationMetricsConstants.STATE_EVENT_INFO)); } } - Assert.assertTrue(hasCreatedEvent && hasFinishedEvent - && hasACLsUpdatedEvent && hasUpdatedEvent); + // Do assertTrue verification separately for easier debug + Assert.assertTrue(hasCreatedEvent); + Assert.assertTrue(hasFinishedEvent); + Assert.assertTrue(hasACLsUpdatedEvent); + Assert.assertTrue(hasUpdatedEvent); + Assert.assertTrue(hasStateUpdateEvent); } }