diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0a0a65c69b..999d82f2c9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -899,6 +899,9 @@ Release 2.8.0 - UNRELEASED YARN-4152. NodeManager crash with NPE when LogAggregationService#stopContainer called for absent container. (Bibin A Chundatt via rohithsharmaks) + YARN-4044. Running applications information changes such as movequeue is not published to + TimeLine server. (Sunil G via rohithsharmaks) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java index 7dac7163cd..96ad5ed51a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java @@ -355,6 +355,18 @@ private static ApplicationReportExt convertToApplicationReport( if (event.getEventType().equals( ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { createdTime = event.getTimestamp(); + } else if (event.getEventType().equals( + ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) { + Map eventInfo = event.getEventInfo(); + if (eventInfo == null) { + continue; + } + applicationPriority = Integer + .parseInt(eventInfo.get( + ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO) + .toString()); + queue = eventInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO) + .toString(); } else if (event.getEventType().equals( ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { progress=1.0F; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java index e24c11ca8e..a669f37d3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java @@ -80,9 +80,9 @@ public static void prepareStore() throws Exception { store = createStore(SCALE); TimelineEntities entities = new TimelineEntities(); entities.addEntity(createApplicationTimelineEntity( - ApplicationId.newInstance(0, SCALE + 1), true, true, false)); + ApplicationId.newInstance(0, SCALE + 1), true, true, false, false)); entities.addEntity(createApplicationTimelineEntity( - ApplicationId.newInstance(0, SCALE + 2), true, false, true)); + ApplicationId.newInstance(0, SCALE + 2), true, false, true, false)); store.put(entities); } @@ -139,10 +139,10 @@ private static void prepareTimelineStore(TimelineStore store, int scale) ApplicationId appId = ApplicationId.newInstance(0, i); if (i == 2) { entities.addEntity(createApplicationTimelineEntity( - appId, true, false, false)); + appId, true, false, false, true)); } else { entities.addEntity(createApplicationTimelineEntity( - appId, false, false, false)); + appId, false, false, false, false)); } store.put(entities); for (int j = 1; j <= scale; ++j) { @@ -182,7 +182,15 @@ public ApplicationReport run() throws Exception { Assert.assertEquals("test app", app.getName()); Assert.assertEquals("test app type", app.getApplicationType()); Assert.assertEquals("user1", app.getUser()); - Assert.assertEquals("test queue", app.getQueue()); + if (i == 2) { + // Change event is fired only in case of app with ID 2, hence verify + // with updated changes. And make sure last updated change is accepted. + Assert.assertEquals("changed queue1", app.getQueue()); + Assert.assertEquals(Priority.newInstance(6), app.getPriority()); + } else { + Assert.assertEquals("test queue", app.getQueue()); + Assert.assertEquals(Priority.newInstance(0), app.getPriority()); + } Assert.assertEquals(Integer.MAX_VALUE + 2L + app.getApplicationId().getId(), app.getStartTime()); Assert.assertEquals(Integer.MAX_VALUE + 3L @@ -458,7 +466,7 @@ public ContainerReport run() throws Exception { private static TimelineEntity createApplicationTimelineEntity( ApplicationId appId, boolean emptyACLs, boolean noAttemptId, - boolean wrongAppId) { + boolean wrongAppId, boolean enableUpdateEvent) { TimelineEntity entity = new TimelineEntity(); entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); if (wrongAppId) { @@ -515,9 +523,32 @@ private static TimelineEntity createApplicationTimelineEntity( } tEvent.setEventInfo(eventInfo); entity.addEvent(tEvent); + if (enableUpdateEvent) { + tEvent = new TimelineEvent(); + createAppModifiedEvent(appId, tEvent, "changed queue", 5); + entity.addEvent(tEvent); + // Change priority alone + tEvent = new TimelineEvent(); + createAppModifiedEvent(appId, tEvent, "changed queue", 6); + // Now change queue + tEvent = new TimelineEvent(); + createAppModifiedEvent(appId, tEvent, "changed queue1", 6); + entity.addEvent(tEvent); + } return entity; } + private static void createAppModifiedEvent(ApplicationId appId, + TimelineEvent tEvent, String queue, int priority) { + tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 4L + appId.getId()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, queue); + eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + priority); + tEvent.setEventInfo(eventInfo); + } + private static TimelineEntity createAppAttemptTimelineEntity( ApplicationAttemptId appAttemptId) { TimelineEntity entity = new TimelineEntity(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java index 3cbcc1e330..9ebbfb4ab2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java @@ -37,6 +37,9 @@ public class ApplicationMetricsConstants { public static final String ACLS_UPDATED_EVENT_TYPE = "YARN_APPLICATION_ACLS_UPDATED"; + public static final String UPDATED_EVENT_TYPE = + "YARN_APPLICATION_UPDATED"; + public static final String NAME_ENTITY_INFO = "YARN_APPLICATION_NAME"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java new file mode 100644 index 0000000000..9e5e1fd985 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; + +public class ApplicationUpdatedEvent extends SystemMetricsEvent { + + private ApplicationId appId; + private String queue; + private Priority applicationPriority; + + public ApplicationUpdatedEvent(ApplicationId appId, String queue, + long updatedTime, Priority applicationPriority) { + super(SystemMetricsEventType.APP_UPDATED, updatedTime); + this.appId = appId; + this.queue = queue; + this.applicationPriority = applicationPriority; + } + + @Override + public int hashCode() { + return appId.hashCode(); + } + + public ApplicationId getApplicationId() { + return appId; + } + + public String getQueue() { + return queue; + } + + public Priority getApplicationPriority() { + return applicationPriority; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java index 7328ce4369..c11034ed7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java @@ -24,6 +24,7 @@ public enum SystemMetricsEventType { APP_CREATED, APP_FINISHED, APP_ACLS_UPDATED, + APP_UPDATED, // app attempt events APP_ATTEMPT_REGISTERED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java index 0852ff4878..0f09735e7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java @@ -118,6 +118,17 @@ public void appCreated(RMApp app, long createdTime) { } } + @SuppressWarnings("unchecked") + public void appUpdated(RMApp app, long updatedTime) { + if (publishSystemMetrics) { + dispatcher.getEventHandler() + .handle( + new ApplicationUpdatedEvent(app.getApplicationId(), app + .getQueue(), updatedTime, app + .getApplicationSubmissionContext().getPriority())); + } + } + @SuppressWarnings("unchecked") public void appFinished(RMApp app, RMAppState state, long finishedTime) { if (publishSystemMetrics) { @@ -228,6 +239,9 @@ protected void handleSystemMetricsEvent( case APP_ACLS_UPDATED: publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); break; + case APP_UPDATED: + publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event); + break; case APP_ATTEMPT_REGISTERED: publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); break; @@ -308,6 +322,21 @@ private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { putEntity(entity); } + private void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { + TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + event.getQueue()); + eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event + .getApplicationPriority().getPriority()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + private void publishApplicationACLsUpdatedEvent( ApplicationACLsUpdatedEvent event) { TimelineEntity entity = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index ea9aa7030c..42d889e741 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -911,7 +911,10 @@ public void transition(RMAppImpl app, RMAppEvent event) { moveEvent.getResult().setException(ex); return; } - + + app.rmContext.getSystemMetricsPublisher().appUpdated(app, + System.currentTimeMillis()); + // TODO: Write out change to state store (YARN-1558) // Also take care of RM failover moveEvent.getResult().set(null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 465e233642..0fd20f876d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1946,6 +1946,10 @@ public void updateApplicationPriority(Priority newPriority, application.getCurrentAppAttempt()); } + // Update the changed application state to timeline server + rmContext.getSystemMetricsPublisher().appUpdated(rmApp, + System.currentTimeMillis()); + LOG.info("Priority '" + appPriority + "' is updated in queue :" + rmApp.getQueue() + " for application: " + applicationId + " for the user: " + rmApp.getUser()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java index 0498a4f530..98daae7eac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java @@ -109,6 +109,17 @@ public void testPublishApplicationMetrics() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, i); RMApp app = createRMApp(appId); metricsPublisher.appCreated(app, app.getStartTime()); + if (i == 1) { + when(app.getQueue()).thenReturn("new test queue"); + ApplicationSubmissionContext asc = mock(ApplicationSubmissionContext.class); + when(asc.getUnmanagedAM()).thenReturn(false); + when(asc.getPriority()).thenReturn(Priority.newInstance(1)); + when(asc.getNodeLabelExpression()).thenReturn("high-cpu"); + when(app.getApplicationSubmissionContext()).thenReturn(asc); + metricsPublisher.appUpdated(app, 4L); + } else { + metricsPublisher.appUpdated(app, 4L); + } metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime()); if (i == 1) { metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L); @@ -123,7 +134,7 @@ public void testPublishApplicationMetrics() throws Exception { ApplicationMetricsConstants.ENTITY_TYPE, EnumSet.allOf(Field.class)); // ensure three events are both published before leaving the loop - } while (entity == null || entity.getEvents().size() < 3); + } while (entity == null || entity.getEvents().size() < 4); // verify all the fields Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE, entity.getEntityType()); @@ -134,19 +145,24 @@ public void testPublishApplicationMetrics() throws Exception { app.getName(), entity.getOtherInfo().get( ApplicationMetricsConstants.NAME_ENTITY_INFO)); - Assert.assertEquals(app.getQueue(), - entity.getOtherInfo() - .get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)); + if (i != 1) { + Assert.assertEquals( + app.getQueue(), + entity.getOtherInfo().get( + ApplicationMetricsConstants.QUEUE_ENTITY_INFO)); + } Assert.assertEquals( app.getApplicationSubmissionContext().getUnmanagedAM(), entity.getOtherInfo().get( ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO)); - Assert.assertEquals( - app.getApplicationSubmissionContext().getPriority().getPriority(), - entity.getOtherInfo().get( - ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO)); + if (i != 1) { + Assert.assertEquals( + app.getApplicationSubmissionContext().getPriority().getPriority(), + entity.getOtherInfo().get( + ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO)); + } Assert.assertEquals(app.getAmNodeLabelExpression(), entity.getOtherInfo() .get(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION)); @@ -190,6 +206,7 @@ public void testPublishApplicationMetrics() throws Exception { .get(ApplicationMetricsConstants.APP_CPU_METRICS).toString())); } boolean hasCreatedEvent = false; + boolean hasUpdatedEvent = false; boolean hasFinishedEvent = false; boolean hasACLsUpdatedEvent = false; for (TimelineEvent event : entity.getEvents()) { @@ -211,13 +228,28 @@ public void testPublishApplicationMetrics() throws Exception { ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)); Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event .getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO)); + } else if (event.getEventType().equals( + ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) { + hasUpdatedEvent = true; + Assert.assertEquals(4L, event.getTimestamp()); + if (1 == i) { + Assert.assertEquals( + 1, + event.getEventInfo().get( + ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO)); + Assert.assertEquals( + "new test queue", + event.getEventInfo().get( + ApplicationMetricsConstants.QUEUE_ENTITY_INFO)); + } } else if (event.getEventType().equals( ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) { hasACLsUpdatedEvent = true; Assert.assertEquals(4L, event.getTimestamp()); } } - Assert.assertTrue(hasCreatedEvent && hasFinishedEvent && hasACLsUpdatedEvent); + Assert.assertTrue(hasCreatedEvent && hasFinishedEvent + && hasACLsUpdatedEvent && hasUpdatedEvent); } }