YARN-4129. Refactor the SystemMetricPublisher in RM to better support newer events (Naganarasimha G R via sjlee)

This commit is contained in:
Sangjin Lee 2015-10-22 17:56:32 -07:00
parent e3e857866d
commit 10ec5586fb
21 changed files with 737 additions and 1571 deletions

View File

@ -117,12 +117,12 @@
<!-- Object cast is based on the event type -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher" />
<Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher$ApplicationEventHandler" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher$ApplicationEventHandler" />
<Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher$TimelineV2EventHandler" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>

View File

@ -74,7 +74,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
@ -104,11 +107,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
@ -309,8 +312,9 @@ protected void serviceInit(Configuration conf) throws Exception {
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
addService(systemMetricsPublisher);
SystemMetricsPublisher systemMetricsPublisher =
createSystemMetricsPublisher();
addIfService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
super.serviceInit(this.conf);
@ -465,7 +469,24 @@ private RMTimelineCollectorManager createRMTimelineCollectorManager() {
}
protected SystemMetricsPublisher createSystemMetricsPublisher() {
return new SystemMetricsPublisher(rmContext);
boolean timelineServiceEnabled =
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
SystemMetricsPublisher publisher = null;
if (timelineServiceEnabled) {
if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
LOG.info("TimelineService V1 is configured");
publisher = new TimelineServiceV1Publisher();
} else {
LOG.info("TimelineService V2 is configured");
publisher = new TimelineServiceV2Publisher(rmContext);
}
} else {
LOG.info("TimelineServicePublisher is not configured");
publisher = new NoOpSystemMetricPublisher();
}
return publisher;
}
// sanity check for configurations
@ -585,10 +606,6 @@ protected void serviceInit(Configuration configuration) throws Exception {
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
addService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
RMTimelineCollectorManager timelineCollectorManager =
createRMTimelineCollectorManager();
addService(timelineCollectorManager);

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
public abstract class AbstractSystemMetricsPublisher extends CompositeService
implements SystemMetricsPublisher {
private MultiThreadedDispatcher dispatcher;
protected Dispatcher getDispatcher() {
return dispatcher;
}
public AbstractSystemMetricsPublisher(String name) {
super(name);
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
dispatcher =
new MultiThreadedDispatcher(getConfig().getInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
dispatcher.setDrainEventsOnStop();
addIfService(dispatcher);
super.serviceInit(conf);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public static class MultiThreadedDispatcher extends CompositeService
implements Dispatcher {
private List<AsyncDispatcher> dispatchers =
new ArrayList<AsyncDispatcher>();
public MultiThreadedDispatcher(int num) {
super(MultiThreadedDispatcher.class.getName());
for (int i = 0; i < num; ++i) {
AsyncDispatcher dispatcher = createDispatcher();
dispatchers.add(dispatcher);
addIfService(dispatcher);
}
}
@Override
public EventHandler getEventHandler() {
return new CompositEventHandler();
}
@Override
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.register(eventType, handler);
}
}
public void setDrainEventsOnStop() {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.setDrainEventsOnStop();
}
}
private class CompositEventHandler implements EventHandler<Event> {
@Override
public void handle(Event event) {
// Use hashCode (of ApplicationId) to dispatch the event to the child
// dispatcher, such that all the writing events of one application will
// be handled by one thread, the scheduled order of the these events
// will be preserved
int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
dispatchers.get(index).getEventHandler().handle(event);
}
}
protected AsyncDispatcher createDispatcher() {
return new AsyncDispatcher();
}
}
/**
* EventType which is used while publishing the events
*/
protected static enum SystemMetricsEventType {
PUBLISH_ENTITY, PUBLISH_APPLICATION_FINISHED_ENTITY
}
/**
* TimelinePublishEvent's hash code should be based on application's id this
* will ensure all the events related to a particular app goes to particular
* thread of MultiThreaded dispatcher.
*/
protected static abstract class TimelinePublishEvent
extends AbstractEvent<SystemMetricsEventType> {
private ApplicationId appId;
public TimelinePublishEvent(SystemMetricsEventType type,
ApplicationId appId) {
super(type);
this.appId = appId;
}
public ApplicationId getApplicationId() {
return appId;
}
@Override
public int hashCode() {
return appId.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof TimelinePublishEvent)) {
return false;
}
TimelinePublishEvent other = (TimelinePublishEvent) obj;
if (appId == null) {
if (other.appId != null) {
return false;
}
} else if (getType() == null) {
if (other.getType() != null) {
return false;
}
} else
if (!appId.equals(other.appId) || !getType().equals(other.getType())) {
return false;
}
return true;
}
}
}

View File

@ -1,191 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher;
public abstract class AbstractTimelineServicePublisher extends CompositeService
implements TimelineServicePublisher, EventHandler<SystemMetricsEvent> {
private static final Log LOG = LogFactory
.getLog(TimelineServiceV2Publisher.class);
private Configuration conf;
public AbstractTimelineServicePublisher(String name) {
super(name);
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
}
@Override
public void handle(SystemMetricsEvent event) {
switch (event.getType()) {
case APP_CREATED:
publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
break;
case APP_FINISHED:
publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
break;
case APP_UPDATED:
publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event);
break;
case APP_STATE_UPDATED:
publishApplicationStateUpdatedEvent(
(ApplicaitonStateUpdatedEvent)event);
break;
case APP_ACLS_UPDATED:
publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
break;
case APP_ATTEMPT_REGISTERED:
publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
break;
case APP_ATTEMPT_FINISHED:
publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
break;
case CONTAINER_CREATED:
publishContainerCreatedEvent((ContainerCreatedEvent) event);
break;
case CONTAINER_FINISHED:
publishContainerFinishedEvent((ContainerFinishedEvent) event);
break;
default:
LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
}
}
abstract void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event);
abstract void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event);
abstract void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event);
abstract void publishApplicationStateUpdatedEvent(
ApplicaitonStateUpdatedEvent event);
abstract void publishApplicationACLsUpdatedEvent(
ApplicationACLsUpdatedEvent event);
abstract void publishApplicationFinishedEvent(ApplicationFinishedEvent event);
abstract void publishApplicationCreatedEvent(ApplicationCreatedEvent event);
abstract void publishContainerCreatedEvent(ContainerCreatedEvent event);
abstract void publishContainerFinishedEvent(ContainerFinishedEvent event);
@Override
public Dispatcher getDispatcher() {
MultiThreadedDispatcher dispatcher =
new MultiThreadedDispatcher(
conf.getInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
dispatcher.setDrainEventsOnStop();
return dispatcher;
}
@Override
public boolean publishRMContainerMetrics() {
return true;
}
@Override
public EventHandler<SystemMetricsEvent> getEventHandler() {
return this;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public static class MultiThreadedDispatcher extends CompositeService
implements Dispatcher {
private List<AsyncDispatcher> dispatchers =
new ArrayList<AsyncDispatcher>();
public MultiThreadedDispatcher(int num) {
super(MultiThreadedDispatcher.class.getName());
for (int i = 0; i < num; ++i) {
AsyncDispatcher dispatcher = createDispatcher();
dispatchers.add(dispatcher);
addIfService(dispatcher);
}
}
@Override
public EventHandler getEventHandler() {
return new CompositEventHandler();
}
@Override
public void register(Class<? extends Enum> eventType, EventHandler handler) {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.register(eventType, handler);
}
}
public void setDrainEventsOnStop() {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.setDrainEventsOnStop();
}
}
private class CompositEventHandler implements EventHandler<Event> {
@Override
public void handle(Event event) {
// Use hashCode (of ApplicationId) to dispatch the event to the child
// dispatcher, such that all the writing events of one application will
// be handled by one thread, the scheduled order of the these events
// will be preserved
int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
dispatchers.get(index).getEventHandler().handle(event);
}
}
protected AsyncDispatcher createDispatcher() {
return new AsyncDispatcher();
}
}
}

View File

@ -1,90 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
public class AppAttemptFinishedEvent extends
SystemMetricsEvent {
private ApplicationAttemptId appAttemptId;
private String trackingUrl;
private String originalTrackingUrl;
private String diagnosticsInfo;
private FinalApplicationStatus appStatus;
private YarnApplicationAttemptState state;
private ContainerId masterContainerId;
public AppAttemptFinishedEvent(
ApplicationAttemptId appAttemptId,
String trackingUrl,
String originalTrackingUrl,
String diagnosticsInfo,
FinalApplicationStatus appStatus,
YarnApplicationAttemptState state,
long finishedTime,
ContainerId masterContainerId) {
super(SystemMetricsEventType.APP_ATTEMPT_FINISHED, finishedTime);
this.appAttemptId = appAttemptId;
// This is the tracking URL after the application attempt is finished
this.trackingUrl = trackingUrl;
this.originalTrackingUrl = originalTrackingUrl;
this.diagnosticsInfo = diagnosticsInfo;
this.appStatus = appStatus;
this.state = state;
this.masterContainerId = masterContainerId;
}
@Override
public int hashCode() {
return appAttemptId.getApplicationId().hashCode();
}
public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptId;
}
public String getTrackingUrl() {
return trackingUrl;
}
public String getOriginalTrackingURL() {
return originalTrackingUrl;
}
public String getDiagnosticsInfo() {
return diagnosticsInfo;
}
public FinalApplicationStatus getFinalApplicationStatus() {
return appStatus;
}
public YarnApplicationAttemptState getYarnApplicationAttemptState() {
return state;
}
public ContainerId getMasterContainerId() {
return masterContainerId;
}
}

View File

@ -1,81 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
public class AppAttemptRegisteredEvent extends
SystemMetricsEvent {
private ApplicationAttemptId appAttemptId;
private String host;
private int rpcPort;
private String trackingUrl;
private String originalTrackingUrl;
private ContainerId masterContainerId;
public AppAttemptRegisteredEvent(
ApplicationAttemptId appAttemptId,
String host,
int rpcPort,
String trackingUrl,
String originalTrackingUrl,
ContainerId masterContainerId,
long registeredTime) {
super(SystemMetricsEventType.APP_ATTEMPT_REGISTERED, registeredTime);
this.appAttemptId = appAttemptId;
this.host = host;
this.rpcPort = rpcPort;
// This is the tracking URL after the application attempt is registered
this.trackingUrl = trackingUrl;
this.originalTrackingUrl = originalTrackingUrl;
this.masterContainerId = masterContainerId;
}
@Override
public int hashCode() {
return appAttemptId.getApplicationId().hashCode();
}
public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptId;
}
public String getHost() {
return host;
}
public int getRpcPort() {
return rpcPort;
}
public String getTrackingUrl() {
return trackingUrl;
}
public String getOriginalTrackingURL() {
return originalTrackingUrl;
}
public ContainerId getMasterContainerId() {
return masterContainerId;
}
}

View File

@ -1,47 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
/**
* When the state of this application has been changed, RM would sent
* this event to inform Timeline Server for keeping the Application state
* consistent.
*/
public class ApplicaitonStateUpdatedEvent extends SystemMetricsEvent{
private ApplicationId appId;
private YarnApplicationState appState;
public ApplicaitonStateUpdatedEvent(ApplicationId appliocationId,
YarnApplicationState state, long updatedTime) {
super(SystemMetricsEventType.APP_STATE_UPDATED, updatedTime);
this.appId = appliocationId;
this.appState = state;
}
public ApplicationId getApplicationId() {
return appId;
}
public YarnApplicationState getAppState() {
return appState;
}
}

View File

@ -1,45 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class ApplicationACLsUpdatedEvent extends SystemMetricsEvent {
private ApplicationId appId;
private String viewAppACLs;
public ApplicationACLsUpdatedEvent(ApplicationId appId,
String viewAppACLs,
long updatedTime) {
super(SystemMetricsEventType.APP_ACLS_UPDATED, updatedTime);
this.appId = appId;
this.viewAppACLs = viewAppACLs;
}
public ApplicationId getApplicationId() {
return appId;
}
public String getViewAppACLs() {
return viewAppACLs;
}
}

View File

@ -1,124 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.Set;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
public class ApplicationCreatedEvent extends
SystemMetricsEvent {
private ApplicationId appId;
private String name;
private String type;
private String user;
private String queue;
private long submittedTime;
private Set<String> appTags;
private boolean unmanagedApplication;
private Priority applicationPriority;
private String appNodeLabelsExpression;
private String amNodeLabelsExpression;
private final CallerContext callerContext;
public ApplicationCreatedEvent(ApplicationId appId,
String name,
String type,
String user,
String queue,
long submittedTime,
long createdTime,
Set<String> appTags,
boolean unmanagedApplication,
Priority applicationPriority,
String appNodeLabelsExpression,
String amNodeLabelsExpression,
CallerContext callerContext) {
super(SystemMetricsEventType.APP_CREATED, createdTime);
this.appId = appId;
this.name = name;
this.type = type;
this.user = user;
this.queue = queue;
this.submittedTime = submittedTime;
this.appTags = appTags;
this.unmanagedApplication = unmanagedApplication;
this.applicationPriority = applicationPriority;
this.appNodeLabelsExpression = appNodeLabelsExpression;
this.amNodeLabelsExpression = amNodeLabelsExpression;
this.callerContext = callerContext;
}
@Override
public int hashCode() {
return appId.hashCode();
}
public ApplicationId getApplicationId() {
return appId;
}
public String getApplicationName() {
return name;
}
public String getApplicationType() {
return type;
}
public String getUser() {
return user;
}
public String getQueue() {
return queue;
}
public long getSubmittedTime() {
return submittedTime;
}
public Set<String> getAppTags() {
return appTags;
}
public boolean isUnmanagedApp() {
return unmanagedApplication;
}
public Priority getApplicationPriority() {
return applicationPriority;
}
public String getAppNodeLabelsExpression() {
return appNodeLabelsExpression;
}
public String getAmNodeLabelsExpression() {
return amNodeLabelsExpression;
}
public CallerContext getCallerContext() {
return callerContext;
}
}

View File

@ -1,91 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
public class ApplicationFinishedEvent extends
SystemMetricsEvent {
private ApplicationId appId;;
private String diagnosticsInfo;
private FinalApplicationStatus appStatus;
private YarnApplicationState state;
private ApplicationAttemptId latestAppAttemptId;
private RMAppMetrics appMetrics;
private RMAppImpl app;
public ApplicationFinishedEvent(
ApplicationId appId,
String diagnosticsInfo,
FinalApplicationStatus appStatus,
YarnApplicationState state,
ApplicationAttemptId latestAppAttemptId,
long finishedTime,
RMAppMetrics appMetrics,
RMAppImpl app) {
super(SystemMetricsEventType.APP_FINISHED, finishedTime);
this.appId = appId;
this.diagnosticsInfo = diagnosticsInfo;
this.appStatus = appStatus;
this.latestAppAttemptId = latestAppAttemptId;
this.state = state;
this.appMetrics = appMetrics;
this.app = app;
}
@Override
public int hashCode() {
return appId.hashCode();
}
public RMAppImpl getApp() {
return app;
}
public ApplicationId getApplicationId() {
return appId;
}
public String getDiagnosticsInfo() {
return diagnosticsInfo;
}
public FinalApplicationStatus getFinalApplicationStatus() {
return appStatus;
}
public YarnApplicationState getYarnApplicationState() {
return state;
}
public ApplicationAttemptId getLatestApplicationAttemptId() {
return latestAppAttemptId;
}
public RMAppMetrics getAppMetrics() {
return appMetrics;
}
}

View File

@ -1,54 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
public class ApplicationUpdatedEvent extends SystemMetricsEvent {
private ApplicationId appId;
private String queue;
private Priority applicationPriority;
public ApplicationUpdatedEvent(ApplicationId appId, String queue,
long updatedTime, Priority applicationPriority) {
super(SystemMetricsEventType.APP_UPDATED, updatedTime);
this.appId = appId;
this.queue = queue;
this.applicationPriority = applicationPriority;
}
@Override
public int hashCode() {
return appId.hashCode();
}
public ApplicationId getApplicationId() {
return appId;
}
public String getQueue() {
return queue;
}
public Priority getApplicationPriority() {
return applicationPriority;
}
}

View File

@ -1,73 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
public class ContainerCreatedEvent extends SystemMetricsEvent {
private ContainerId containerId;
private Resource allocatedResource;
private NodeId allocatedNode;
private Priority allocatedPriority;
private String nodeHttpAddress;
public ContainerCreatedEvent(
ContainerId containerId,
Resource allocatedResource,
NodeId allocatedNode,
Priority allocatedPriority,
long createdTime,
String nodeHttpAddress) {
super(SystemMetricsEventType.CONTAINER_CREATED, createdTime);
this.containerId = containerId;
this.allocatedResource = allocatedResource;
this.allocatedNode = allocatedNode;
this.allocatedPriority = allocatedPriority;
this.nodeHttpAddress = nodeHttpAddress;
}
@Override
public int hashCode() {
return containerId.getApplicationAttemptId().getApplicationId().hashCode();
}
public ContainerId getContainerId() {
return containerId;
}
public Resource getAllocatedResource() {
return allocatedResource;
}
public NodeId getAllocatedNode() {
return allocatedNode;
}
public Priority getAllocatedPriority() {
return allocatedPriority;
}
public String getNodeHttpAddress() {
return nodeHttpAddress;
}
}

View File

@ -1,72 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
public class ContainerFinishedEvent extends SystemMetricsEvent {
private ContainerId containerId;
private String diagnosticsInfo;
private int containerExitStatus;
private ContainerState state;
private NodeId allocatedNode;
public ContainerFinishedEvent(
ContainerId containerId,
String diagnosticsInfo,
int containerExitStatus,
ContainerState state,
long finishedTime,
NodeId allocatedNode) {
super(SystemMetricsEventType.CONTAINER_FINISHED, finishedTime);
this.containerId = containerId;
this.diagnosticsInfo = diagnosticsInfo;
this.containerExitStatus = containerExitStatus;
this.allocatedNode = allocatedNode;
this.state = state;
}
@Override
public int hashCode() {
return containerId.getApplicationAttemptId().getApplicationId().hashCode();
}
public ContainerId getContainerId() {
return containerId;
}
public String getDiagnosticsInfo() {
return diagnosticsInfo;
}
public int getContainerExitStatus() {
return containerExitStatus;
}
public ContainerState getContainerState() {
return state;
}
public NodeId getAllocatedNode() {
return allocatedNode;
}
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
/**
* This class does nothing when any of the methods are invoked on
* SystemMetricsPublisher
*/
public class NoOpSystemMetricPublisher implements SystemMetricsPublisher{
@Override
public void appCreated(RMApp app, long createdTime) {
}
@Override
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
}
@Override
public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
}
@Override
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
}
@Override
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
}
@Override
public void containerCreated(RMContainer container, long createdTime) {
}
@Override
public void containerFinished(RMContainer container, long finishedTime) {
}
@Override
public void appUpdated(RMApp app, long currentTimeMillis) {
}
@Override
public void appStateUpdated(RMApp app, YarnApplicationState appState,
long updatedTime) {
}
}

View File

@ -1,33 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.event.AbstractEvent;
public class SystemMetricsEvent extends AbstractEvent<SystemMetricsEventType> {
public SystemMetricsEvent(SystemMetricsEventType type) {
super(type);
}
public SystemMetricsEvent(SystemMetricsEventType type, long timestamp) {
super(type, timestamp);
}
}

View File

@ -1,37 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
public enum SystemMetricsEventType {
// app events
APP_CREATED,
APP_FINISHED,
APP_ACLS_UPDATED,
APP_UPDATED,
APP_STATE_UPDATED,
// app attempt events
APP_ATTEMPT_REGISTERED,
APP_ATTEMPT_FINISHED,
// container events
CONTAINER_CREATED,
CONTAINER_FINISHED
}

View File

@ -18,269 +18,32 @@
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import com.google.common.annotations.VisibleForTesting;
public interface SystemMetricsPublisher {
import com.google.common.annotations.VisibleForTesting;
void appCreated(RMApp app, long createdTime);
/**
* The class that helps RM publish metrics to the timeline server. RM will
* always invoke the methods of this class regardless the service is enabled or
* not. If it is disabled, publishing requests will be ignored silently.
*/
@Private
@Unstable
public class SystemMetricsPublisher extends CompositeService {
void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime);
private static final Log LOG = LogFactory
.getLog(SystemMetricsPublisher.class);
void appUpdated(RMApp app, long updatedTime);
private Dispatcher dispatcher;
private boolean publishSystemMetrics;
private boolean publishContainerMetrics;
protected RMContext rmContext;
void appStateUpdated(RMApp app, YarnApplicationState appState,
long updatedTime);
public SystemMetricsPublisher(RMContext rmContext) {
super(SystemMetricsPublisher.class.getName());
this.rmContext = rmContext;
}
void appFinished(RMApp app, RMAppState state, long finishedTime);
@Override
protected void serviceInit(Configuration conf) throws Exception {
publishSystemMetrics =
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
if (publishSystemMetrics) {
TimelineServicePublisher timelineServicePublisher =
getTimelineServicePublisher(conf);
if (timelineServicePublisher != null) {
addService(timelineServicePublisher);
// init required to be called so that other methods of
// TimelineServicePublisher can be utilized
timelineServicePublisher.init(conf);
dispatcher = createDispatcher(timelineServicePublisher);
publishContainerMetrics =
timelineServicePublisher.publishRMContainerMetrics();
dispatcher.register(SystemMetricsEventType.class,
timelineServicePublisher.getEventHandler());
addIfService(dispatcher);
} else {
LOG.info("TimelineServicePublisher is not configured");
publishSystemMetrics = false;
}
LOG.info("YARN system metrics publishing service is enabled");
} else {
LOG.info("YARN system metrics publishing service is not enabled");
}
super.serviceInit(conf);
}
void appAttemptRegistered(RMAppAttempt appAttempt, long registeredTime);
@VisibleForTesting
Dispatcher createDispatcher(TimelineServicePublisher timelineServicePublisher) {
return timelineServicePublisher.getDispatcher();
}
void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime);
TimelineServicePublisher getTimelineServicePublisher(Configuration conf) {
if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
return new TimelineServiceV1Publisher();
} else if (conf.getBoolean(
YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
return new TimelineServiceV2Publisher(rmContext);
}
return null;
}
void containerCreated(RMContainer container, long createdTime);
@SuppressWarnings("unchecked")
public void appCreated(RMApp app, long createdTime) {
if (publishSystemMetrics) {
ApplicationSubmissionContext appSubmissionContext =
app.getApplicationSubmissionContext();
dispatcher.getEventHandler().handle(
new ApplicationCreatedEvent(
app.getApplicationId(),
app.getName(),
app.getApplicationType(),
app.getUser(),
app.getQueue(),
app.getSubmitTime(),
createdTime, app.getApplicationTags(),
appSubmissionContext.getUnmanagedAM(),
appSubmissionContext.getPriority(),
app.getAppNodeLabelExpression(),
app.getAmNodeLabelExpression(),
app.getCallerContext()));
}
}
@SuppressWarnings("unchecked")
public void appUpdated(RMApp app, long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler()
.handle(new ApplicationUpdatedEvent(app.getApplicationId(),
app.getQueue(), updatedTime,
app.getApplicationSubmissionContext().getPriority()));
}
}
@SuppressWarnings("unchecked")
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationFinishedEvent(
app.getApplicationId(),
app.getDiagnostics().toString(),
app.getFinalApplicationStatus(),
RMServerUtils.createApplicationState(state),
app.getCurrentAppAttempt() == null ?
null : app.getCurrentAppAttempt().getAppAttemptId(),
finishedTime,
app.getRMAppMetrics(),
(RMAppImpl)app));
}
}
@SuppressWarnings("unchecked")
public void appACLsUpdated(RMApp app, String appViewACLs,
long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationACLsUpdatedEvent(
app.getApplicationId(),
appViewACLs == null ? "" : appViewACLs,
updatedTime));
}
}
@SuppressWarnings("unchecked")
public void appStateUpdated(RMApp app, YarnApplicationState appState,
long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicaitonStateUpdatedEvent(
app.getApplicationId(),
appState,
updatedTime));
}
}
@SuppressWarnings("unchecked")
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
if (publishSystemMetrics) {
ContainerId container = (appAttempt.getMasterContainer() == null) ? null
: appAttempt.getMasterContainer().getId();
dispatcher.getEventHandler().handle(
new AppAttemptRegisteredEvent(
appAttempt.getAppAttemptId(),
appAttempt.getHost(),
appAttempt.getRpcPort(),
appAttempt.getTrackingUrl(),
appAttempt.getOriginalTrackingUrl(),
container,
registeredTime));
}
}
@SuppressWarnings("unchecked")
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
if (publishSystemMetrics) {
ContainerId container = (appAttempt.getMasterContainer() == null) ? null
: appAttempt.getMasterContainer().getId();
dispatcher.getEventHandler().handle(
new AppAttemptFinishedEvent(
appAttempt.getAppAttemptId(),
appAttempt.getTrackingUrl(),
appAttempt.getOriginalTrackingUrl(),
appAttempt.getDiagnostics(),
// app will get the final status from app attempt, or create one
// based on app state if it doesn't exist
app.getFinalApplicationStatus(),
RMServerUtils.createApplicationAttemptState(appAttemtpState),
finishedTime,
container));
}
}
@SuppressWarnings("unchecked")
public void containerCreated(RMContainer container, long createdTime) {
if (publishContainerMetrics) {
dispatcher.getEventHandler().handle(
new ContainerCreatedEvent(
container.getContainerId(),
container.getAllocatedResource(),
container.getAllocatedNode(),
container.getAllocatedPriority(),
createdTime, container.getNodeHttpAddress()));
}
}
@SuppressWarnings("unchecked")
public void containerFinished(RMContainer container, long finishedTime) {
if (publishContainerMetrics) {
dispatcher.getEventHandler().handle(
new ContainerFinishedEvent(
container.getContainerId(),
container.getDiagnosticsInfo(),
container.getContainerExitStatus(),
container.getContainerState(),
finishedTime, container.getAllocatedNode()));
}
}
@VisibleForTesting
boolean isPublishContainerMetrics() {
return publishContainerMetrics;
}
@VisibleForTesting
Dispatcher getDispatcher() {
return dispatcher;
}
interface TimelineServicePublisher extends Service {
/**
* @return the Dispatcher which needs to be used to dispatch events
*/
Dispatcher getDispatcher();
/**
* @return true if RMContainerMetricsNeeds to be sent
*/
boolean publishRMContainerMetrics();
/**
* @return EventHandler which needs to be registered to the dispatcher to
* handle the SystemMetricsEvent
*/
EventHandler<SystemMetricsEvent> getEventHandler();
}
void containerFinished(RMContainer container, long finishedTime);
}

View File

@ -26,21 +26,28 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
public class TimelineServiceV1Publisher extends
AbstractTimelineServicePublisher {
public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
private static final Log LOG = LogFactory
.getLog(TimelineServiceV1Publisher.class);
private static final Log LOG =
LogFactory.getLog(TimelineServiceV1Publisher.class);
public TimelineServiceV1Publisher() {
super("TimelineserviceV1Publisher");
@ -49,76 +56,69 @@ public TimelineServiceV1Publisher() {
private TimelineClient client;
@Override
public void serviceInit(Configuration conf) throws Exception {
protected void serviceInit(Configuration conf) throws Exception {
client = TimelineClient.createTimelineClient();
addIfService(client);
super.serviceInit(conf);
getDispatcher().register(SystemMetricsEventType.class,
new TimelineV1EventHandler());
}
@SuppressWarnings("unchecked")
@Override
void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
TimelineEntity entity =
createApplicationEntity(event.getApplicationId());
public void appCreated(RMApp app, long createdTime) {
TimelineEntity entity = createApplicationEntity(app.getApplicationId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
event.getApplicationName());
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName());
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
event.getApplicationType());
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
event.getUser());
app.getApplicationType());
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser());
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
app.getQueue());
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
event.getSubmittedTime());
app.getSubmitTime());
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
event.getAppTags());
app.getApplicationTags());
entityInfo.put(
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
event.isUnmanagedApp());
app.getApplicationSubmissionContext().getUnmanagedAM());
entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
event.getApplicationPriority().getPriority());
entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
event.getAppNodeLabelsExpression());
app.getApplicationSubmissionContext().getPriority().getPriority());
entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
event.getAmNodeLabelsExpression());
if (event.getCallerContext() != null) {
if (event.getCallerContext().getContext() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT,
event.getCallerContext().getContext());
}
if (event.getCallerContext().getSignature() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE,
event.getCallerContext().getSignature());
}
}
app.getAmNodeLabelExpression());
entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
app.getAppNodeLabelExpression());
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(
ApplicationMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(createdTime);
entity.addEvent(tEvent);
putEntity(entity);
getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
}
@Override
void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
TimelineEntity entity = createApplicationEntity(app.getApplicationId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setTimestamp(finishedTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
.getFinalApplicationStatus().toString());
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
.getYarnApplicationState().toString());
if (event.getLatestApplicationAttemptId() != null) {
app.getDiagnostics().toString());
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
app.getFinalApplicationStatus().toString());
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
RMServerUtils.createApplicationState(state).toString());
String latestApplicationAttemptId = app.getCurrentAppAttempt() == null
? null : app.getCurrentAppAttempt().getAppAttemptId().toString();
if (latestApplicationAttemptId != null) {
eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
event.getLatestApplicationAttemptId().toString());
latestApplicationAttemptId);
}
RMAppMetrics appMetrics = event.getAppMetrics();
RMAppMetrics appMetrics = app.getRMAppMetrics();
entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
appMetrics.getVcoreSeconds());
entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
@ -126,54 +126,170 @@ void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
// sync sending of finish event to avoid possibility of saving application
// finished state in RMStateStore save without publishing in ATS
putEntity(entity);// sync event so that ATS update is done without fail
}
@SuppressWarnings("unchecked")
@Override
void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
public void appUpdated(RMApp app, long updatedTime) {
TimelineEntity entity = createApplicationEntity(app.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
.getApplicationPriority().getPriority());
app.getQueue());
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
app.getApplicationSubmissionContext().getPriority().getPriority());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setTimestamp(updatedTime);
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
}
@SuppressWarnings("unchecked")
@Override
void publishApplicationStateUpdatedEvent(
ApplicaitonStateUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
public void appStateUpdated(RMApp app, YarnApplicationState appState,
long updatedTime) {
TimelineEntity entity = createApplicationEntity(app.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
event.getAppState());
appState);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setTimestamp(updatedTime);
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
}
@Override
void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
TimelineEntity entity = createApplicationEntity(app.getApplicationId());
TimelineEvent tEvent = new TimelineEvent();
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
event.getViewAppACLs());
(appViewACLs == null) ? "" : appViewACLs);
entity.setOtherInfo(entityInfo);
tEvent.setEventType(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setTimestamp(updatedTime);
entity.addEvent(tEvent);
putEntity(entity);
getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
}
@SuppressWarnings("unchecked")
@Override
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
TimelineEntity entity =
createAppAttemptEntity(appAttempt.getAppAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(registeredTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
appAttempt.getTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
appAttempt.getOriginalTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
appAttempt.getHost());
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
appAttempt.getRpcPort());
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
appAttempt.getMasterContainer().getId().toString());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
getDispatcher().getEventHandler().handle(
new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
entity, appAttempt.getAppAttemptId().getApplicationId()));
}
@SuppressWarnings("unchecked")
@Override
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
TimelineEntity entity =
createAppAttemptEntity(appAttempt.getAppAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(finishedTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
appAttempt.getTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
appAttempt.getOriginalTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
appAttempt.getDiagnostics());
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
app.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils
.createApplicationAttemptState(appAttemtpState).toString());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
getDispatcher().getEventHandler().handle(
new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
entity, appAttempt.getAppAttemptId().getApplicationId()));
}
@SuppressWarnings("unchecked")
@Override
public void containerCreated(RMContainer container, long createdTime) {
TimelineEntity entity = createContainerEntity(container.getContainerId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
container.getAllocatedResource().getMemorySize());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
container.getAllocatedResource().getVirtualCores());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
container.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
container.getAllocatedNode().getPort());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
container.getAllocatedPriority().getPriority());
entityInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
container.getNodeHttpAddress());
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(createdTime);
entity.addEvent(tEvent);
getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, container
.getContainerId().getApplicationAttemptId().getApplicationId()));
}
@SuppressWarnings("unchecked")
@Override
public void containerFinished(RMContainer container, long finishedTime) {
TimelineEntity entity = createContainerEntity(container.getContainerId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(finishedTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
container.getDiagnosticsInfo());
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
container.getContainerExitStatus());
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
container.getContainerState().toString());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, container
.getContainerId().getApplicationAttemptId().getApplicationId()));
}
private static TimelineEntity createApplicationEntity(
@ -184,63 +300,6 @@ private static TimelineEntity createApplicationEntity(
return entity;
}
@Override
void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
TimelineEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(
AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
eventInfo.put(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
event.getHost());
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
event.getRpcPort());
if (event.getMasterContainerId() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
event.getMasterContainerId().toString());
}
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
@Override
void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
TimelineEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
.getYarnApplicationAttemptState().toString());
if (event.getMasterContainerId() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
event.getMasterContainerId().toString());
}
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private static TimelineEntity createAppAttemptEntity(
ApplicationAttemptId appAttemptId) {
TimelineEntity entity = new TimelineEntity();
@ -251,59 +310,6 @@ private static TimelineEntity createAppAttemptEntity(
return entity;
}
@Override
void publishContainerCreatedEvent(ContainerCreatedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
event.getAllocatedResource().getMemorySize());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
.getAllocatedResource().getVirtualCores());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
.getAllocatedNode().getPort());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
event.getAllocatedPriority().getPriority());
entityInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
event.getNodeHttpAddress());
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
putEntity(entity);
}
@Override
void publishContainerFinishedEvent(ContainerFinishedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
event.getContainerExitStatus());
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
.getContainerState().toString());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
event.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
event.getAllocatedNode().getPort());
entity.setOtherInfo(entityInfo);
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private static TimelineEntity createContainerEntity(ContainerId containerId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE);
@ -326,4 +332,26 @@ private void putEntity(TimelineEntity entity) {
+ entity.getEntityId() + "]", e);
}
}
private class TimelineV1PublishEvent extends TimelinePublishEvent {
private TimelineEntity entity;
public TimelineV1PublishEvent(SystemMetricsEventType type,
TimelineEntity entity, ApplicationId appId) {
super(type, appId);
this.entity = entity;
}
public TimelineEntity getEntity() {
return entity;
}
}
private class TimelineV1EventHandler
implements EventHandler<TimelineV1PublishEvent> {
@Override
public void handle(TimelineV1PublishEvent event) {
putEntity(event.getEntity());
}
}
}

View File

@ -25,10 +25,10 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
@ -39,93 +39,100 @@
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
/**
* This class is responsible for posting application, appattempt & Container
* lifecycle related events to timeline service V2
*/
@Private
@Unstable
public class TimelineServiceV2Publisher extends
AbstractTimelineServicePublisher {
private static final Log LOG = LogFactory
.getLog(TimelineServiceV2Publisher.class);
public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
private static final Log LOG =
LogFactory.getLog(TimelineServiceV2Publisher.class);
protected RMTimelineCollectorManager rmTimelineCollectorManager;
private boolean publishContainerMetrics;
public TimelineServiceV2Publisher(RMContext rmContext) {
super("TimelineserviceV2Publisher");
rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
}
private boolean publishContainerMetrics;
@Override
protected void serviceInit(Configuration conf) throws Exception {
publishContainerMetrics =
conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
super.serviceInit(conf);
protected void serviceStart() throws Exception {
super.serviceStart();
publishContainerMetrics = getConfig().getBoolean(
YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
getDispatcher().register(SystemMetricsEventType.class,
new TimelineV2EventHandler());
}
@VisibleForTesting
boolean isPublishContainerMetrics() {
return publishContainerMetrics;
}
@SuppressWarnings("unchecked")
@Override
void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
TimelineEntity entity =
createApplicationEntity(event.getApplicationId());
public void appCreated(RMApp app, long createdTime) {
ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
entity.setQueue(app.getQueue());
entity.setCreatedTime(createdTime);
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
event.getApplicationName());
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName());
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
event.getApplicationType());
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
event.getUser());
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
app.getApplicationType());
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser());
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
event.getSubmittedTime());
app.getSubmitTime());
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
event.getAppTags());
app.getApplicationTags());
entityInfo.put(
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
event.isUnmanagedApp());
app.getApplicationSubmissionContext().getUnmanagedAM());
entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
event.getApplicationPriority().getPriority());
entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
event.getAppNodeLabelsExpression());
entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
event.getAmNodeLabelsExpression());
if (event.getCallerContext() != null) {
if (event.getCallerContext().getContext() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT,
event.getCallerContext().getContext());
}
if (event.getCallerContext().getSignature() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE,
event.getCallerContext().getSignature());
}
}
app.getApplicationSubmissionContext().getPriority().getPriority());
entity.getConfigs().put(
ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
app.getAmNodeLabelExpression());
entity.getConfigs().put(
ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
app.getAppNodeLabelExpression());
entity.setInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setTimestamp(createdTime);
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationId());
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
}
@SuppressWarnings("unchecked")
@Override
void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
ApplicationEntity entity =
createApplicationEntity(event.getApplicationId());
RMAppMetrics appMetrics = event.getAppMetrics();
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
RMAppMetrics appMetrics = app.getRMAppMetrics();
entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
appMetrics.getVcoreSeconds());
entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
@ -133,70 +140,76 @@ void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setTimestamp(finishedTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
.getFinalApplicationStatus().toString());
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
.getYarnApplicationState().toString());
if (event.getLatestApplicationAttemptId() != null) {
app.getDiagnostics().toString());
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
app.getFinalApplicationStatus().toString());
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
RMServerUtils.createApplicationState(state).toString());
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() == null
? null : app.getCurrentAppAttempt().getAppAttemptId();
if (appAttemptId != null) {
eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
event.getLatestApplicationAttemptId().toString());
appAttemptId.toString());
}
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationId());
//cleaning up the collector cached
event.getApp().stopTimelineCollector();
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
}
@SuppressWarnings("unchecked")
@Override
void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
public void appStateUpdated(RMApp app, YarnApplicationState appState,
long updatedTime) {
ApplicationEntity entity =
createApplicationEntity(event.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
.getApplicationPriority().getPriority());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationId());
}
@Override
void publishApplicationStateUpdatedEvent(
ApplicaitonStateUpdatedEvent event) {
ApplicationEntity entity =
createApplicationEntity(event.getApplicationId());
createApplicationEntity(app.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
event.getAppState());
appState);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setId(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(updatedTime);
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationId());
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
}
@SuppressWarnings("unchecked")
@Override
void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
ApplicationEntity entity =
createApplicationEntity(event.getApplicationId());
public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
event.getViewAppACLs());
appViewACLs);
entity.setInfo(entityInfo);
putEntity(entity, event.getApplicationId());
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
}
@SuppressWarnings("unchecked")
@Override
public void appUpdated(RMApp app, long currentTimeMillis) {
ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
app.getQueue());
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
app.getApplicationSubmissionContext().getPriority().getPriority());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(currentTimeMillis);
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
}
private static ApplicationEntity createApplicationEntity(
@ -206,117 +219,134 @@ private static ApplicationEntity createApplicationEntity(
return entity;
}
@SuppressWarnings("unchecked")
@Override
void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
TimelineEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
createAppAttemptEntity(appAttempt.getAppAttemptId());
entity.setCreatedTime(registeredTime);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setTimestamp(registeredTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(
AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
eventInfo.put(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
appAttempt.getTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
appAttempt.getOriginalTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
event.getHost());
appAttempt.getHost());
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
event.getRpcPort());
if (event.getMasterContainerId() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
event.getMasterContainerId().toString());
}
appAttempt.getRpcPort());
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
appAttempt.getMasterContainer().getId().toString());
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationAttemptId().getApplicationId());
getDispatcher().getEventHandler().handle(
new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
entity, appAttempt.getAppAttemptId().getApplicationId()));
}
@SuppressWarnings("unchecked")
@Override
void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
ApplicationAttemptEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
createAppAttemptEntity(appAttempt.getAppAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setTimestamp(finishedTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
appAttempt.getTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
appAttempt.getOriginalTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
.getYarnApplicationAttemptState().toString());
if (event.getMasterContainerId() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
event.getMasterContainerId().toString());
appAttempt.getDiagnostics());
// app will get the final status from app attempt, or create one
// based on app state if it doesn't exist
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
app.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils
.createApplicationAttemptState(appAttemtpState).toString());
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
getDispatcher().getEventHandler().handle(
new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
entity, appAttempt.getAppAttemptId().getApplicationId()));
}
private static ApplicationAttemptEntity createAppAttemptEntity(
ApplicationAttemptId appAttemptId) {
ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
entity.setId(appAttemptId.toString());
entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
appAttemptId.getApplicationId().toString()));
return entity;
}
@SuppressWarnings("unchecked")
@Override
public void containerCreated(RMContainer container, long createdTime) {
if (publishContainerMetrics) {
TimelineEntity entity = createContainerEntity(container.getContainerId());
entity.setCreatedTime(createdTime);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
tEvent.setTimestamp(createdTime);
// updated as event info instead of entity info, as entity info is updated
// by NM
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
container.getAllocatedResource().getMemorySize());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
container.getAllocatedResource().getVirtualCores());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
container.getAllocatedNode().getHost());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
container.getAllocatedNode().getPort());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
container.getAllocatedPriority().getPriority());
eventInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
container.getNodeHttpAddress());
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, container
.getContainerId().getApplicationAttemptId().getApplicationId()));
}
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationAttemptId().getApplicationId());
}
@SuppressWarnings("unchecked")
@Override
void publishContainerCreatedEvent(ContainerCreatedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
public void containerFinished(RMContainer container, long finishedTime) {
if (publishContainerMetrics) {
TimelineEntity entity = createContainerEntity(container.getContainerId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
// updated as event info instead of entity info, as entity info is updated
// by NM
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event
.getAllocatedResource().getMemorySize());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
.getAllocatedResource().getVirtualCores());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
.getAllocatedNode().getHost());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
.getAllocatedNode().getPort());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
event.getAllocatedPriority().getPriority());
eventInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
event.getNodeHttpAddress());
tEvent.setInfo(eventInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
tEvent.setTimestamp(finishedTime);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
container.getDiagnosticsInfo());
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
container.getContainerExitStatus());
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
container.getContainerState().toString());
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getContainerId().getApplicationAttemptId()
.getApplicationId());
}
@Override
void publishContainerFinishedEvent(ContainerFinishedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
event.getContainerExitStatus());
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
.getContainerState().toString());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
event.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
event.getAllocatedNode().getPort());
entity.setInfo(entityInfo);
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getContainerId().getApplicationAttemptId()
.getApplicationId());
entity.addEvent(tEvent);
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
SystemMetricsEventType.PUBLISH_ENTITY, entity, container
.getContainerId().getApplicationAttemptId().getApplicationId()));
}
}
private static ContainerEntity createContainerEntity(ContainerId containerId) {
@ -344,17 +374,48 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) {
}
}
private static ApplicationAttemptEntity createAppAttemptEntity(
ApplicationAttemptId appAttemptId) {
ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
entity.setId(appAttemptId.toString());
entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
appAttemptId.getApplicationId().toString()));
return entity;
private class ApplicationFinishPublishEvent extends TimelineV2PublishEvent {
private RMAppImpl app;
public ApplicationFinishPublishEvent(SystemMetricsEventType type,
TimelineEntity entity, RMAppImpl app) {
super(type, entity, app.getApplicationId());
this.app = app;
}
public RMAppImpl getRMAppImpl() {
return app;
}
}
@Override
public boolean publishRMContainerMetrics() {
return publishContainerMetrics;
private class TimelineV2EventHandler
implements EventHandler<TimelineV2PublishEvent> {
@Override
public void handle(TimelineV2PublishEvent event) {
switch (event.getType()) {
case PUBLISH_APPLICATION_FINISHED_ENTITY:
putEntity(event.getEntity(), event.getApplicationId());
((ApplicationFinishPublishEvent) event).getRMAppImpl()
.stopTimelineCollector();
break;
default:
putEntity(event.getEntity(), event.getApplicationId());
break;
}
}
}
private class TimelineV2PublishEvent extends TimelinePublishEvent {
private TimelineEntity entity;
public TimelineV2PublishEvent(SystemMetricsEventType type,
TimelineEntity entity, ApplicationId appId) {
super(type, appId);
this.entity = entity;
}
public TimelineEntity getEntity() {
return entity;
}
}
}

View File

@ -48,7 +48,6 @@
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@ -69,7 +68,7 @@
public class TestSystemMetricsPublisher {
private static ApplicationHistoryServer timelineServer;
private static SystemMetricsPublisher metricsPublisher;
private static TimelineServiceV1Publisher metricsPublisher;
private static TimelineStore store;
@BeforeClass
@ -90,7 +89,7 @@ public static void setup() throws Exception {
timelineServer.start();
store = timelineServer.getTimelineStore();
metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
metricsPublisher = new TimelineServiceV1Publisher();
metricsPublisher.init(conf);
metricsPublisher.start();
}

View File

@ -49,7 +49,6 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher.MultiThreadedDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@ -75,7 +74,7 @@ public class TestSystemMetricsPublisherForV2 {
TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
.getAbsoluteFile();
private static SystemMetricsPublisher metricsPublisher;
private static TimelineServiceV2Publisher metricsPublisher;
private static DrainDispatcher dispatcher = new DrainDispatcher();
private static final String DEFAULT_FLOW_VERSION = "1";
private static final long DEFAULT_FLOW_RUN = 1;
@ -103,10 +102,11 @@ public static void setup() throws Exception {
rmTimelineCollectorManager.init(conf);
rmTimelineCollectorManager.start();
metricsPublisher = new SystemMetricsPublisher(rmContext) {
dispatcher.init(conf);
dispatcher.start();
metricsPublisher = new TimelineServiceV2Publisher(rmContext) {
@Override
Dispatcher createDispatcher(
TimelineServicePublisher timelineServicePublisher) {
protected Dispatcher getDispatcher() {
return dispatcher;
}
};
@ -150,8 +150,8 @@ private static Configuration getTimelineV2Conf() {
@Test
public void testSystemMetricPublisherInitialization() {
@SuppressWarnings("resource")
SystemMetricsPublisher metricsPublisher =
new SystemMetricsPublisher(mock(RMContext.class));
TimelineServiceV2Publisher metricsPublisher =
new TimelineServiceV2Publisher(mock(RMContext.class));
try {
Configuration conf = getTimelineV2Conf();
conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
@ -163,20 +163,18 @@ public void testSystemMetricPublisherInitialization() {
metricsPublisher.stop();
metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
metricsPublisher = new TimelineServiceV2Publisher(mock(RMContext.class));
conf = getTimelineV2Conf();
metricsPublisher.init(conf);
metricsPublisher.start();
assertTrue("Expected to publish container Metrics from RM",
metricsPublisher.isPublishContainerMetrics());
assertTrue(
"MultiThreadedDispatcher expected when container Metrics is not published",
metricsPublisher.getDispatcher() instanceof MultiThreadedDispatcher);
} finally {
metricsPublisher.stop();
}
}
@Test(timeout = 1000000)
@Test(timeout = 10000)
public void testPublishApplicationMetrics() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
RMApp app = createAppAndRegister(appId);