diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index aa2943d7e2..d724026e8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -117,12 +117,12 @@ - + - + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 97fd7e9335..0c57ede445 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -74,7 +74,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; @@ -104,11 +107,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter; import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; @@ -309,8 +312,9 @@ protected void serviceInit(Configuration conf) throws Exception { addService(rmApplicationHistoryWriter); rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); - SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher(); - addService(systemMetricsPublisher); + SystemMetricsPublisher systemMetricsPublisher = + createSystemMetricsPublisher(); + addIfService(systemMetricsPublisher); rmContext.setSystemMetricsPublisher(systemMetricsPublisher); super.serviceInit(this.conf); @@ -465,7 +469,24 @@ private RMTimelineCollectorManager createRMTimelineCollectorManager() { } protected SystemMetricsPublisher createSystemMetricsPublisher() { - return new SystemMetricsPublisher(rmContext); + boolean timelineServiceEnabled = + conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); + SystemMetricsPublisher publisher = null; + if (timelineServiceEnabled) { + if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) { + LOG.info("TimelineService V1 is configured"); + publisher = new TimelineServiceV1Publisher(); + } else { + LOG.info("TimelineService V2 is configured"); + publisher = new TimelineServiceV2Publisher(rmContext); + } + } else { + LOG.info("TimelineServicePublisher is not configured"); + publisher = new NoOpSystemMetricPublisher(); + } + return publisher; } // sanity check for configurations @@ -585,10 +606,6 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(rmApplicationHistoryWriter); rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); - SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher(); - addService(systemMetricsPublisher); - rmContext.setSystemMetricsPublisher(systemMetricsPublisher); - RMTimelineCollectorManager timelineCollectorManager = createRMTimelineCollectorManager(); addService(timelineCollectorManager); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java new file mode 100644 index 0000000000..8b3d74976a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; + +public abstract class AbstractSystemMetricsPublisher extends CompositeService + implements SystemMetricsPublisher { + private MultiThreadedDispatcher dispatcher; + + protected Dispatcher getDispatcher() { + return dispatcher; + } + + public AbstractSystemMetricsPublisher(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + dispatcher = + new MultiThreadedDispatcher(getConfig().getInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE)); + dispatcher.setDrainEventsOnStop(); + addIfService(dispatcher); + super.serviceInit(conf); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static class MultiThreadedDispatcher extends CompositeService + implements Dispatcher { + + private List dispatchers = + new ArrayList(); + + public MultiThreadedDispatcher(int num) { + super(MultiThreadedDispatcher.class.getName()); + for (int i = 0; i < num; ++i) { + AsyncDispatcher dispatcher = createDispatcher(); + dispatchers.add(dispatcher); + addIfService(dispatcher); + } + } + + @Override + public EventHandler getEventHandler() { + return new CompositEventHandler(); + } + + @Override + public void register(Class eventType, + EventHandler handler) { + for (AsyncDispatcher dispatcher : dispatchers) { + dispatcher.register(eventType, handler); + } + } + + public void setDrainEventsOnStop() { + for (AsyncDispatcher dispatcher : dispatchers) { + dispatcher.setDrainEventsOnStop(); + } + } + + private class CompositEventHandler implements EventHandler { + + @Override + public void handle(Event event) { + // Use hashCode (of ApplicationId) to dispatch the event to the child + // dispatcher, such that all the writing events of one application will + // be handled by one thread, the scheduled order of the these events + // will be preserved + int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size(); + dispatchers.get(index).getEventHandler().handle(event); + } + } + + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher(); + } + } + + /** + * EventType which is used while publishing the events + */ + protected static enum SystemMetricsEventType { + PUBLISH_ENTITY, PUBLISH_APPLICATION_FINISHED_ENTITY + } + + /** + * TimelinePublishEvent's hash code should be based on application's id this + * will ensure all the events related to a particular app goes to particular + * thread of MultiThreaded dispatcher. + */ + protected static abstract class TimelinePublishEvent + extends AbstractEvent { + + private ApplicationId appId; + + public TimelinePublishEvent(SystemMetricsEventType type, + ApplicationId appId) { + super(type); + this.appId = appId; + } + + public ApplicationId getApplicationId() { + return appId; + } + + @Override + public int hashCode() { + return appId.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof TimelinePublishEvent)) { + return false; + } + TimelinePublishEvent other = (TimelinePublishEvent) obj; + if (appId == null) { + if (other.appId != null) { + return false; + } + } else if (getType() == null) { + if (other.getType() != null) { + return false; + } + } else + if (!appId.equals(other.appId) || !getType().equals(other.getType())) { + return false; + } + return true; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java deleted file mode 100644 index 3d041f668b..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher; - -public abstract class AbstractTimelineServicePublisher extends CompositeService - implements TimelineServicePublisher, EventHandler { - - private static final Log LOG = LogFactory - .getLog(TimelineServiceV2Publisher.class); - - private Configuration conf; - - public AbstractTimelineServicePublisher(String name) { - super(name); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - this.conf = conf; - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - } - - @Override - public void handle(SystemMetricsEvent event) { - switch (event.getType()) { - case APP_CREATED: - publishApplicationCreatedEvent((ApplicationCreatedEvent) event); - break; - case APP_FINISHED: - publishApplicationFinishedEvent((ApplicationFinishedEvent) event); - break; - case APP_UPDATED: - publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event); - break; - case APP_STATE_UPDATED: - publishApplicationStateUpdatedEvent( - (ApplicaitonStateUpdatedEvent)event); - break; - case APP_ACLS_UPDATED: - publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); - break; - case APP_ATTEMPT_REGISTERED: - publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); - break; - case APP_ATTEMPT_FINISHED: - publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); - break; - case CONTAINER_CREATED: - publishContainerCreatedEvent((ContainerCreatedEvent) event); - break; - case CONTAINER_FINISHED: - publishContainerFinishedEvent((ContainerFinishedEvent) event); - break; - default: - LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); - } - } - - abstract void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event); - - abstract void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event); - - abstract void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event); - - abstract void publishApplicationStateUpdatedEvent( - ApplicaitonStateUpdatedEvent event); - - abstract void publishApplicationACLsUpdatedEvent( - ApplicationACLsUpdatedEvent event); - - abstract void publishApplicationFinishedEvent(ApplicationFinishedEvent event); - - abstract void publishApplicationCreatedEvent(ApplicationCreatedEvent event); - - abstract void publishContainerCreatedEvent(ContainerCreatedEvent event); - - abstract void publishContainerFinishedEvent(ContainerFinishedEvent event); - - @Override - public Dispatcher getDispatcher() { - MultiThreadedDispatcher dispatcher = - new MultiThreadedDispatcher( - conf.getInt( - YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, - YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE)); - dispatcher.setDrainEventsOnStop(); - return dispatcher; - } - - @Override - public boolean publishRMContainerMetrics() { - return true; - } - - @Override - public EventHandler getEventHandler() { - return this; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static class MultiThreadedDispatcher extends CompositeService - implements Dispatcher { - - private List dispatchers = - new ArrayList(); - - public MultiThreadedDispatcher(int num) { - super(MultiThreadedDispatcher.class.getName()); - for (int i = 0; i < num; ++i) { - AsyncDispatcher dispatcher = createDispatcher(); - dispatchers.add(dispatcher); - addIfService(dispatcher); - } - } - - @Override - public EventHandler getEventHandler() { - return new CompositEventHandler(); - } - - @Override - public void register(Class eventType, EventHandler handler) { - for (AsyncDispatcher dispatcher : dispatchers) { - dispatcher.register(eventType, handler); - } - } - - public void setDrainEventsOnStop() { - for (AsyncDispatcher dispatcher : dispatchers) { - dispatcher.setDrainEventsOnStop(); - } - } - - private class CompositEventHandler implements EventHandler { - - @Override - public void handle(Event event) { - // Use hashCode (of ApplicationId) to dispatch the event to the child - // dispatcher, such that all the writing events of one application will - // be handled by one thread, the scheduled order of the these events - // will be preserved - int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size(); - dispatchers.get(index).getEventHandler().handle(event); - } - } - - protected AsyncDispatcher createDispatcher() { - return new AsyncDispatcher(); - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptFinishedEvent.java deleted file mode 100644 index fc1d10fee0..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptFinishedEvent.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; - -public class AppAttemptFinishedEvent extends - SystemMetricsEvent { - - private ApplicationAttemptId appAttemptId; - private String trackingUrl; - private String originalTrackingUrl; - private String diagnosticsInfo; - private FinalApplicationStatus appStatus; - private YarnApplicationAttemptState state; - private ContainerId masterContainerId; - - public AppAttemptFinishedEvent( - ApplicationAttemptId appAttemptId, - String trackingUrl, - String originalTrackingUrl, - String diagnosticsInfo, - FinalApplicationStatus appStatus, - YarnApplicationAttemptState state, - long finishedTime, - ContainerId masterContainerId) { - super(SystemMetricsEventType.APP_ATTEMPT_FINISHED, finishedTime); - this.appAttemptId = appAttemptId; - // This is the tracking URL after the application attempt is finished - this.trackingUrl = trackingUrl; - this.originalTrackingUrl = originalTrackingUrl; - this.diagnosticsInfo = diagnosticsInfo; - this.appStatus = appStatus; - this.state = state; - this.masterContainerId = masterContainerId; - } - - @Override - public int hashCode() { - return appAttemptId.getApplicationId().hashCode(); - } - - public ApplicationAttemptId getApplicationAttemptId() { - return appAttemptId; - } - - public String getTrackingUrl() { - return trackingUrl; - } - - public String getOriginalTrackingURL() { - return originalTrackingUrl; - } - - public String getDiagnosticsInfo() { - return diagnosticsInfo; - } - - public FinalApplicationStatus getFinalApplicationStatus() { - return appStatus; - } - - public YarnApplicationAttemptState getYarnApplicationAttemptState() { - return state; - } - - public ContainerId getMasterContainerId() { - return masterContainerId; - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptRegisteredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptRegisteredEvent.java deleted file mode 100644 index 1d0f16de43..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptRegisteredEvent.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ContainerId; - -public class AppAttemptRegisteredEvent extends - SystemMetricsEvent { - - private ApplicationAttemptId appAttemptId; - private String host; - private int rpcPort; - private String trackingUrl; - private String originalTrackingUrl; - private ContainerId masterContainerId; - - public AppAttemptRegisteredEvent( - ApplicationAttemptId appAttemptId, - String host, - int rpcPort, - String trackingUrl, - String originalTrackingUrl, - ContainerId masterContainerId, - long registeredTime) { - super(SystemMetricsEventType.APP_ATTEMPT_REGISTERED, registeredTime); - this.appAttemptId = appAttemptId; - this.host = host; - this.rpcPort = rpcPort; - // This is the tracking URL after the application attempt is registered - this.trackingUrl = trackingUrl; - this.originalTrackingUrl = originalTrackingUrl; - this.masterContainerId = masterContainerId; - } - - @Override - public int hashCode() { - return appAttemptId.getApplicationId().hashCode(); - } - - public ApplicationAttemptId getApplicationAttemptId() { - return appAttemptId; - } - - public String getHost() { - return host; - } - - public int getRpcPort() { - return rpcPort; - } - - public String getTrackingUrl() { - return trackingUrl; - } - - public String getOriginalTrackingURL() { - return originalTrackingUrl; - } - - public ContainerId getMasterContainerId() { - return masterContainerId; - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicaitonStateUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicaitonStateUpdatedEvent.java deleted file mode 100644 index 599e8d6710..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicaitonStateUpdatedEvent.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; - -/** - * When the state of this application has been changed, RM would sent - * this event to inform Timeline Server for keeping the Application state - * consistent. - */ -public class ApplicaitonStateUpdatedEvent extends SystemMetricsEvent{ - private ApplicationId appId; - private YarnApplicationState appState; - - public ApplicaitonStateUpdatedEvent(ApplicationId appliocationId, - YarnApplicationState state, long updatedTime) { - super(SystemMetricsEventType.APP_STATE_UPDATED, updatedTime); - this.appId = appliocationId; - this.appState = state; - } - - public ApplicationId getApplicationId() { - return appId; - } - - public YarnApplicationState getAppState() { - return appState; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationACLsUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationACLsUpdatedEvent.java deleted file mode 100644 index c8b314c221..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationACLsUpdatedEvent.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.api.records.ApplicationId; - - -public class ApplicationACLsUpdatedEvent extends SystemMetricsEvent { - - private ApplicationId appId; - private String viewAppACLs; - - public ApplicationACLsUpdatedEvent(ApplicationId appId, - String viewAppACLs, - long updatedTime) { - super(SystemMetricsEventType.APP_ACLS_UPDATED, updatedTime); - this.appId = appId; - this.viewAppACLs = viewAppACLs; - } - - public ApplicationId getApplicationId() { - return appId; - } - - public String getViewAppACLs() { - return viewAppACLs; - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java deleted file mode 100644 index 968a8fd9c8..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import java.util.Set; - -import org.apache.hadoop.ipc.CallerContext; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Priority; - -public class ApplicationCreatedEvent extends - SystemMetricsEvent { - - private ApplicationId appId; - private String name; - private String type; - private String user; - private String queue; - private long submittedTime; - private Set appTags; - private boolean unmanagedApplication; - private Priority applicationPriority; - private String appNodeLabelsExpression; - private String amNodeLabelsExpression; - private final CallerContext callerContext; - - - public ApplicationCreatedEvent(ApplicationId appId, - String name, - String type, - String user, - String queue, - long submittedTime, - long createdTime, - Set appTags, - boolean unmanagedApplication, - Priority applicationPriority, - String appNodeLabelsExpression, - String amNodeLabelsExpression, - CallerContext callerContext) { - super(SystemMetricsEventType.APP_CREATED, createdTime); - this.appId = appId; - this.name = name; - this.type = type; - this.user = user; - this.queue = queue; - this.submittedTime = submittedTime; - this.appTags = appTags; - this.unmanagedApplication = unmanagedApplication; - this.applicationPriority = applicationPriority; - this.appNodeLabelsExpression = appNodeLabelsExpression; - this.amNodeLabelsExpression = amNodeLabelsExpression; - this.callerContext = callerContext; - } - - @Override - public int hashCode() { - return appId.hashCode(); - } - - public ApplicationId getApplicationId() { - return appId; - } - - public String getApplicationName() { - return name; - } - - public String getApplicationType() { - return type; - } - - public String getUser() { - return user; - } - - public String getQueue() { - return queue; - } - - public long getSubmittedTime() { - return submittedTime; - } - - public Set getAppTags() { - return appTags; - } - - public boolean isUnmanagedApp() { - return unmanagedApplication; - } - - public Priority getApplicationPriority() { - return applicationPriority; - } - - public String getAppNodeLabelsExpression() { - return appNodeLabelsExpression; - } - - public String getAmNodeLabelsExpression() { - return amNodeLabelsExpression; - } - - public CallerContext getCallerContext() { - return callerContext; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java deleted file mode 100644 index d9241b23c7..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; - -public class ApplicationFinishedEvent extends - SystemMetricsEvent { - - private ApplicationId appId;; - private String diagnosticsInfo; - private FinalApplicationStatus appStatus; - private YarnApplicationState state; - private ApplicationAttemptId latestAppAttemptId; - private RMAppMetrics appMetrics; - private RMAppImpl app; - - public ApplicationFinishedEvent( - ApplicationId appId, - String diagnosticsInfo, - FinalApplicationStatus appStatus, - YarnApplicationState state, - ApplicationAttemptId latestAppAttemptId, - long finishedTime, - RMAppMetrics appMetrics, - RMAppImpl app) { - super(SystemMetricsEventType.APP_FINISHED, finishedTime); - this.appId = appId; - this.diagnosticsInfo = diagnosticsInfo; - this.appStatus = appStatus; - this.latestAppAttemptId = latestAppAttemptId; - this.state = state; - this.appMetrics = appMetrics; - this.app = app; - } - - @Override - public int hashCode() { - return appId.hashCode(); - } - - public RMAppImpl getApp() { - return app; - } - - public ApplicationId getApplicationId() { - return appId; - } - - public String getDiagnosticsInfo() { - return diagnosticsInfo; - } - - public FinalApplicationStatus getFinalApplicationStatus() { - return appStatus; - } - - public YarnApplicationState getYarnApplicationState() { - return state; - } - - public ApplicationAttemptId getLatestApplicationAttemptId() { - return latestAppAttemptId; - } - - public RMAppMetrics getAppMetrics() { - return appMetrics; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java deleted file mode 100644 index 9e5e1fd985..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Priority; - -public class ApplicationUpdatedEvent extends SystemMetricsEvent { - - private ApplicationId appId; - private String queue; - private Priority applicationPriority; - - public ApplicationUpdatedEvent(ApplicationId appId, String queue, - long updatedTime, Priority applicationPriority) { - super(SystemMetricsEventType.APP_UPDATED, updatedTime); - this.appId = appId; - this.queue = queue; - this.applicationPriority = applicationPriority; - } - - @Override - public int hashCode() { - return appId.hashCode(); - } - - public ApplicationId getApplicationId() { - return appId; - } - - public String getQueue() { - return queue; - } - - public Priority getApplicationPriority() { - return applicationPriority; - } -} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java deleted file mode 100644 index 05b6781113..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; - -public class ContainerCreatedEvent extends SystemMetricsEvent { - - private ContainerId containerId; - private Resource allocatedResource; - private NodeId allocatedNode; - private Priority allocatedPriority; - private String nodeHttpAddress; - - public ContainerCreatedEvent( - ContainerId containerId, - Resource allocatedResource, - NodeId allocatedNode, - Priority allocatedPriority, - long createdTime, - String nodeHttpAddress) { - super(SystemMetricsEventType.CONTAINER_CREATED, createdTime); - this.containerId = containerId; - this.allocatedResource = allocatedResource; - this.allocatedNode = allocatedNode; - this.allocatedPriority = allocatedPriority; - this.nodeHttpAddress = nodeHttpAddress; - } - - @Override - public int hashCode() { - return containerId.getApplicationAttemptId().getApplicationId().hashCode(); - } - - public ContainerId getContainerId() { - return containerId; - } - - public Resource getAllocatedResource() { - return allocatedResource; - } - - public NodeId getAllocatedNode() { - return allocatedNode; - } - - public Priority getAllocatedPriority() { - return allocatedPriority; - } - - public String getNodeHttpAddress() { - return nodeHttpAddress; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java deleted file mode 100644 index ca4d3117aa..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.NodeId; - -public class ContainerFinishedEvent extends SystemMetricsEvent { - - private ContainerId containerId; - private String diagnosticsInfo; - private int containerExitStatus; - private ContainerState state; - private NodeId allocatedNode; - - public ContainerFinishedEvent( - ContainerId containerId, - String diagnosticsInfo, - int containerExitStatus, - ContainerState state, - long finishedTime, - NodeId allocatedNode) { - super(SystemMetricsEventType.CONTAINER_FINISHED, finishedTime); - this.containerId = containerId; - this.diagnosticsInfo = diagnosticsInfo; - this.containerExitStatus = containerExitStatus; - this.allocatedNode = allocatedNode; - this.state = state; - } - - @Override - public int hashCode() { - return containerId.getApplicationAttemptId().getApplicationId().hashCode(); - } - - public ContainerId getContainerId() { - return containerId; - } - - public String getDiagnosticsInfo() { - return diagnosticsInfo; - } - - public int getContainerExitStatus() { - return containerExitStatus; - } - - public ContainerState getContainerState() { - return state; - } - - public NodeId getAllocatedNode() { - return allocatedNode; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java new file mode 100644 index 0000000000..845555d40d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +/** + * This class does nothing when any of the methods are invoked on + * SystemMetricsPublisher + */ +public class NoOpSystemMetricPublisher implements SystemMetricsPublisher{ + + @Override + public void appCreated(RMApp app, long createdTime) { + } + + @Override + public void appFinished(RMApp app, RMAppState state, long finishedTime) { + } + + @Override + public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { + } + + @Override + public void appAttemptRegistered(RMAppAttempt appAttempt, + long registeredTime) { + } + + @Override + public void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { + } + + @Override + public void containerCreated(RMContainer container, long createdTime) { + } + + @Override + public void containerFinished(RMContainer container, long finishedTime) { + } + + @Override + public void appUpdated(RMApp app, long currentTimeMillis) { + } + + @Override + public void appStateUpdated(RMApp app, YarnApplicationState appState, + long updatedTime) { + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java deleted file mode 100644 index 18473968f4..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.hadoop.yarn.event.AbstractEvent; - -public class SystemMetricsEvent extends AbstractEvent { - - public SystemMetricsEvent(SystemMetricsEventType type) { - super(type); - } - - public SystemMetricsEvent(SystemMetricsEventType type, long timestamp) { - super(type, timestamp); - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java deleted file mode 100644 index fcda4b4e47..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.metrics; - - -public enum SystemMetricsEventType { - // app events - APP_CREATED, - APP_FINISHED, - APP_ACLS_UPDATED, - APP_UPDATED, - APP_STATE_UPDATED, - - // app attempt events - APP_ATTEMPT_REGISTERED, - APP_ATTEMPT_FINISHED, - - // container events - CONTAINER_CREATED, - CONTAINER_FINISHED -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java index 15960f7036..5bceae5654 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java @@ -18,269 +18,32 @@ package org.apache.hadoop.yarn.server.resourcemanager.metrics; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.client.api.TimelineClient; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import com.google.common.annotations.VisibleForTesting; +public interface SystemMetricsPublisher { -import com.google.common.annotations.VisibleForTesting; + void appCreated(RMApp app, long createdTime); -/** - * The class that helps RM publish metrics to the timeline server. RM will - * always invoke the methods of this class regardless the service is enabled or - * not. If it is disabled, publishing requests will be ignored silently. - */ -@Private -@Unstable -public class SystemMetricsPublisher extends CompositeService { + void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime); - private static final Log LOG = LogFactory - .getLog(SystemMetricsPublisher.class); + void appUpdated(RMApp app, long updatedTime); - private Dispatcher dispatcher; - private boolean publishSystemMetrics; - private boolean publishContainerMetrics; - protected RMContext rmContext; + void appStateUpdated(RMApp app, YarnApplicationState appState, + long updatedTime); - public SystemMetricsPublisher(RMContext rmContext) { - super(SystemMetricsPublisher.class.getName()); - this.rmContext = rmContext; - } + void appFinished(RMApp app, RMAppState state, long finishedTime); - @Override - protected void serviceInit(Configuration conf) throws Exception { - publishSystemMetrics = - conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); - if (publishSystemMetrics) { - TimelineServicePublisher timelineServicePublisher = - getTimelineServicePublisher(conf); - if (timelineServicePublisher != null) { - addService(timelineServicePublisher); - // init required to be called so that other methods of - // TimelineServicePublisher can be utilized - timelineServicePublisher.init(conf); - dispatcher = createDispatcher(timelineServicePublisher); - publishContainerMetrics = - timelineServicePublisher.publishRMContainerMetrics(); - dispatcher.register(SystemMetricsEventType.class, - timelineServicePublisher.getEventHandler()); - addIfService(dispatcher); - } else { - LOG.info("TimelineServicePublisher is not configured"); - publishSystemMetrics = false; - } - LOG.info("YARN system metrics publishing service is enabled"); - } else { - LOG.info("YARN system metrics publishing service is not enabled"); - } - super.serviceInit(conf); - } + void appAttemptRegistered(RMAppAttempt appAttempt, long registeredTime); - @VisibleForTesting - Dispatcher createDispatcher(TimelineServicePublisher timelineServicePublisher) { - return timelineServicePublisher.getDispatcher(); - } + void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState appAttemtpState, RMApp app, long finishedTime); - TimelineServicePublisher getTimelineServicePublisher(Configuration conf) { - if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) { - return new TimelineServiceV1Publisher(); - } else if (conf.getBoolean( - YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) { - return new TimelineServiceV2Publisher(rmContext); - } - return null; - } + void containerCreated(RMContainer container, long createdTime); - @SuppressWarnings("unchecked") - public void appCreated(RMApp app, long createdTime) { - if (publishSystemMetrics) { - ApplicationSubmissionContext appSubmissionContext = - app.getApplicationSubmissionContext(); - dispatcher.getEventHandler().handle( - new ApplicationCreatedEvent( - app.getApplicationId(), - app.getName(), - app.getApplicationType(), - app.getUser(), - app.getQueue(), - app.getSubmitTime(), - createdTime, app.getApplicationTags(), - appSubmissionContext.getUnmanagedAM(), - appSubmissionContext.getPriority(), - app.getAppNodeLabelExpression(), - app.getAmNodeLabelExpression(), - app.getCallerContext())); - } - } - - @SuppressWarnings("unchecked") - public void appUpdated(RMApp app, long updatedTime) { - if (publishSystemMetrics) { - dispatcher.getEventHandler() - .handle(new ApplicationUpdatedEvent(app.getApplicationId(), - app.getQueue(), updatedTime, - app.getApplicationSubmissionContext().getPriority())); - } - } - - @SuppressWarnings("unchecked") - public void appFinished(RMApp app, RMAppState state, long finishedTime) { - if (publishSystemMetrics) { - dispatcher.getEventHandler().handle( - new ApplicationFinishedEvent( - app.getApplicationId(), - app.getDiagnostics().toString(), - app.getFinalApplicationStatus(), - RMServerUtils.createApplicationState(state), - app.getCurrentAppAttempt() == null ? - null : app.getCurrentAppAttempt().getAppAttemptId(), - finishedTime, - app.getRMAppMetrics(), - (RMAppImpl)app)); - } - } - - @SuppressWarnings("unchecked") - public void appACLsUpdated(RMApp app, String appViewACLs, - long updatedTime) { - if (publishSystemMetrics) { - dispatcher.getEventHandler().handle( - new ApplicationACLsUpdatedEvent( - app.getApplicationId(), - appViewACLs == null ? "" : appViewACLs, - updatedTime)); - } - } - - @SuppressWarnings("unchecked") - public void appStateUpdated(RMApp app, YarnApplicationState appState, - long updatedTime) { - if (publishSystemMetrics) { - dispatcher.getEventHandler().handle( - new ApplicaitonStateUpdatedEvent( - app.getApplicationId(), - appState, - updatedTime)); - } - } - - @SuppressWarnings("unchecked") - public void appAttemptRegistered(RMAppAttempt appAttempt, - long registeredTime) { - if (publishSystemMetrics) { - ContainerId container = (appAttempt.getMasterContainer() == null) ? null - : appAttempt.getMasterContainer().getId(); - dispatcher.getEventHandler().handle( - new AppAttemptRegisteredEvent( - appAttempt.getAppAttemptId(), - appAttempt.getHost(), - appAttempt.getRpcPort(), - appAttempt.getTrackingUrl(), - appAttempt.getOriginalTrackingUrl(), - container, - registeredTime)); - } - } - - @SuppressWarnings("unchecked") - public void appAttemptFinished(RMAppAttempt appAttempt, - RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { - if (publishSystemMetrics) { - ContainerId container = (appAttempt.getMasterContainer() == null) ? null - : appAttempt.getMasterContainer().getId(); - dispatcher.getEventHandler().handle( - new AppAttemptFinishedEvent( - appAttempt.getAppAttemptId(), - appAttempt.getTrackingUrl(), - appAttempt.getOriginalTrackingUrl(), - appAttempt.getDiagnostics(), - // app will get the final status from app attempt, or create one - // based on app state if it doesn't exist - app.getFinalApplicationStatus(), - RMServerUtils.createApplicationAttemptState(appAttemtpState), - finishedTime, - container)); - } - } - - @SuppressWarnings("unchecked") - public void containerCreated(RMContainer container, long createdTime) { - if (publishContainerMetrics) { - dispatcher.getEventHandler().handle( - new ContainerCreatedEvent( - container.getContainerId(), - container.getAllocatedResource(), - container.getAllocatedNode(), - container.getAllocatedPriority(), - createdTime, container.getNodeHttpAddress())); - } - } - - @SuppressWarnings("unchecked") - public void containerFinished(RMContainer container, long finishedTime) { - if (publishContainerMetrics) { - dispatcher.getEventHandler().handle( - new ContainerFinishedEvent( - container.getContainerId(), - container.getDiagnosticsInfo(), - container.getContainerExitStatus(), - container.getContainerState(), - finishedTime, container.getAllocatedNode())); - } - } - - @VisibleForTesting - boolean isPublishContainerMetrics() { - return publishContainerMetrics; - } - - @VisibleForTesting - Dispatcher getDispatcher() { - return dispatcher; - } - - interface TimelineServicePublisher extends Service { - /** - * @return the Dispatcher which needs to be used to dispatch events - */ - Dispatcher getDispatcher(); - - /** - * @return true if RMContainerMetricsNeeds to be sent - */ - boolean publishRMContainerMetrics(); - - /** - * @return EventHandler which needs to be registered to the dispatcher to - * handle the SystemMetricsEvent - */ - EventHandler getEventHandler(); - } + void containerFinished(RMContainer container, long finishedTime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java index 6cada5561f..c890577f04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java @@ -26,21 +26,28 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; -public class TimelineServiceV1Publisher extends - AbstractTimelineServicePublisher { +public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher { - private static final Log LOG = LogFactory - .getLog(TimelineServiceV1Publisher.class); + private static final Log LOG = + LogFactory.getLog(TimelineServiceV1Publisher.class); public TimelineServiceV1Publisher() { super("TimelineserviceV1Publisher"); @@ -49,76 +56,69 @@ public TimelineServiceV1Publisher() { private TimelineClient client; @Override - public void serviceInit(Configuration conf) throws Exception { + protected void serviceInit(Configuration conf) throws Exception { client = TimelineClient.createTimelineClient(); addIfService(client); super.serviceInit(conf); + getDispatcher().register(SystemMetricsEventType.class, + new TimelineV1EventHandler()); } + @SuppressWarnings("unchecked") @Override - void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); + public void appCreated(RMApp app, long createdTime) { + TimelineEntity entity = createApplicationEntity(app.getApplicationId()); Map entityInfo = new HashMap(); - entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, - event.getApplicationName()); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName()); entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, - event.getApplicationType()); - entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, - event.getUser()); + app.getApplicationType()); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser()); entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); + app.getQueue()); entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, - event.getSubmittedTime()); + app.getSubmitTime()); entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, - event.getAppTags()); + app.getApplicationTags()); entityInfo.put( ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, - event.isUnmanagedApp()); + app.getApplicationSubmissionContext().getUnmanagedAM()); entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, - event.getApplicationPriority().getPriority()); - entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, - event.getAppNodeLabelsExpression()); + app.getApplicationSubmissionContext().getPriority().getPriority()); entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, - event.getAmNodeLabelsExpression()); - if (event.getCallerContext() != null) { - if (event.getCallerContext().getContext() != null) { - entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT, - event.getCallerContext().getContext()); - } - if (event.getCallerContext().getSignature() != null) { - entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE, - event.getCallerContext().getSignature()); - } - } + app.getAmNodeLabelExpression()); + entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, + app.getAppNodeLabelExpression()); entity.setOtherInfo(entityInfo); TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - ApplicationMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(createdTime); + entity.addEvent(tEvent); - putEntity(entity); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); } @Override - void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { - TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + public void appFinished(RMApp app, RMAppState state, long finishedTime) { + TimelineEntity entity = createApplicationEntity(app.getApplicationId()); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(finishedTime); Map eventInfo = new HashMap(); eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event - .getFinalApplicationStatus().toString()); - eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event - .getYarnApplicationState().toString()); - if (event.getLatestApplicationAttemptId() != null) { + app.getDiagnostics().toString()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, + app.getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + RMServerUtils.createApplicationState(state).toString()); + String latestApplicationAttemptId = app.getCurrentAppAttempt() == null + ? null : app.getCurrentAppAttempt().getAppAttemptId().toString(); + if (latestApplicationAttemptId != null) { eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, - event.getLatestApplicationAttemptId().toString()); + latestApplicationAttemptId); } - RMAppMetrics appMetrics = event.getAppMetrics(); + RMAppMetrics appMetrics = app.getRMAppMetrics(); entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS, appMetrics.getVcoreSeconds()); entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS, @@ -126,54 +126,170 @@ void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { tEvent.setEventInfo(eventInfo); entity.addEvent(tEvent); - putEntity(entity); + // sync sending of finish event to avoid possibility of saving application + // finished state in RMStateStore save without publishing in ATS + putEntity(entity);// sync event so that ATS update is done without fail } + @SuppressWarnings("unchecked") @Override - void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { - TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + public void appUpdated(RMApp app, long updatedTime) { + TimelineEntity entity = createApplicationEntity(app.getApplicationId()); Map eventInfo = new HashMap(); eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); - eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event - .getApplicationPriority().getPriority()); + app.getQueue()); + eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + app.getApplicationSubmissionContext().getPriority().getPriority()); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(updatedTime); tEvent.setEventInfo(eventInfo); entity.addEvent(tEvent); - putEntity(entity); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); } + @SuppressWarnings("unchecked") @Override - void publishApplicationStateUpdatedEvent( - ApplicaitonStateUpdatedEvent event) { - TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + public void appStateUpdated(RMApp app, YarnApplicationState appState, + long updatedTime) { + TimelineEntity entity = createApplicationEntity(app.getApplicationId()); Map eventInfo = new HashMap(); eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, - event.getAppState()); + appState); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(updatedTime); tEvent.setEventInfo(eventInfo); entity.addEvent(tEvent); - putEntity(entity); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); } - @Override - void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) { - TimelineEntity entity = createApplicationEntity(event.getApplicationId()); - + public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { + TimelineEntity entity = createApplicationEntity(app.getApplicationId()); TimelineEvent tEvent = new TimelineEvent(); Map entityInfo = new HashMap(); entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, - event.getViewAppACLs()); + (appViewACLs == null) ? "" : appViewACLs); entity.setOtherInfo(entityInfo); tEvent.setEventType(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(updatedTime); entity.addEvent(tEvent); - putEntity(entity); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void appAttemptRegistered(RMAppAttempt appAttempt, + long registeredTime) { + TimelineEntity entity = + createAppAttemptEntity(appAttempt.getAppAttemptId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(registeredTime); + Map eventInfo = new HashMap(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + appAttempt.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + appAttempt.getOriginalTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, + appAttempt.getHost()); + eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, + appAttempt.getRpcPort()); + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + appAttempt.getMasterContainer().getId().toString()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle( + new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, + entity, appAttempt.getAppAttemptId().getApplicationId())); + + } + + @SuppressWarnings("unchecked") + @Override + public void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { + TimelineEntity entity = + createAppAttemptEntity(appAttempt.getAppAttemptId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(finishedTime); + Map eventInfo = new HashMap(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + appAttempt.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + appAttempt.getOriginalTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + appAttempt.getDiagnostics()); + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, + app.getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils + .createApplicationAttemptState(appAttemtpState).toString()); + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle( + new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, + entity, appAttempt.getAppAttemptId().getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void containerCreated(RMContainer container, long createdTime) { + TimelineEntity entity = createContainerEntity(container.getContainerId()); + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + container.getAllocatedResource().getMemorySize()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, + container.getAllocatedResource().getVirtualCores()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + container.getAllocatedNode().getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + container.getAllocatedNode().getPort()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + container.getAllocatedPriority().getPriority()); + entityInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + container.getNodeHttpAddress()); + entity.setOtherInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(createdTime); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, container + .getContainerId().getApplicationAttemptId().getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void containerFinished(RMContainer container, long finishedTime) { + TimelineEntity entity = createContainerEntity(container.getContainerId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(finishedTime); + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + container.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + container.getContainerExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, + container.getContainerState().toString()); + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, container + .getContainerId().getApplicationAttemptId().getApplicationId())); } private static TimelineEntity createApplicationEntity( @@ -184,63 +300,6 @@ private static TimelineEntity createApplicationEntity( return entity; } - @Override - void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { - TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put( - AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); - eventInfo.put( - AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); - eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, - event.getHost()); - eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, - event.getRpcPort()); - if (event.getMasterContainerId() != null) { - eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, - event.getMasterContainerId().toString()); - } - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - @Override - void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { - TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); - - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); - eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); - eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event - .getFinalApplicationStatus().toString()); - eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event - .getYarnApplicationAttemptState().toString()); - if (event.getMasterContainerId() != null) { - eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, - event.getMasterContainerId().toString()); - } - tEvent.setEventInfo(eventInfo); - - entity.addEvent(tEvent); - putEntity(entity); - } - private static TimelineEntity createAppAttemptEntity( ApplicationAttemptId appAttemptId) { TimelineEntity entity = new TimelineEntity(); @@ -251,59 +310,6 @@ private static TimelineEntity createAppAttemptEntity( return entity; } - @Override - void publishContainerCreatedEvent(ContainerCreatedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - Map entityInfo = new HashMap(); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, - event.getAllocatedResource().getMemorySize()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event - .getAllocatedResource().getVirtualCores()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event - .getAllocatedNode().getHost()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event - .getAllocatedNode().getPort()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, - event.getAllocatedPriority().getPriority()); - entityInfo.put( - ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, - event.getNodeHttpAddress()); - entity.setOtherInfo(entityInfo); - - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - - entity.addEvent(tEvent); - putEntity(entity); - } - - @Override - void publishContainerFinishedEvent(ContainerFinishedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, - event.getContainerExitStatus()); - eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event - .getContainerState().toString()); - Map entityInfo = new HashMap(); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, - event.getAllocatedNode().getHost()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, - event.getAllocatedNode().getPort()); - entity.setOtherInfo(entityInfo); - tEvent.setEventInfo(eventInfo); - - entity.addEvent(tEvent); - putEntity(entity); - } - private static TimelineEntity createContainerEntity(ContainerId containerId) { TimelineEntity entity = new TimelineEntity(); entity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE); @@ -326,4 +332,26 @@ private void putEntity(TimelineEntity entity) { + entity.getEntityId() + "]", e); } } + + private class TimelineV1PublishEvent extends TimelinePublishEvent { + private TimelineEntity entity; + + public TimelineV1PublishEvent(SystemMetricsEventType type, + TimelineEntity entity, ApplicationId appId) { + super(type, appId); + this.entity = entity; + } + + public TimelineEntity getEntity() { + return entity; + } + } + + private class TimelineV1EventHandler + implements EventHandler { + @Override + public void handle(TimelineV1PublishEvent event) { + putEntity(event.getEntity()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java index 891673886d..e1e83fe4c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -25,10 +25,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; @@ -39,93 +39,100 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for posting application, appattempt & Container * lifecycle related events to timeline service V2 */ @Private @Unstable -public class TimelineServiceV2Publisher extends - AbstractTimelineServicePublisher { - private static final Log LOG = LogFactory - .getLog(TimelineServiceV2Publisher.class); +public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher { + private static final Log LOG = + LogFactory.getLog(TimelineServiceV2Publisher.class); protected RMTimelineCollectorManager rmTimelineCollectorManager; + private boolean publishContainerMetrics; public TimelineServiceV2Publisher(RMContext rmContext) { super("TimelineserviceV2Publisher"); rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager(); } - private boolean publishContainerMetrics; - @Override - protected void serviceInit(Configuration conf) throws Exception { - publishContainerMetrics = - conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, - YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED); - super.serviceInit(conf); + protected void serviceStart() throws Exception { + super.serviceStart(); + publishContainerMetrics = getConfig().getBoolean( + YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, + YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED); + getDispatcher().register(SystemMetricsEventType.class, + new TimelineV2EventHandler()); } + @VisibleForTesting + boolean isPublishContainerMetrics() { + return publishContainerMetrics; + } + + @SuppressWarnings("unchecked") @Override - void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); + public void appCreated(RMApp app, long createdTime) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + entity.setQueue(app.getQueue()); + entity.setCreatedTime(createdTime); + Map entityInfo = new HashMap(); - entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, - event.getApplicationName()); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName()); entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, - event.getApplicationType()); - entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, - event.getUser()); - entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); + app.getApplicationType()); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser()); entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, - event.getSubmittedTime()); + app.getSubmitTime()); entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, - event.getAppTags()); + app.getApplicationTags()); entityInfo.put( ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, - event.isUnmanagedApp()); + app.getApplicationSubmissionContext().getUnmanagedAM()); entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, - event.getApplicationPriority().getPriority()); - entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, - event.getAppNodeLabelsExpression()); - entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, - event.getAmNodeLabelsExpression()); - if (event.getCallerContext() != null) { - if (event.getCallerContext().getContext() != null) { - entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT, - event.getCallerContext().getContext()); - } - if (event.getCallerContext().getSignature() != null) { - entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE, - event.getCallerContext().getSignature()); - } - } + app.getApplicationSubmissionContext().getPriority().getPriority()); + entity.getConfigs().put( + ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, + app.getAmNodeLabelExpression()); + entity.getConfigs().put( + ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, + app.getAppNodeLabelExpression()); entity.setInfo(entityInfo); TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(createdTime); entity.addEvent(tEvent); - putEntity(entity, event.getApplicationId()); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); } + @SuppressWarnings("unchecked") @Override - void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { - ApplicationEntity entity = - createApplicationEntity(event.getApplicationId()); - RMAppMetrics appMetrics = event.getAppMetrics(); + public void appFinished(RMApp app, RMAppState state, long finishedTime) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + RMAppMetrics appMetrics = app.getRMAppMetrics(); entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS, appMetrics.getVcoreSeconds()); entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS, @@ -133,70 +140,76 @@ void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(finishedTime); Map eventInfo = new HashMap(); eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event - .getFinalApplicationStatus().toString()); - eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event - .getYarnApplicationState().toString()); - if (event.getLatestApplicationAttemptId() != null) { + app.getDiagnostics().toString()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, + app.getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + RMServerUtils.createApplicationState(state).toString()); + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() == null + ? null : app.getCurrentAppAttempt().getAppAttemptId(); + if (appAttemptId != null) { eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, - event.getLatestApplicationAttemptId().toString()); + appAttemptId.toString()); } tEvent.setInfo(eventInfo); entity.addEvent(tEvent); - putEntity(entity, event.getApplicationId()); - //cleaning up the collector cached - event.getApp().stopTimelineCollector(); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); } + @SuppressWarnings("unchecked") @Override - void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { + public void appStateUpdated(RMApp app, YarnApplicationState appState, + long updatedTime) { ApplicationEntity entity = - createApplicationEntity(event.getApplicationId()); - Map eventInfo = new HashMap(); - eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); - eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event - .getApplicationPriority().getPriority()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - tEvent.setInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity, event.getApplicationId()); - } - - @Override - void publishApplicationStateUpdatedEvent( - ApplicaitonStateUpdatedEvent event) { - ApplicationEntity entity = - createApplicationEntity(event.getApplicationId()); + createApplicationEntity(app.getApplicationId()); Map eventInfo = new HashMap(); eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, - event.getAppState()); + appState); TimelineEvent tEvent = new TimelineEvent(); - tEvent.setId(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setId(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(updatedTime); tEvent.setInfo(eventInfo); entity.addEvent(tEvent); - putEntity(entity, event.getApplicationId()); + + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); } + @SuppressWarnings("unchecked") @Override - void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) { - ApplicationEntity entity = - createApplicationEntity(event.getApplicationId()); + public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); Map entityInfo = new HashMap(); entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, - event.getViewAppACLs()); + appViewACLs); entity.setInfo(entityInfo); - putEntity(entity, event.getApplicationId()); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); + } + + @SuppressWarnings("unchecked") + @Override + public void appUpdated(RMApp app, long currentTimeMillis) { + ApplicationEntity entity = createApplicationEntity(app.getApplicationId()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + app.getQueue()); + eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + app.getApplicationSubmissionContext().getPriority().getPriority()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); + tEvent.setTimestamp(currentTimeMillis); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId())); } private static ApplicationEntity createApplicationEntity( @@ -206,117 +219,134 @@ private static ApplicationEntity createApplicationEntity( return entity; } + @SuppressWarnings("unchecked") @Override - void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { + public void appAttemptRegistered(RMAppAttempt appAttempt, + long registeredTime) { TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); + createAppAttemptEntity(appAttempt.getAppAttemptId()); + entity.setCreatedTime(registeredTime); + TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(registeredTime); Map eventInfo = new HashMap(); - eventInfo.put( - AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); - eventInfo.put( - AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + appAttempt.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + appAttempt.getOriginalTrackingUrl()); eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, - event.getHost()); + appAttempt.getHost()); eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, - event.getRpcPort()); - if (event.getMasterContainerId() != null) { - eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, - event.getMasterContainerId().toString()); - } + appAttempt.getRpcPort()); + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + appAttempt.getMasterContainer().getId().toString()); tEvent.setInfo(eventInfo); entity.addEvent(tEvent); - putEntity(entity, event.getApplicationAttemptId().getApplicationId()); + getDispatcher().getEventHandler().handle( + new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, + entity, appAttempt.getAppAttemptId().getApplicationId())); } + @SuppressWarnings("unchecked") @Override - void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { + public void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { + ApplicationAttemptEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); + createAppAttemptEntity(appAttempt.getAppAttemptId()); TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); + tEvent.setTimestamp(finishedTime); Map eventInfo = new HashMap(); eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); + appAttempt.getTrackingUrl()); eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); + appAttempt.getOriginalTrackingUrl()); eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event - .getFinalApplicationStatus().toString()); - eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event - .getYarnApplicationAttemptState().toString()); - if (event.getMasterContainerId() != null) { - eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, - event.getMasterContainerId().toString()); + appAttempt.getDiagnostics()); + // app will get the final status from app attempt, or create one + // based on app state if it doesn't exist + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, + app.getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils + .createApplicationAttemptState(appAttemtpState).toString()); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle( + new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, + entity, appAttempt.getAppAttemptId().getApplicationId())); + } + + private static ApplicationAttemptEntity createAppAttemptEntity( + ApplicationAttemptId appAttemptId) { + ApplicationAttemptEntity entity = new ApplicationAttemptEntity(); + entity.setId(appAttemptId.toString()); + entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(), + appAttemptId.getApplicationId().toString())); + return entity; + } + + @SuppressWarnings("unchecked") + @Override + public void containerCreated(RMContainer container, long createdTime) { + if (publishContainerMetrics) { + TimelineEntity entity = createContainerEntity(container.getContainerId()); + entity.setCreatedTime(createdTime); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(createdTime); + // updated as event info instead of entity info, as entity info is updated + // by NM + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + container.getAllocatedResource().getMemorySize()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, + container.getAllocatedResource().getVirtualCores()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + container.getAllocatedNode().getHost()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + container.getAllocatedNode().getPort()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + container.getAllocatedPriority().getPriority()); + eventInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + container.getNodeHttpAddress()); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, container + .getContainerId().getApplicationAttemptId().getApplicationId())); } - tEvent.setInfo(eventInfo); - - entity.addEvent(tEvent); - putEntity(entity, event.getApplicationAttemptId().getApplicationId()); } + @SuppressWarnings("unchecked") @Override - void publishContainerCreatedEvent(ContainerCreatedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); + public void containerFinished(RMContainer container, long finishedTime) { + if (publishContainerMetrics) { + TimelineEntity entity = createContainerEntity(container.getContainerId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - // updated as event info instead of entity info, as entity info is updated - // by NM - Map eventInfo = new HashMap(); - eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event - .getAllocatedResource().getMemorySize()); - eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event - .getAllocatedResource().getVirtualCores()); - eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event - .getAllocatedNode().getHost()); - eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event - .getAllocatedNode().getPort()); - eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, - event.getAllocatedPriority().getPriority()); - eventInfo.put( - ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, - event.getNodeHttpAddress()); - tEvent.setInfo(eventInfo); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(finishedTime); + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + container.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + container.getContainerExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, + container.getContainerState().toString()); + tEvent.setInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity, event.getContainerId().getApplicationAttemptId() - .getApplicationId()); - } - - @Override - void publishContainerFinishedEvent(ContainerFinishedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, - event.getContainerExitStatus()); - eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event - .getContainerState().toString()); - Map entityInfo = new HashMap(); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, - event.getAllocatedNode().getHost()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, - event.getAllocatedNode().getPort()); - entity.setInfo(entityInfo); - tEvent.setInfo(eventInfo); - - entity.addEvent(tEvent); - putEntity(entity, event.getContainerId().getApplicationAttemptId() - .getApplicationId()); + entity.addEvent(tEvent); + getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent( + SystemMetricsEventType.PUBLISH_ENTITY, entity, container + .getContainerId().getApplicationAttemptId().getApplicationId())); + } } private static ContainerEntity createContainerEntity(ContainerId containerId) { @@ -344,17 +374,48 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) { } } - private static ApplicationAttemptEntity createAppAttemptEntity( - ApplicationAttemptId appAttemptId) { - ApplicationAttemptEntity entity = new ApplicationAttemptEntity(); - entity.setId(appAttemptId.toString()); - entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(), - appAttemptId.getApplicationId().toString())); - return entity; + private class ApplicationFinishPublishEvent extends TimelineV2PublishEvent { + private RMAppImpl app; + + public ApplicationFinishPublishEvent(SystemMetricsEventType type, + TimelineEntity entity, RMAppImpl app) { + super(type, entity, app.getApplicationId()); + this.app = app; + } + + public RMAppImpl getRMAppImpl() { + return app; + } } - @Override - public boolean publishRMContainerMetrics() { - return publishContainerMetrics; + private class TimelineV2EventHandler + implements EventHandler { + @Override + public void handle(TimelineV2PublishEvent event) { + switch (event.getType()) { + case PUBLISH_APPLICATION_FINISHED_ENTITY: + putEntity(event.getEntity(), event.getApplicationId()); + ((ApplicationFinishPublishEvent) event).getRMAppImpl() + .stopTimelineCollector(); + break; + default: + putEntity(event.getEntity(), event.getApplicationId()); + break; + } + } + } + + private class TimelineV2PublishEvent extends TimelinePublishEvent { + private TimelineEntity entity; + + public TimelineV2PublishEvent(SystemMetricsEventType type, + TimelineEntity entity, ApplicationId appId) { + super(type, appId); + this.entity = entity; + } + + public TimelineEntity getEntity() { + return entity; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java index 31a93097b3..79a266f09c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java @@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -69,7 +68,7 @@ public class TestSystemMetricsPublisher { private static ApplicationHistoryServer timelineServer; - private static SystemMetricsPublisher metricsPublisher; + private static TimelineServiceV1Publisher metricsPublisher; private static TimelineStore store; @BeforeClass @@ -90,7 +89,7 @@ public static void setup() throws Exception { timelineServer.start(); store = timelineServer.getTimelineStore(); - metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class)); + metricsPublisher = new TimelineServiceV1Publisher(); metricsPublisher.init(conf); metricsPublisher.start(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java index ac20335661..20a5b136f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher.MultiThreadedDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -75,7 +74,7 @@ public class TestSystemMetricsPublisherForV2 { TestSystemMetricsPublisherForV2.class.getName() + "-localDir") .getAbsoluteFile(); - private static SystemMetricsPublisher metricsPublisher; + private static TimelineServiceV2Publisher metricsPublisher; private static DrainDispatcher dispatcher = new DrainDispatcher(); private static final String DEFAULT_FLOW_VERSION = "1"; private static final long DEFAULT_FLOW_RUN = 1; @@ -103,10 +102,11 @@ public static void setup() throws Exception { rmTimelineCollectorManager.init(conf); rmTimelineCollectorManager.start(); - metricsPublisher = new SystemMetricsPublisher(rmContext) { + dispatcher.init(conf); + dispatcher.start(); + metricsPublisher = new TimelineServiceV2Publisher(rmContext) { @Override - Dispatcher createDispatcher( - TimelineServicePublisher timelineServicePublisher) { + protected Dispatcher getDispatcher() { return dispatcher; } }; @@ -150,8 +150,8 @@ private static Configuration getTimelineV2Conf() { @Test public void testSystemMetricPublisherInitialization() { @SuppressWarnings("resource") - SystemMetricsPublisher metricsPublisher = - new SystemMetricsPublisher(mock(RMContext.class)); + TimelineServiceV2Publisher metricsPublisher = + new TimelineServiceV2Publisher(mock(RMContext.class)); try { Configuration conf = getTimelineV2Conf(); conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, @@ -163,20 +163,18 @@ public void testSystemMetricPublisherInitialization() { metricsPublisher.stop(); - metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class)); + metricsPublisher = new TimelineServiceV2Publisher(mock(RMContext.class)); conf = getTimelineV2Conf(); metricsPublisher.init(conf); + metricsPublisher.start(); assertTrue("Expected to publish container Metrics from RM", metricsPublisher.isPublishContainerMetrics()); - assertTrue( - "MultiThreadedDispatcher expected when container Metrics is not published", - metricsPublisher.getDispatcher() instanceof MultiThreadedDispatcher); } finally { metricsPublisher.stop(); } } - @Test(timeout = 1000000) + @Test(timeout = 10000) public void testPublishApplicationMetrics() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 1); RMApp app = createAppAndRegister(appId);