YARN-6395. Integrate service app master to write data into ATSv2. Contributed by Rohith Sharma K S

This commit is contained in:
Jian He 2017-03-30 15:58:51 +08:00
parent c31cd981eb
commit f4216b7bba
10 changed files with 1043 additions and 0 deletions

View File

@ -64,6 +64,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
@ -146,6 +147,8 @@
import org.apache.slider.server.appmaster.state.MostRecentContainerReleaseSelector;
import org.apache.slider.server.appmaster.state.ProviderAppState;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.timelineservice.ServiceTimelinePublisher;
import org.apache.slider.server.appmaster.timelineservice.SliderMetricsSink;
import org.apache.slider.server.appmaster.web.SliderAMWebApp;
import org.apache.slider.server.appmaster.web.WebAppApi;
import org.apache.slider.server.appmaster.web.WebAppApiImpl;
@ -240,6 +243,13 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private AMRMClientAsync asyncRMClient;
/** Handle to communicate with the timeline service */
private TimelineClient timelineClient;
private boolean timelineServiceEnabled = false;
ServiceTimelinePublisher serviceTimelinePublisher;
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private RMOperationHandler rmOperationHandler;
@ -483,6 +493,10 @@ public synchronized void serviceInit(Configuration conf) throws Exception {
addService(executorService);
addService(actionQueues);
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
timelineServiceEnabled = true;
log.info("Enabled YARN timeline service v2. ");
}
//init all child services
super.serviceInit(conf);
@ -650,6 +664,20 @@ private int createAndRunCluster(String appName) throws Throwable {
//now bring it up
deployChildService(asyncRMClient);
if (timelineServiceEnabled) {
timelineClient = TimelineClient.createTimelineClient(appid);
asyncRMClient.registerTimelineClient(timelineClient);
timelineClient.init(getConfig());
timelineClient.start();
log.info("Timeline client started.");
serviceTimelinePublisher = new ServiceTimelinePublisher(timelineClient);
serviceTimelinePublisher.init(getConfig());
serviceTimelinePublisher.start();
appState.setServiceTimelinePublisher(serviceTimelinePublisher);
log.info("ServiceTimelinePublisher started.");
}
// nmclient relays callbacks back to this class
nmClientAsync = new NMClientAsyncImpl("nmclient", this);
@ -781,6 +809,12 @@ private int createAndRunCluster(String appName) throws Throwable {
liveContainers = amRegistrationData.getContainersFromPreviousAttempts();
DefaultMetricsSystem.initialize("SliderAppMaster");
if (timelineServiceEnabled) {
DefaultMetricsSystem.instance().register("SliderMetricsSink",
"For processing metrics to ATS",
new SliderMetricsSink(serviceTimelinePublisher));
log.info("SliderMetricsSink registered.");
}
//determine the location for the role history data
Path historyDir = new Path(appDir, HISTORY_DIR_NAME);
@ -1132,6 +1166,9 @@ public void registerServiceInstance(String instanceName,
yarnRegistryOperations.getSelfRegistrationPath(),
true);
}
if (timelineServiceEnabled) {
serviceTimelinePublisher.serviceAttemptRegistered(appState);
}
}
/**
@ -1184,6 +1221,11 @@ public boolean registerComponent(ContainerId id, String description,
container.setState(org.apache.slider.api.resource.ContainerState.INIT);
container.setBareHost(instance.host);
instance.providerRole.component.addContainer(container);
if (timelineServiceEnabled) {
serviceTimelinePublisher.componentInstanceStarted(container,
instance.providerRole.component.getName());
}
return true;
}
@ -1345,6 +1387,12 @@ private synchronized int finish() throws Exception {
releaseAllContainers(application);
DefaultMetricsSystem.shutdown();
if (timelineServiceEnabled) {
serviceTimelinePublisher.serviceAttemptUnregistered(appState, stopAction);
serviceTimelinePublisher.stop();
timelineClient.stop();
}
// When the application completes, it should send a finish application
// signal to the RM
log.info("Application completed. Signalling finish to RM");
@ -1490,6 +1538,10 @@ public synchronized void onContainersCompleted(List<ContainerStatus> completedCo
if(!result.unknownNode) {
queue(new UnregisterComponentInstance(containerId, 0,
TimeUnit.MILLISECONDS));
if (timelineServiceEnabled && result.roleInstance != null) {
serviceTimelinePublisher
.componentInstanceFinished(result.roleInstance);
}
}
}
@ -1967,6 +2019,17 @@ public void onContainerStatusReceived(ContainerId containerId,
nmClientAsync.getContainerStatusAsync(containerId,
cinfo.container.getNodeId());
}
} else if (timelineServiceEnabled) {
RoleInstance instance = appState.getOwnedContainer(containerId);
if (instance != null) {
org.apache.slider.api.resource.Container container =
instance.providerRole.component
.getContainer(containerId.toString());
if (container != null) {
serviceTimelinePublisher.componentInstanceUpdated(container,
instance.providerRole.component.getName());
}
}
}
}

View File

@ -64,6 +64,7 @@
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation;
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
import org.apache.slider.server.appmaster.operations.UpdateBlacklistOperation;
import org.apache.slider.server.appmaster.timelineservice.ServiceTimelinePublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -207,6 +208,8 @@ public class AppState {
private Resource maxResource;
private SliderMetrics appMetrics;
private ServiceTimelinePublisher serviceTimelinePublisher;
/**
* Create an instance
* @param recordFactory factory for YARN records
@ -1762,6 +1765,10 @@ public synchronized List<AbstractRMOperation> releaseAllContainers() {
log.info("Releasing container. Log: " + url);
try {
containerReleaseSubmitted(possible);
// update during finish call
if (serviceTimelinePublisher != null) {
serviceTimelinePublisher.componentInstanceFinished(instance);
}
} catch (SliderInternalStateException e) {
log.warn("when releasing container {} :", possible, e);
}
@ -1948,4 +1955,8 @@ public Map<Integer, String> buildNamingMap() {
}
return naming;
}
public void setServiceTimelinePublisher(ServiceTimelinePublisher serviceTimelinePublisher) {
this.serviceTimelinePublisher = serviceTimelinePublisher;
}
}

View File

@ -0,0 +1,365 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.timelineservice;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.slider.api.resource.Application;
import org.apache.slider.api.resource.Component;
import org.apache.slider.api.resource.ConfigFile;
import org.apache.slider.api.resource.Configuration;
import org.apache.slider.api.resource.Container;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.server.appmaster.actions.ActionStopSlider;
import org.apache.slider.server.appmaster.state.AppState;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A single service that publishes all the Timeline Entities.
*/
public class ServiceTimelinePublisher extends CompositeService {
// Number of bytes of config which can be published in one shot to ATSv2.
public static final int ATS_CONFIG_PUBLISH_SIZE_BYTES = 10 * 1024;
private TimelineClient timelineClient;
private volatile boolean stopped = false;
private static final Logger log =
LoggerFactory.getLogger(ServiceTimelinePublisher.class);
@Override
protected void serviceStop() throws Exception {
stopped = true;
}
public boolean isStopped() {
return stopped;
}
public ServiceTimelinePublisher(TimelineClient client) {
super(ServiceTimelinePublisher.class.getName());
timelineClient = client;
}
public void serviceAttemptRegistered(AppState appState) {
Application application = appState.getClusterStatus();
long currentTimeMillis = application.getLaunchTime() == null
? System.currentTimeMillis() : application.getLaunchTime().getTime();
TimelineEntity entity = createServiceAttemptEntity(application.getId());
entity.setCreatedTime(currentTimeMillis);
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(SliderTimelineMetricsConstants.NAME, application.getName());
entityInfos.put(SliderTimelineMetricsConstants.STATE,
application.getState().toString());
entityInfos.put(SliderTimelineMetricsConstants.LAUNCH_TIME,
currentTimeMillis);
entity.addInfo(entityInfos);
// add an event
TimelineEvent startEvent = new TimelineEvent();
startEvent.setId(SliderTimelineEvent.SERVICE_ATTEMPT_REGISTERED.toString());
startEvent.setTimestamp(currentTimeMillis);
entity.addEvent(startEvent);
// publish before configurations published
putEntity(entity);
// publish application specific configurations
publishConfigurations(application.getConfiguration(), application.getId(),
SliderTimelineEntityType.SERVICE_ATTEMPT.toString(), true);
// publish component as separate entity.
publishComponents(application.getComponents());
}
public void serviceAttemptUnregistered(AppState appState,
ActionStopSlider stopAction) {
long currentTimeMillis = System.currentTimeMillis();
TimelineEntity entity =
createServiceAttemptEntity(appState.getClusterStatus().getId());
// add info
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(SliderTimelineMetricsConstants.EXIT_STATUS_CODE,
stopAction.getExitCode());
entityInfos.put(SliderTimelineMetricsConstants.STATE,
stopAction.getFinalApplicationStatus().toString());
if (stopAction.getMessage() != null) {
entityInfos.put(SliderTimelineMetricsConstants.EXIT_REASON,
stopAction.getMessage());
}
if (stopAction.getEx() != null) {
entityInfos.put(SliderTimelineMetricsConstants.DIAGNOSTICS_INFO,
stopAction.getEx().toString());
}
entity.addInfo(entityInfos);
// add an event
TimelineEvent startEvent = new TimelineEvent();
startEvent
.setId(SliderTimelineEvent.SERVICE_ATTEMPT_UNREGISTERED.toString());
startEvent.setTimestamp(currentTimeMillis);
entity.addEvent(startEvent);
putEntity(entity);
}
public void componentInstanceStarted(Container container,
String componentName) {
TimelineEntity entity = createComponentInstanceEntity(container.getId());
entity.setCreatedTime(container.getLaunchTime().getTime());
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(SliderTimelineMetricsConstants.BARE_HOST,
container.getBareHost());
entityInfos.put(SliderTimelineMetricsConstants.STATE,
container.getState().toString());
entityInfos.put(SliderTimelineMetricsConstants.LAUNCH_TIME,
container.getLaunchTime().getTime());
entityInfos.put(SliderTimelineMetricsConstants.COMPONENT_NAME,
componentName);
entity.addInfo(entityInfos);
// add an event
TimelineEvent startEvent = new TimelineEvent();
startEvent
.setId(SliderTimelineEvent.COMPONENT_INSTANCE_REGISTERED.toString());
startEvent.setTimestamp(container.getLaunchTime().getTime());
entity.addEvent(startEvent);
putEntity(entity);
}
public void componentInstanceFinished(RoleInstance instance) {
TimelineEntity entity = createComponentInstanceEntity(instance.id);
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(SliderTimelineMetricsConstants.EXIT_STATUS_CODE,
instance.exitCode);
entityInfos.put(SliderTimelineMetricsConstants.DIAGNOSTICS_INFO,
instance.diagnostics);
// TODO need to change the state based on enum value.
entityInfos.put(SliderTimelineMetricsConstants.STATE, "FINISHED");
entity.addInfo(entityInfos);
// add an event
TimelineEvent startEvent = new TimelineEvent();
startEvent
.setId(SliderTimelineEvent.COMPONENT_INSTANCE_UNREGISTERED.toString());
startEvent.setTimestamp(System.currentTimeMillis());
entity.addEvent(startEvent);
putEntity(entity);
}
public void componentInstanceUpdated(Container container,
String componentName) {
TimelineEntity entity = createComponentInstanceEntity(container.getId());
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(SliderTimelineMetricsConstants.IP, container.getIp());
entityInfos.put(SliderTimelineMetricsConstants.HOSTNAME,
container.getHostname());
entityInfos.put(SliderTimelineMetricsConstants.STATE,
container.getState().toString());
entity.addInfo(entityInfos);
TimelineEvent updateEvent = new TimelineEvent();
updateEvent
.setId(SliderTimelineEvent.COMPONENT_INSTANCE_UPDATED.toString());
updateEvent.setTimestamp(System.currentTimeMillis());
entity.addEvent(updateEvent);
putEntity(entity);
}
private void publishComponents(List<Component> components) {
long currentTimeMillis = System.currentTimeMillis();
for (Component component : components) {
TimelineEntity entity = createComponentEntity(component.getName());
entity.setCreatedTime(currentTimeMillis);
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(SliderTimelineMetricsConstants.ARTIFACT_ID,
component.getArtifact().getId());
entityInfos.put(SliderTimelineMetricsConstants.ARTIFACT_TYPE,
component.getArtifact().getType().toString());
if (component.getResource().getProfile() != null) {
entityInfos.put(SliderTimelineMetricsConstants.RESOURCE_PROFILE,
component.getResource().getProfile());
}
entityInfos.put(SliderTimelineMetricsConstants.RESOURCE_CPU,
component.getResource().getCpus());
entityInfos.put(SliderTimelineMetricsConstants.RESOURCE_MEMORY,
component.getResource().getMemory());
if (component.getLaunchCommand() != null) {
entityInfos.put(SliderTimelineMetricsConstants.LAUNCH_COMMAND,
component.getLaunchCommand());
}
entityInfos.put(SliderTimelineMetricsConstants.UNIQUE_COMPONENT_SUPPORT,
component.getUniqueComponentSupport().toString());
entityInfos.put(SliderTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER,
component.getRunPrivilegedContainer().toString());
if (component.getPlacementPolicy() != null) {
entityInfos.put(SliderTimelineMetricsConstants.PLACEMENT_POLICY,
component.getPlacementPolicy().getLabel());
}
entity.addInfo(entityInfos);
putEntity(entity);
// publish component specific configurations
publishConfigurations(component.getConfiguration(), component.getName(),
SliderTimelineEntityType.COMPONENT.toString(), false);
}
}
private void publishConfigurations(Configuration configuration,
String entityId, String entityType, boolean isServiceAttemptEntity) {
if (isServiceAttemptEntity) {
// publish slider-client.xml properties at service level
publishConfigurations(SliderUtils.loadSliderClientXML().iterator(),
entityId, entityType);
}
publishConfigurations(configuration.getProperties().entrySet().iterator(),
entityId, entityType);
publishConfigurations(configuration.getEnv().entrySet().iterator(),
entityId, entityType);
for (ConfigFile configFile : configuration.getFiles()) {
publishConfigurations(configFile.getProps().entrySet().iterator(),
entityId, entityType);
}
}
private void publishConfigurations(Iterator<Entry<String, String>> iterator,
String entityId, String entityType) {
int configSize = 0;
TimelineEntity entity = createTimelineEntity(entityId, entityType);
while (iterator.hasNext()) {
Entry<String, String> entry = iterator.next();
int size = entry.getKey().length() + entry.getValue().length();
configSize += size;
// Configs are split into multiple entities if they exceed 100kb in size.
if (configSize > ATS_CONFIG_PUBLISH_SIZE_BYTES) {
if (entity.getConfigs().size() > 0) {
putEntity(entity);
entity = createTimelineEntity(entityId, entityType);
}
configSize = size;
}
entity.addConfig(entry.getKey(), entry.getValue());
}
if (configSize > 0) {
putEntity(entity);
}
}
/**
* Called from SliderMetricsSink at regular interval of time.
* @param metrics of service or components
* @param entityId Id of entity
* @param entityType Type of entity
* @param timestamp
*/
public void publishMetrics(Iterable<AbstractMetric> metrics, String entityId,
String entityType, long timestamp) {
TimelineEntity entity = createTimelineEntity(entityId, entityType);
Set<TimelineMetric> entityMetrics = new HashSet<TimelineMetric>();
for (AbstractMetric metric : metrics) {
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setId(metric.name());
timelineMetric.addValue(timestamp, metric.value());
entityMetrics.add(timelineMetric);
}
entity.setMetrics(entityMetrics);
putEntity(entity);
}
private TimelineEntity createServiceAttemptEntity(String serviceId) {
TimelineEntity entity = createTimelineEntity(serviceId,
SliderTimelineEntityType.SERVICE_ATTEMPT.toString());
return entity;
}
private TimelineEntity createComponentInstanceEntity(String instanceId) {
TimelineEntity entity = createTimelineEntity(instanceId,
SliderTimelineEntityType.COMPONENT_INSTANCE.toString());
return entity;
}
private TimelineEntity createComponentEntity(String componentId) {
TimelineEntity entity = createTimelineEntity(componentId,
SliderTimelineEntityType.COMPONENT.toString());
return entity;
}
private TimelineEntity createTimelineEntity(String entityId,
String entityType) {
TimelineEntity entity = new TimelineEntity();
entity.setId(entityId);
entity.setType(entityType);
return entity;
}
private void putEntity(TimelineEntity entity) {
try {
if (log.isDebugEnabled()) {
log.debug("Publishing the entity " + entity + ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
if (timelineClient != null) {
timelineClient.putEntitiesAsync(entity);
} else {
log.error("Seems like client has been removed before the entity "
+ "could be published for " + entity);
}
} catch (Exception e) {
log.error("Error when publishing entity " + entity, e);
}
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.timelineservice;
import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Write the metrics to a ATSv2. Generally, this class is instantiated via
* hadoop-metrics2 property files. Specifically, you would create this class by
* adding the following to by This would actually be set as: <code>
* [prefix].sink.[some instance name].class
* =org.apache.slider.server.appmaster.timelineservice.SliderMetricsSink
* </code>, where <tt>prefix</tt> is "atsv2": and <tt>some instance name</tt> is
* just any unique name, so properties can be differentiated if there are
* multiple sinks of the same type created
*/
public class SliderMetricsSink implements MetricsSink {
private static final Logger log =
LoggerFactory.getLogger(SliderMetricsSink.class);
private ServiceTimelinePublisher serviceTimelinePublisher;
public SliderMetricsSink() {
}
public SliderMetricsSink(ServiceTimelinePublisher publisher) {
serviceTimelinePublisher = publisher;
}
/**
* Publishes service and component metrics to ATS.
*/
@Override
public void putMetrics(MetricsRecord record) {
if (serviceTimelinePublisher.isStopped()) {
log.warn("ServiceTimelinePublisher has stopped. "
+ "Not publishing any more metrics to ATS.");
return;
}
boolean isServiceMetrics = false;
boolean isComponentMetrics = false;
String appId = null;
for (MetricsTag tag : record.tags()) {
if (tag.name().equals("type") && tag.value().equals("service")) {
isServiceMetrics = true;
} else if (tag.name().equals("type") && tag.value().equals("component")) {
isComponentMetrics = true;
break; // if component metrics, no more information required from tag so
// break the loop
} else if (tag.name().equals("appId")) {
appId = tag.value();
}
}
if (isServiceMetrics && appId != null) {
if (log.isDebugEnabled()) {
log.debug("Publishing service metrics. " + record);
}
serviceTimelinePublisher.publishMetrics(record.metrics(), appId,
SliderTimelineEntityType.SERVICE_ATTEMPT.toString(),
record.timestamp());
} else if (isComponentMetrics) {
if (log.isDebugEnabled()) {
log.debug("Publishing Component metrics. " + record);
}
serviceTimelinePublisher.publishMetrics(record.metrics(), record.name(),
SliderTimelineEntityType.COMPONENT.toString(), record.timestamp());
}
}
@Override
public void init(SubsetConfiguration conf) {
}
@Override
public void flush() {
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.timelineservice;
/**
* Slider entities that are published to ATS.
*/
public enum SliderTimelineEntityType {
/**
* Used for publishing service entity information.
*/
SERVICE_ATTEMPT,
/**
* Used for publishing component entity information.
*/
COMPONENT,
/**
* Used for publishing component instance entity information.
*/
COMPONENT_INSTANCE
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.timelineservice;
/**
* Events that are used to store in ATS.
*/
public enum SliderTimelineEvent {
SERVICE_ATTEMPT_REGISTERED,
SERVICE_ATTEMPT_UNREGISTERED,
COMPONENT_INSTANCE_REGISTERED,
COMPONENT_INSTANCE_UNREGISTERED,
COMPONENT_INSTANCE_UPDATED
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.timelineservice;
/**
* Constants which are stored as key in ATS
*/
public final class SliderTimelineMetricsConstants {
public static final String URI = "URI";
public static final String NAME = "NAME";
public static final String STATE = "STATE";
public static final String EXIT_STATUS_CODE = "EXIT_STATUS_CODE";
public static final String EXIT_REASON = "EXIT_REASON";
public static final String DIAGNOSTICS_INFO = "DIAGNOSTICS_INFO";
public static final String LAUNCH_TIME = "LAUNCH_TIME";
public static final String LAUNCH_COMMAND = "LAUNCH_COMMAND";
public static final String TOTAL_CONTAINERS = "NUMBER_OF_CONTAINERS";
public static final String RUNNING_CONTAINERS =
"NUMBER_OF_RUNNING_CONTAINERS";
/**
* Artifacts constants.
*/
public static final String ARTIFACT_ID = "ARTIFACT_ID";
public static final String ARTIFACT_TYPE = "ARTIFACT_TYPE";
public static final String ARTIFACT_URI = "ARTIFACT_URI";
/**
* Resource constants.
*/
public static final String RESOURCE_CPU = "RESOURCE_CPU";
public static final String RESOURCE_MEMORY = "RESOURCE_MEMORY";
public static final String RESOURCE_PROFILE = "RESOURCE_PROFILE";
/**
* component instance constants.
*/
public static final String IP = "IP";
public static final String HOSTNAME = "HOSTNAME";
public static final String BARE_HOST = "BARE_HOST";
public static final String COMPONENT_NAME = "COMPONENT_NAME";
/**
* component constants.
*/
public static final String DEPENDENCIES = "DEPENDENCIES";
public static final String DESCRIPTION = "DESCRIPTION";
public static final String UNIQUE_COMPONENT_SUPPORT =
"UNIQUE_COMPONENT_SUPPORT";
public static final String RUN_PRIVILEGED_CONTAINER =
"RUN_PRIVILEGED_CONTAINER";
public static final String PLACEMENT_POLICY = "PLACEMENT_POLICY";
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* ATS implementation
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.slider.server.appmaster.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.timelineservice;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.slider.api.resource.Application;
import org.apache.slider.api.resource.ApplicationState;
import org.apache.slider.api.resource.Artifact;
import org.apache.slider.api.resource.Component;
import org.apache.slider.api.resource.Container;
import org.apache.slider.api.resource.ContainerState;
import org.apache.slider.api.resource.PlacementPolicy;
import org.apache.slider.api.resource.Resource;
import org.apache.slider.server.appmaster.actions.ActionStopSlider;
import org.apache.slider.server.appmaster.state.AppState;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test class for ServiceTimelinePublisher.
*/
public class TestServiceTimelinePublisher {
private TimelineClient timelineClient;
private Configuration config;
private ServiceTimelinePublisher serviceTimelinePublisher;
private static String SERVICE_NAME = "HBASE";
private static String SERVICEID = "application_1490093646524_0005";
private static String ARTIFACTID = "ARTIFACTID";
private static String COMPONENT_NAME = "DEFAULT";
private static String CONTAINER_ID =
"container_e02_1490093646524_0005_01_000001";
private static String CONTAINER_IP =
"localhost";
private static String CONTAINER_HOSTNAME =
"cnl124-localhost.site";
private static String CONTAINER_BAREHOST =
"localhost.com";
@Before
public void setUp() throws Exception {
config = new Configuration();
timelineClient = new DummyTimelineClient();
serviceTimelinePublisher = new ServiceTimelinePublisher(timelineClient);
timelineClient.init(config);
serviceTimelinePublisher.init(config);
timelineClient.start();
serviceTimelinePublisher.start();
}
@After
public void tearDown() throws Exception {
serviceTimelinePublisher.stop();
timelineClient.stop();
}
@Test
public void testServiceAttemptEntity() {
AppState appState = createMockAppState();
int exitCode = 0;
String message = "Stopped by user";
ActionStopSlider stopAction = mock(ActionStopSlider.class);
when(stopAction.getExitCode()).thenReturn(exitCode);
when(stopAction.getFinalApplicationStatus())
.thenReturn(FinalApplicationStatus.SUCCEEDED);
when(stopAction.getMessage()).thenReturn(message);
serviceTimelinePublisher.serviceAttemptRegistered(appState);
Collection<TimelineEntity> lastPublishedEntities =
((DummyTimelineClient) timelineClient).getLastPublishedEntities();
// 2 entities because during registration component also registered.
assertEquals(2, lastPublishedEntities.size());
for (TimelineEntity timelineEntity : lastPublishedEntities) {
if (timelineEntity.getType() == SliderTimelineEntityType.COMPONENT
.toString()) {
verifyComponentTimelineEntity(timelineEntity);
} else {
verifyServiceAttemptTimelineEntity(timelineEntity, 0, null, true);
}
}
serviceTimelinePublisher.serviceAttemptUnregistered(appState, stopAction);
lastPublishedEntities =
((DummyTimelineClient) timelineClient).getLastPublishedEntities();
for (TimelineEntity timelineEntity : lastPublishedEntities) {
if (timelineEntity.getType() == SliderTimelineEntityType.SERVICE_ATTEMPT
.toString()) {
verifyServiceAttemptTimelineEntity(timelineEntity, exitCode, message,
false);
}
}
}
@Test
public void testComponentInstanceEntity() {
Container container = new Container();
container.id(CONTAINER_ID).ip(CONTAINER_IP).bareHost(CONTAINER_BAREHOST)
.hostname(CONTAINER_HOSTNAME).state(ContainerState.INIT)
.launchTime(new Date());
serviceTimelinePublisher.componentInstanceStarted(container,
COMPONENT_NAME);
Collection<TimelineEntity> lastPublishedEntities =
((DummyTimelineClient) timelineClient).getLastPublishedEntities();
assertEquals(1, lastPublishedEntities.size());
TimelineEntity entity = lastPublishedEntities.iterator().next();
assertEquals(1, entity.getEvents().size());
assertEquals(CONTAINER_ID, entity.getId());
assertEquals(CONTAINER_BAREHOST,
entity.getInfo().get(SliderTimelineMetricsConstants.BARE_HOST));
assertEquals(COMPONENT_NAME,
entity.getInfo().get(SliderTimelineMetricsConstants.COMPONENT_NAME));
assertEquals(ContainerState.INIT.toString(),
entity.getInfo().get(SliderTimelineMetricsConstants.STATE));
// updated container state
container.setState(ContainerState.READY);
serviceTimelinePublisher.componentInstanceUpdated(container,
COMPONENT_NAME);
lastPublishedEntities =
((DummyTimelineClient) timelineClient).getLastPublishedEntities();
assertEquals(1, lastPublishedEntities.size());
entity = lastPublishedEntities.iterator().next();
assertEquals(2, entity.getEvents().size());
assertEquals(ContainerState.READY.toString(),
entity.getInfo().get(SliderTimelineMetricsConstants.STATE));
}
private void verifyServiceAttemptTimelineEntity(TimelineEntity timelineEntity,
int exitCode, String message, boolean isRegistedEntity) {
assertEquals(SERVICEID, timelineEntity.getId());
assertEquals(SERVICE_NAME,
timelineEntity.getInfo().get(SliderTimelineMetricsConstants.NAME));
if (isRegistedEntity) {
assertEquals(ApplicationState.STARTED.toString(),
timelineEntity.getInfo().get(SliderTimelineMetricsConstants.STATE));
assertEquals(SliderTimelineEvent.SERVICE_ATTEMPT_REGISTERED.toString(),
timelineEntity.getEvents().iterator().next().getId());
} else {
assertEquals("SUCCEEDED",
timelineEntity.getInfo().get(SliderTimelineMetricsConstants.STATE));
assertEquals(exitCode, timelineEntity.getInfo()
.get(SliderTimelineMetricsConstants.EXIT_STATUS_CODE));
assertEquals(message, timelineEntity.getInfo()
.get(SliderTimelineMetricsConstants.EXIT_REASON));
assertEquals(2, timelineEntity.getEvents().size());
assertEquals(SliderTimelineEvent.SERVICE_ATTEMPT_UNREGISTERED.toString(),
timelineEntity.getEvents().iterator().next().getId());
}
}
private void verifyComponentTimelineEntity(TimelineEntity entity) {
Map<String, Object> info = entity.getInfo();
assertEquals("DEFAULT", entity.getId());
assertEquals(ARTIFACTID,
info.get(SliderTimelineMetricsConstants.ARTIFACT_ID));
assertEquals("DOCKER",
info.get(SliderTimelineMetricsConstants.ARTIFACT_TYPE));
assertEquals("medium",
info.get(SliderTimelineMetricsConstants.RESOURCE_PROFILE));
assertEquals(1, info.get(SliderTimelineMetricsConstants.RESOURCE_CPU));
assertEquals("1024",
info.get(SliderTimelineMetricsConstants.RESOURCE_MEMORY));
assertEquals("sleep 1",
info.get(SliderTimelineMetricsConstants.LAUNCH_COMMAND));
assertEquals("false",
info.get(SliderTimelineMetricsConstants.UNIQUE_COMPONENT_SUPPORT));
assertEquals("false",
info.get(SliderTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER));
assertEquals("label",
info.get(SliderTimelineMetricsConstants.PLACEMENT_POLICY));
}
private static AppState createMockAppState() {
AppState appState = mock(AppState.class);
Application application = mock(Application.class);
when(application.getId()).thenReturn(SERVICEID);
when(application.getLaunchTime()).thenReturn(new Date());
when(application.getState()).thenReturn(ApplicationState.STARTED);
when(application.getName()).thenReturn(SERVICE_NAME);
when(application.getConfiguration())
.thenReturn(new org.apache.slider.api.resource.Configuration());
Component component = mock(Component.class);
Artifact artifact = new Artifact();
artifact.setId(ARTIFACTID);
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory(1024 + "");
resource.setProfile("medium");
when(component.getArtifact()).thenReturn(artifact);
when(component.getName()).thenReturn(COMPONENT_NAME);
when(component.getResource()).thenReturn(resource);
when(component.getLaunchCommand()).thenReturn("sleep 1");
PlacementPolicy placementPolicy = new PlacementPolicy();
placementPolicy.setLabel("label");
when(component.getPlacementPolicy()).thenReturn(placementPolicy);
when(component.getConfiguration())
.thenReturn(new org.apache.slider.api.resource.Configuration());
List<Component> components = new ArrayList<Component>();
components.add(component);
when(application.getComponents()).thenReturn(components);
when(appState.getClusterStatus()).thenReturn(application);
return appState;
}
public static void main(String[] args) {
Application application = createMockAppState().getClusterStatus();
System.out.println(application.getConfiguration());
}
protected static class DummyTimelineClient extends TimelineClientImpl {
private Map<Identifier, TimelineEntity> lastPublishedEntities =
new HashMap<>();
@Override
public void putEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException {
for (TimelineEntity timelineEntity : entities) {
TimelineEntity entity =
lastPublishedEntities.get(timelineEntity.getIdentifier());
if (entity == null) {
lastPublishedEntities.put(timelineEntity.getIdentifier(),
timelineEntity);
} else {
entity.addMetrics(timelineEntity.getMetrics());
entity.addEvents(timelineEntity.getEvents());
entity.addInfo(timelineEntity.getInfo());
entity.addConfigs(timelineEntity.getConfigs());
entity.addRelatesToEntities(timelineEntity.getRelatesToEntities());
entity
.addIsRelatedToEntities(timelineEntity.getIsRelatedToEntities());
}
}
}
public Collection<TimelineEntity> getLastPublishedEntities() {
return lastPublishedEntities.values();
}
public void reset() {
lastPublishedEntities = null;
}
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* ATS tests
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.slider.server.appmaster.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;