YARN-8303. YarnClient should contact TimelineReader for application/attempt/container report.
commit ee3355be3c (parent 8571507efa)
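
For context before the hunks below: this change adds an ATSv2-backed application history client (AHSv2ClientImpl, created through AHSClient.createAHSv2Client()) and teaches YarnClientImpl to fall back to it when the ResourceManager no longer knows an application, attempt, or container. A minimal usage sketch of the new factory method follows; it only uses methods that appear in this diff, and the Configuration and ContainerId values are placeholders rather than part of the change:

    // Sketch only: conf and containerId are placeholders, not part of the commit.
    Configuration conf = new YarnConfiguration();
    AHSClient ahsV2Client = AHSClient.createAHSv2Client();
    ahsV2Client.init(conf);   // wires up a TimelineReaderClient internally
    ahsV2Client.start();
    try {
      ContainerReport report = ahsV2Client.getContainerReport(containerId);
      System.out.println(report.getContainerState());
    } finally {
      ahsV2Client.stop();
    }
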
@@ -33,6 +33,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.client.api.impl.AHSClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.AHSv2ClientImpl;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -46,8 +47,13 @@ public abstract class AHSClient extends AbstractService {
    */
   @Public
   public static AHSClient createAHSClient() {
-    AHSClient client = new AHSClientImpl();
-    return client;
+    return new AHSClientImpl();
+  }
+
+  @InterfaceStability.Evolving
+  @Public
+  public static AHSClient createAHSv2Client() {
+    return new AHSv2ClientImpl();
   }
 
   @Private
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.AHSClient;
+import org.apache.hadoop.yarn.client.api.TimelineReaderClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.timeline.TimelineEntityV2Converter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class provides Application History client implementation which uses
+ * ATS v2 as backend.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class AHSv2ClientImpl extends AHSClient {
+  private TimelineReaderClient readerClient;
+
+  public AHSv2ClientImpl() {
+    super(AHSv2ClientImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    readerClient = TimelineReaderClient.createTimelineReaderClient();
+    readerClient.init(conf);
+  }
+
+  @VisibleForTesting
+  protected void setReaderClient(TimelineReaderClient readerClient) {
+    this.readerClient = readerClient;
+  }
+
+  @Override
+  public void serviceStart() {
+    readerClient.start();
+  }
+
+  @Override
+  public void serviceStop() {
+    readerClient.stop();
+  }
+
+  @Override
+  public ApplicationReport getApplicationReport(ApplicationId appId)
+      throws YarnException, IOException {
+    TimelineEntity entity = readerClient.getApplicationEntity(
+        appId, "ALL", null);
+    return TimelineEntityV2Converter.convertToApplicationReport(entity);
+  }
+
+  @Override
+  public List<ApplicationReport> getApplications()
+      throws YarnException, IOException {
+    throw new UnsupportedOperationException("ATSv2.0 doesn't support retrieving"
+        + " ALL application entities.");
+  }
+
+  @Override
+  public ApplicationAttemptReport getApplicationAttemptReport(
+      ApplicationAttemptId applicationAttemptId)
+      throws YarnException, IOException {
+    TimelineEntity entity = readerClient.getApplicationAttemptEntity(
+        applicationAttemptId, "ALL", null);
+    return TimelineEntityV2Converter.convertToApplicationAttemptReport(entity);
+  }
+
+  @Override
+  public List<ApplicationAttemptReport> getApplicationAttempts(
+      ApplicationId applicationId) throws YarnException, IOException {
+    List<TimelineEntity> entities = readerClient.getApplicationAttemptEntities(
+        applicationId, "ALL", null, 0, null);
+    List<ApplicationAttemptReport> appAttemptReports =
+        new ArrayList<>();
+    if (entities != null && !entities.isEmpty()) {
+      for (TimelineEntity entity : entities) {
+        ApplicationAttemptReport container =
+            TimelineEntityV2Converter.convertToApplicationAttemptReport(
+                entity);
+        appAttemptReports.add(container);
+      }
+    }
+    return appAttemptReports;
+  }
+
+  @Override
+  public ContainerReport getContainerReport(ContainerId containerId)
+      throws YarnException, IOException {
+    TimelineEntity entity = readerClient.getContainerEntity(containerId,
+        "ALL", null);
+    return TimelineEntityV2Converter.convertToContainerReport(entity);
+  }
+
+  @Override
+  public List<ContainerReport> getContainers(ApplicationAttemptId
+      applicationAttemptId) throws YarnException, IOException {
+    ApplicationId appId = applicationAttemptId.getApplicationId();
+    Map<String, String> filters = new HashMap<>();
+    filters.put("infofilters", "SYSTEM_INFO_PARENT_ENTITY eq {\"id\":\"" +
+        applicationAttemptId.toString() +
+        "\",\"type\":\"YARN_APPLICATION_ATTEMPT\"}");
+    List<TimelineEntity> entities = readerClient.getContainerEntities(
+        appId, "ALL", filters, 0, null);
+    List<ContainerReport> containers =
+        new ArrayList<>();
+    if (entities != null && !entities.isEmpty()) {
+      for (TimelineEntity entity : entities) {
+        ContainerReport container =
+            TimelineEntityV2Converter.convertToContainerReport(
+                entity);
+        containers.add(container);
+      }
+    }
+    return containers;
+  }
+}
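
A note on the getContainers() implementation in the new class above: it narrows the container query to a single attempt by passing an "infofilters" predicate to the timeline reader. For a hypothetical attempt ID, the filter string it builds looks like the following sketch (the attempt ID is invented for illustration):

    // Illustration only; the attempt ID below is made up.
    String filter = "SYSTEM_INFO_PARENT_ENTITY eq {\"id\":\"" +
        "appattempt_1525212750451_0001_000001" +
        "\",\"type\":\"YARN_APPLICATION_ATTEMPT\"}";
    // i.e. filters.put("infofilters", filter) before calling getContainerEntities(...)
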
@@ -149,6 +149,7 @@ public class YarnClientImpl extends YarnClient {
   private long asyncApiPollIntervalMillis;
   private long asyncApiPollTimeoutMillis;
   protected AHSClient historyClient;
+  private AHSClient ahsV2Client;
   private boolean historyServiceEnabled;
   protected volatile TimelineClient timelineClient;
   @VisibleForTesting
@@ -159,6 +160,8 @@ public class YarnClientImpl extends YarnClient {
   protected boolean timelineServiceBestEffort;
   private boolean loadResourceTypesFromServer;
 
+  private boolean timelineV2ServiceEnabled;
+
   private static final String ROOT = "root";
 
   public YarnClientImpl() {
@@ -188,6 +191,10 @@ protected void serviceInit(Configuration conf) throws Exception {
       timelineService = TimelineUtils.buildTimelineTokenService(conf);
     }
 
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      timelineV2ServiceEnabled = true;
+    }
+
     // The AHSClientService is enabled by default when we start the
     // TimelineServer which means we are able to get history information
     // for applications/applicationAttempts/containers by using ahsClient
@@ -200,6 +207,11 @@ protected void serviceInit(Configuration conf) throws Exception {
       historyClient.init(conf);
     }
 
+    if (timelineV2ServiceEnabled) {
+      ahsV2Client = AHSClient.createAHSv2Client();
+      ahsV2Client.init(conf);
+    }
+
     timelineServiceBestEffort = conf.getBoolean(
         YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT,
         YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_BEST_EFFORT);
@@ -223,6 +235,9 @@ protected void serviceStart() throws Exception {
       if (historyServiceEnabled) {
        historyClient.start();
       }
+      if (timelineV2ServiceEnabled) {
+        ahsV2Client.start();
+      }
     } catch (IOException e) {
       throw new YarnRuntimeException(e);
     }
@@ -244,6 +259,9 @@ protected void serviceStop() throws Exception {
     if (historyServiceEnabled) {
       historyClient.stop();
     }
+    if (timelineV2ServiceEnabled) {
+      ahsV2Client.stop();
+    }
     if (timelineClient != null) {
       timelineClient.stop();
     }
@@ -516,6 +534,14 @@ public ApplicationReport getApplicationReport(ApplicationId appId)
       request.setApplicationId(appId);
       response = rmClient.getApplicationReport(request);
     } catch (ApplicationNotFoundException e) {
+      if (timelineV2ServiceEnabled) {
+        try {
+          return ahsV2Client.getApplicationReport(appId);
+        } catch (Exception ex) {
+          LOG.warn("Failed to fetch application report from "
+              + "ATS v2", ex);
+        }
+      }
       if (!historyServiceEnabled) {
         // Just throw it as usual if historyService is not enabled.
         throw e;
@@ -726,15 +752,24 @@ public ApplicationAttemptReport getApplicationAttemptReport(
           .getApplicationAttemptReport(request);
       return response.getApplicationAttemptReport();
     } catch (YarnException e) {
-      if (!historyServiceEnabled) {
-        // Just throw it as usual if historyService is not enabled.
-        throw e;
-      }
       // Even if history-service is enabled, treat all exceptions still the same
       // except the following
       if (e.getClass() != ApplicationNotFoundException.class) {
         throw e;
       }
+      if (timelineV2ServiceEnabled) {
+        try {
+          return ahsV2Client.getApplicationAttemptReport(appAttemptId);
+        } catch (Exception ex) {
+          LOG.warn("Failed to fetch application attempt report from "
+              + "ATS v2", ex);
+        }
+      }
+      if (!historyServiceEnabled) {
+        // Just throw it as usual if historyService is not enabled.
+        throw e;
+      }
       return historyClient.getApplicationAttemptReport(appAttemptId);
     }
   }
@@ -750,15 +785,23 @@ public List<ApplicationAttemptReport> getApplicationAttempts(
           .getApplicationAttempts(request);
       return response.getApplicationAttemptList();
     } catch (YarnException e) {
-      if (!historyServiceEnabled) {
-        // Just throw it as usual if historyService is not enabled.
-        throw e;
-      }
      // Even if history-service is enabled, treat all exceptions still the same
      // except the following
      if (e.getClass() != ApplicationNotFoundException.class) {
        throw e;
      }
+      if (timelineV2ServiceEnabled) {
+        try {
+          return ahsV2Client.getApplicationAttempts(appId);
+        } catch (Exception ex) {
+          LOG.warn("Failed to fetch application attempts from "
+              + "ATS v2", ex);
+        }
+      }
+      if (!historyServiceEnabled) {
+        // Just throw it as usual if historyService is not enabled.
+        throw e;
+      }
       return historyClient.getApplicationAttempts(appId);
     }
   }
@@ -774,16 +817,24 @@ public ContainerReport getContainerReport(ContainerId containerId)
           .getContainerReport(request);
       return response.getContainerReport();
     } catch (YarnException e) {
-      if (!historyServiceEnabled) {
-        // Just throw it as usual if historyService is not enabled.
-        throw e;
-      }
       // Even if history-service is enabled, treat all exceptions still the same
       // except the following
       if (e.getClass() != ApplicationNotFoundException.class
           && e.getClass() != ContainerNotFoundException.class) {
         throw e;
       }
+      if (timelineV2ServiceEnabled) {
+        try {
+          return ahsV2Client.getContainerReport(containerId);
+        } catch (Exception ex) {
+          LOG.warn("Failed to fetch container report from "
+              + "ATS v2", ex);
+        }
+      }
+      if (!historyServiceEnabled) {
+        // Just throw it as usual if historyService is not enabled.
+        throw e;
+      }
       return historyClient.getContainerReport(containerId);
     }
   }
@@ -802,71 +853,88 @@ public List<ContainerReport> getContainers(
       GetContainersResponse response = rmClient.getContainers(request);
       containersForAttempt.addAll(response.getContainerList());
     } catch (YarnException e) {
-      if (e.getClass() != ApplicationNotFoundException.class
-          || !historyServiceEnabled) {
-        // If Application is not in RM and history service is enabled then we
-        // need to check with history service else throw exception.
+      // Even if history-service is enabled, treat all exceptions still the same
+      // except the following
+      if (e.getClass() != ApplicationNotFoundException.class) {
+        throw e;
+      }
+      if (!historyServiceEnabled && !timelineV2ServiceEnabled) {
+        // if both history server and ATSv2 are not enabled throw exception.
         throw e;
       }
       appNotFoundInRM = true;
     }
-
-    if (historyServiceEnabled) {
-      // Check with AHS even if found in RM because to capture info of finished
-      // containers also
-      List<ContainerReport> containersListFromAHS = null;
-      try {
-        containersListFromAHS =
-            historyClient.getContainers(applicationAttemptId);
-      } catch (IOException e) {
-        // History service access might be enabled but system metrics publisher
-        // is disabled hence app not found exception is possible
-        if (appNotFoundInRM) {
-          // app not found in bothM and RM then propagate the exception.
-          throw e;
-        }
-      }
-
-      if (null != containersListFromAHS && containersListFromAHS.size() > 0) {
-        // remove duplicates
-
-        Set<ContainerId> containerIdsToBeKeptFromAHS =
-            new HashSet<ContainerId>();
-        Iterator<ContainerReport> tmpItr = containersListFromAHS.iterator();
-        while (tmpItr.hasNext()) {
-          containerIdsToBeKeptFromAHS.add(tmpItr.next().getContainerId());
-        }
-
-        Iterator<ContainerReport> rmContainers =
-            containersForAttempt.iterator();
-        while (rmContainers.hasNext()) {
-          ContainerReport tmp = rmContainers.next();
-          containerIdsToBeKeptFromAHS.remove(tmp.getContainerId());
-          // Remove containers from AHS as container from RM will have latest
-          // information
-        }
-
-        if (containerIdsToBeKeptFromAHS.size() > 0
-            && containersListFromAHS.size() != containerIdsToBeKeptFromAHS
-                .size()) {
-          Iterator<ContainerReport> containersFromHS =
-              containersListFromAHS.iterator();
-          while (containersFromHS.hasNext()) {
-            ContainerReport containerReport = containersFromHS.next();
-            if (containerIdsToBeKeptFromAHS.contains(containerReport
-                .getContainerId())) {
-              containersForAttempt.add(containerReport);
-            }
-          }
-        } else if (containersListFromAHS.size() == containerIdsToBeKeptFromAHS
-            .size()) {
-          containersForAttempt.addAll(containersListFromAHS);
-        }
-      }
-    }
+    // Check with AHS even if found in RM because to capture info of finished
+    // containers also
+    List<ContainerReport> containersListFromAHS = null;
+    try {
+      containersListFromAHS =
+          getContainerReportFromHistory(applicationAttemptId);
+    } catch (IOException e) {
+      if (appNotFoundInRM) {
+        throw e;
+      }
+    }
+    if (null != containersListFromAHS && containersListFromAHS.size() > 0) {
+      // remove duplicates
+      Set<ContainerId> containerIdsToBeKeptFromAHS =
+          new HashSet<ContainerId>();
+      Iterator<ContainerReport> tmpItr = containersListFromAHS.iterator();
+      while (tmpItr.hasNext()) {
+        containerIdsToBeKeptFromAHS.add(tmpItr.next().getContainerId());
+      }
+
+      Iterator<ContainerReport> rmContainers =
+          containersForAttempt.iterator();
+      while (rmContainers.hasNext()) {
+        ContainerReport tmp = rmContainers.next();
+        containerIdsToBeKeptFromAHS.remove(tmp.getContainerId());
+        // Remove containers from AHS as container from RM will have latest
+        // information
+      }
+
+      if (containerIdsToBeKeptFromAHS.size() > 0
+          && containersListFromAHS.size() != containerIdsToBeKeptFromAHS
+              .size()) {
+        Iterator<ContainerReport> containersFromHS =
+            containersListFromAHS.iterator();
+        while (containersFromHS.hasNext()) {
+          ContainerReport containerReport = containersFromHS.next();
+          if (containerIdsToBeKeptFromAHS.contains(containerReport
+              .getContainerId())) {
+            containersForAttempt.add(containerReport);
+          }
+        }
+      } else if (containersListFromAHS.size() == containerIdsToBeKeptFromAHS
+          .size()) {
+        containersForAttempt.addAll(containersListFromAHS);
+      }
+    }
     return containersForAttempt;
   }
 
+  private List<ContainerReport> getContainerReportFromHistory(
+      ApplicationAttemptId applicationAttemptId)
+      throws IOException, YarnException {
+    List<ContainerReport> containersListFromAHS = null;
+    if (timelineV2ServiceEnabled) {
+      try {
+        containersListFromAHS = ahsV2Client.getContainers(applicationAttemptId);
+      } catch (Exception e) {
+        LOG.warn("Got an error while fetching container report from ATSv2", e);
+        if (historyServiceEnabled) {
+          containersListFromAHS = historyClient.getContainers(
+              applicationAttemptId);
+        } else {
+          throw e;
+        }
+      }
+    } else if (historyServiceEnabled) {
+      containersListFromAHS = historyClient.getContainers(applicationAttemptId);
+    }
+    return containersListFromAHS;
+  }
+
   @Override
   public void moveApplicationAcrossQueues(ApplicationId appId,
       String queue) throws YarnException, IOException {
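
To summarize the report-fetching pattern the YarnClientImpl hunks above introduce: the ResourceManager is asked first, ATSv2 is tried when the RM reports the application (or container) as not found and timeline service v2 is enabled, and the v1 history client remains the last fallback. The following is a simplified sketch of that order, not the literal method body from the diff; the helper name fetchReport is invented:

    // Simplified sketch of the fallback order; not the exact code from the diff.
    private ApplicationReport fetchReport(ApplicationId appId)
        throws IOException, YarnException {
      try {
        GetApplicationReportRequest request =
            GetApplicationReportRequest.newInstance(appId);
        return rmClient.getApplicationReport(request).getApplicationReport();
      } catch (ApplicationNotFoundException e) {
        if (timelineV2ServiceEnabled) {
          try {
            return ahsV2Client.getApplicationReport(appId);  // ATS v2 reader
          } catch (Exception ex) {
            LOG.warn("Failed to fetch application report from ATS v2", ex);
          }
        }
        if (!historyServiceEnabled) {
          throw e;                          // nothing else to consult
        }
        return historyClient.getApplicationReport(appId);    // ATS v1 / AHS
      }
    }
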
@@ -96,6 +96,7 @@ public class LogsCLI extends Configured implements Tool {
 
   private static final String CONTAINER_ID_OPTION = "containerId";
   private static final String APPLICATION_ID_OPTION = "applicationId";
+  private static final String CLUSTER_ID_OPTION = "clusterId";
   private static final String NODE_ADDRESS_OPTION = "nodeAddress";
   private static final String APP_OWNER_OPTION = "appOwner";
   private static final String AM_CONTAINER_OPTION = "am";
@@ -134,7 +135,6 @@ public class LogsCLI extends Configured implements Tool {
   @Override
   public int run(String[] args) throws Exception {
     try {
-      yarnClient = createYarnClient();
       webServiceClient = new Client(new URLConnectionClientHandler(
           new HttpURLConnectionFactory() {
             @Override
@@ -171,6 +171,7 @@ private int runCommand(String[] args) throws Exception {
     }
     CommandLineParser parser = new GnuParser();
     String appIdStr = null;
+    String clusterIdStr = null;
     String containerIdStr = null;
     String nodeAddress = null;
     String appOwner = null;
@@ -207,6 +208,10 @@ private int runCommand(String[] args) throws Exception {
         return -1;
       }
     }
+    if (commandLine.hasOption(CLUSTER_ID_OPTION)) {
+      clusterIdStr = commandLine.getOptionValue(CLUSTER_ID_OPTION);
+      getConf().set(YarnConfiguration.RM_CLUSTER_ID, clusterIdStr);
+    }
     if (commandLine.hasOption(PER_CONTAINER_LOG_FILES_OPTION)) {
       logFiles = commandLine.getOptionValues(PER_CONTAINER_LOG_FILES_OPTION);
     }
@@ -303,6 +308,8 @@ private int runCommand(String[] args) throws Exception {
     LogCLIHelpers logCliHelper = new LogCLIHelpers();
     logCliHelper.setConf(getConf());
 
+    yarnClient = createYarnClient();
+
     YarnApplicationState appState = YarnApplicationState.NEW;
     ApplicationReport appReport = null;
     try {
@@ -824,6 +831,8 @@ private Options createCommandOpts() {
         + "By default, it will print all available logs."
         + " Work with -log_files to get only specific logs. If specified, the"
         + " applicationId can be omitted");
+    opts.addOption(CLUSTER_ID_OPTION, true, "ClusterId. "
+        + "By default, it will take default cluster id from the RM");
     opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress in the format "
         + "nodename:port");
     opts.addOption(APP_OWNER_OPTION, true,
@@ -892,6 +901,7 @@ private Options createCommandOpts() {
         + "and fetch all logs.");
     opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID");
     opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID");
+    opts.getOption(CLUSTER_ID_OPTION).setArgName("Cluster ID");
     opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address");
     opts.getOption(APP_OWNER_OPTION).setArgName("Application Owner");
     opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers");
@@ -913,6 +923,7 @@ private Options createPrintOpts(Options commandOpts) {
     Options printOpts = new Options();
     printOpts.addOption(commandOpts.getOption(HELP_CMD));
     printOpts.addOption(commandOpts.getOption(CONTAINER_ID_OPTION));
+    printOpts.addOption(commandOpts.getOption(CLUSTER_ID_OPTION));
     printOpts.addOption(commandOpts.getOption(NODE_ADDRESS_OPTION));
     printOpts.addOption(commandOpts.getOption(APP_OWNER_OPTION));
     printOpts.addOption(commandOpts.getOption(AM_CONTAINER_OPTION));
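
The LogsCLI hunks above add a -clusterId option alongside the existing -applicationId and -containerId options; based on the option names registered here, an invocation would look roughly like `yarn logs -applicationId <Application ID> -clusterId <Cluster ID>`, with the supplied cluster id written into YarnConfiguration.RM_CLUSTER_ID before the logs are fetched.
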
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.client.api.TimelineReaderClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class is to test class {@link AHSv2ClientImpl}.
+ */
+public class TestAHSv2ClientImpl {
+
+  private AHSv2ClientImpl client;
+  private TimelineReaderClient spyTimelineReaderClient;
+  @Before
+  public void setup() {
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    client = new AHSv2ClientImpl();
+    spyTimelineReaderClient = mock(TimelineReaderClient.class);
+    client.setReaderClient(spyTimelineReaderClient);
+  }
+
+  @Test
+  public void testGetContainerReport() throws IOException, YarnException {
+    final ApplicationId appId = ApplicationId.newInstance(0, 1);
+    final ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    final ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
+    when(spyTimelineReaderClient.getContainerEntity(containerId, "ALL", null))
+        .thenReturn(createContainerEntity(containerId));
+    ContainerReport report = client.getContainerReport(containerId);
+    Assert.assertEquals(report.getContainerId(), containerId);
+    Assert.assertEquals(report.getAssignedNode().getHost(), "test host");
+    Assert.assertEquals(report.getAssignedNode().getPort(), 100);
+    Assert.assertEquals(report.getAllocatedResource().getVirtualCores(), 8);
+  }
+
+  @Test
+  public void testGetAppAttemptReport() throws IOException, YarnException {
+    final ApplicationId appId = ApplicationId.newInstance(0, 1);
+    final ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    when(spyTimelineReaderClient.getApplicationAttemptEntity(appAttemptId,
+        "ALL", null))
+        .thenReturn(createAppAttemptTimelineEntity(appAttemptId));
+    ApplicationAttemptReport report =
+        client.getApplicationAttemptReport(appAttemptId);
+    Assert.assertEquals(report.getApplicationAttemptId(), appAttemptId);
+    Assert.assertEquals(report.getFinishTime(), Integer.MAX_VALUE + 2L);
+    Assert.assertEquals(report.getOriginalTrackingUrl(),
+        "test original tracking url");
+  }
+
+  @Test
+  public void testGetAppReport() throws IOException, YarnException {
+    final ApplicationId appId = ApplicationId.newInstance(0, 1);
+    when(spyTimelineReaderClient.getApplicationEntity(appId, "ALL", null))
+        .thenReturn(createApplicationTimelineEntity(appId, false, false));
+    ApplicationReport report = client.getApplicationReport(appId);
+    Assert.assertEquals(report.getApplicationId(), appId);
+    Assert.assertEquals(report.getAppNodeLabelExpression(), "test_node_label");
+    Assert.assertTrue(report.getApplicationTags().contains("Test_APP_TAGS_1"));
+    Assert.assertEquals(report.getYarnApplicationState(),
+        YarnApplicationState.FINISHED);
+  }
+
+  private static TimelineEntity createApplicationTimelineEntity(
+      ApplicationId appId, boolean emptyACLs,
+      boolean wrongAppId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(ApplicationMetricsConstants.ENTITY_TYPE);
+    if (wrongAppId) {
+      entity.setId("wrong_app_id");
+    } else {
+      entity.setId(appId.toString());
+    }
+
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, "test app");
+    entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
+        "test app type");
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, "user1");
+    entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
+        "test queue");
+    entityInfo.put(
+        ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, "false");
+    entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
+        Priority.newInstance(0));
+    entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
+        Integer.MAX_VALUE + 1L);
+    entityInfo.put(ApplicationMetricsConstants.APP_MEM_METRICS, 123);
+    entityInfo.put(ApplicationMetricsConstants.APP_CPU_METRICS, 345);
+
+    entityInfo.put(ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS, 456);
+    entityInfo.put(ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS, 789);
+
+    if (emptyACLs) {
+      entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, "");
+    } else {
+      entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
+          "user2");
+    }
+
+    Set<String> appTags = new HashSet<String>();
+    appTags.add("Test_APP_TAGS_1");
+    appTags.add("Test_APP_TAGS_2");
+    entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, appTags);
+    entity.setInfo(entityInfo);
+
+    Map<String, String> configs = new HashMap<>();
+    configs.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
+        "test_node_label");
+    entity.setConfigs(configs);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(Integer.MAX_VALUE + 1L + appId.getId());
+    entity.addEvent(tEvent);
+
+    // send a YARN_APPLICATION_STATE_UPDATED event
+    // after YARN_APPLICATION_FINISHED
+    // The final YarnApplicationState should not be changed
+    tEvent = new TimelineEvent();
+    tEvent.setId(
+        ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE);
+    tEvent.setTimestamp(Integer.MAX_VALUE + 2L + appId.getId());
+    Map<String, Object> eventInfo = new HashMap<>();
+    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
+        YarnApplicationState.KILLED);
+    tEvent.setInfo(eventInfo);
+    entity.addEvent(tEvent);
+
+    return entity;
+  }
+
+  private static TimelineEntity createAppAttemptTimelineEntity(
+      ApplicationAttemptId appAttemptId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(AppAttemptMetricsConstants.ENTITY_TYPE);
+    entity.setId(appAttemptId.toString());
+
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(AppAttemptMetricsConstants.TRACKING_URL_INFO,
+        "test tracking url");
+    entityInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO,
+        "test original tracking url");
+    entityInfo.put(AppAttemptMetricsConstants.HOST_INFO, "test host");
+    entityInfo.put(AppAttemptMetricsConstants.RPC_PORT_INFO, 100);
+    entityInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
+        ContainerId.newContainerId(appAttemptId, 1));
+    entity.setInfo(entityInfo);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+    tEvent.setTimestamp(Integer.MAX_VALUE + 1L);
+    entity.addEvent(tEvent);
+
+    tEvent = new TimelineEvent();
+    tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(Integer.MAX_VALUE + 2L);
+    entity.addEvent(tEvent);
+
+    return entity;
+  }
+
+  private static TimelineEntity createContainerEntity(ContainerId containerId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(ContainerMetricsConstants.ENTITY_TYPE);
+    entity.setId(containerId.toString());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO, 1024);
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO, 8);
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
+        "test host");
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO, 100);
+    entityInfo
+        .put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO, -1);
+    entityInfo.put(ContainerMetricsConstants
+        .ALLOCATED_HOST_HTTP_ADDRESS_INFO, "http://test:1234");
+    entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
+        "test diagnostics info");
+    entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO, -1);
+    entityInfo.put(ContainerMetricsConstants.STATE_INFO,
+        ContainerState.COMPLETE.toString());
+    entity.setInfo(entityInfo);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
+    tEvent.setTimestamp(123456);
+    entity.addEvent(tEvent);
+
+    return entity;
+  }
+}
@@ -246,6 +246,9 @@ public void testHelpMessage() throws Exception {
     pw.println(" --client_max_retries to");
     pw.println(" create a retry client. The");
     pw.println(" default value is 1000.");
+    pw.println(" -clusterId <Cluster ID> ClusterId. By default, it");
+    pw.println(" will take default cluster id");
+    pw.println(" from the RM");
     pw.println(" -containerId <Container ID> ContainerId. By default, it");
     pw.println(" will print all available");
     pw.println(" logs. Work with -log_files");
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Yarn Common Metrics package. **/
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.server.metrics;
+import org.apache.hadoop.classification.InterfaceAudience;
+
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util.timeline;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+/**
+ * Utility class to generate reports from timeline entities.
+ */
+public final class TimelineEntityV2Converter {
+  private TimelineEntityV2Converter() {
+  }
+
+  public static ContainerReport convertToContainerReport(
+      TimelineEntity entity) {
+    int allocatedMem = 0;
+    int allocatedVcore = 0;
+    String allocatedHost = null;
+    int allocatedPort = -1;
+    int allocatedPriority = 0;
+    long createdTime = 0;
+    long finishedTime = 0;
+    String diagnosticsInfo = null;
+    int exitStatus = ContainerExitStatus.INVALID;
+    ContainerState state = null;
+    String nodeHttpAddress = null;
+    Map<String, Object> entityInfo = entity.getInfo();
+    if (entityInfo != null) {
+      if (entityInfo
+          .containsKey(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO)) {
+        allocatedMem = (Integer) entityInfo.get(
+            ContainerMetricsConstants.ALLOCATED_MEMORY_INFO);
+      }
+      if (entityInfo
+          .containsKey(ContainerMetricsConstants.ALLOCATED_VCORE_INFO)) {
+        allocatedVcore = (Integer) entityInfo.get(
+            ContainerMetricsConstants.ALLOCATED_VCORE_INFO);
+      }
+      if (entityInfo
+          .containsKey(ContainerMetricsConstants.ALLOCATED_HOST_INFO)) {
+        allocatedHost =
+            entityInfo
+                .get(ContainerMetricsConstants.ALLOCATED_HOST_INFO)
+                .toString();
+      }
+      if (entityInfo
+          .containsKey(ContainerMetricsConstants.ALLOCATED_PORT_INFO)) {
+        allocatedPort = (Integer) entityInfo.get(
+            ContainerMetricsConstants.ALLOCATED_PORT_INFO);
+      }
+      if (entityInfo
+          .containsKey(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO)) {
+        allocatedPriority = Integer.parseInt(entityInfo.get(
+            ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO).toString());
+      }
+      if (entityInfo.containsKey(
+          ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO)) {
+        nodeHttpAddress =
+            (String) entityInfo.get(
+                ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO);
+      }
+      if (entityInfo.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)) {
+        diagnosticsInfo =
+            entityInfo.get(
+                ContainerMetricsConstants.DIAGNOSTICS_INFO)
+                .toString();
+      }
+      if (entityInfo.containsKey(ContainerMetricsConstants.EXIT_STATUS_INFO)) {
+        exitStatus = (Integer) entityInfo.get(
+            ContainerMetricsConstants.EXIT_STATUS_INFO);
+      }
+      if (entityInfo.containsKey(ContainerMetricsConstants.STATE_INFO)) {
+        state =
+            ContainerState.valueOf(entityInfo.get(
+                ContainerMetricsConstants.STATE_INFO).toString());
+      }
+    }
+    NavigableSet<TimelineEvent> events = entity.getEvents();
+    if (events != null) {
+      for (TimelineEvent event : events) {
+        if (event.getId().equals(
+            ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE)) {
+          createdTime = event.getTimestamp();
+        } else if (event.getId().equals(
+            ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE)) {
+          finishedTime = event.getTimestamp();
+        }
+      }
+    }
+    String logUrl = null;
+    NodeId allocatedNode = null;
+    if (allocatedHost != null) {
+      allocatedNode = NodeId.newInstance(allocatedHost, allocatedPort);
+    }
+    return ContainerReport.newInstance(
+        ContainerId.fromString(entity.getId()),
+        Resource.newInstance(allocatedMem, allocatedVcore), allocatedNode,
+        Priority.newInstance(allocatedPriority),
+        createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state,
+        nodeHttpAddress);
+  }
+
+  public static ApplicationAttemptReport convertToApplicationAttemptReport(
+      TimelineEntity entity) {
+    String host = null;
+    int rpcPort = -1;
+    ContainerId amContainerId = null;
+    String trackingUrl = null;
+    String originalTrackingUrl = null;
+    String diagnosticsInfo = null;
+    YarnApplicationAttemptState state = null;
+    Map<String, Object> entityInfo = entity.getInfo();
+    long startTime = 0;
+    long finishTime = 0;
+
+    if (entityInfo != null) {
+      if (entityInfo.containsKey(AppAttemptMetricsConstants.HOST_INFO)) {
+        host =
+            entityInfo.get(AppAttemptMetricsConstants.HOST_INFO)
+                .toString();
+      }
+      if (entityInfo
+          .containsKey(AppAttemptMetricsConstants.RPC_PORT_INFO)) {
+        rpcPort = (Integer) entityInfo.get(
+            AppAttemptMetricsConstants.RPC_PORT_INFO);
+      }
+      if (entityInfo
+          .containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)) {
+        amContainerId =
+            ContainerId.fromString(entityInfo.get(
+                AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)
+                .toString());
+      }
+      if (entityInfo
+          .containsKey(AppAttemptMetricsConstants.TRACKING_URL_INFO)) {
+        trackingUrl =
+            entityInfo.get(
+                AppAttemptMetricsConstants.TRACKING_URL_INFO)
+                .toString();
+      }
+      if (entityInfo
+          .containsKey(
+              AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO)) {
+        originalTrackingUrl =
+            entityInfo
+                .get(
+                    AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO)
+                .toString();
+      }
+      if (entityInfo
+          .containsKey(AppAttemptMetricsConstants.DIAGNOSTICS_INFO)) {
+        diagnosticsInfo =
+            entityInfo.get(
+                AppAttemptMetricsConstants.DIAGNOSTICS_INFO)
+                .toString();
+      }
+      if (entityInfo
+          .containsKey(AppAttemptMetricsConstants.STATE_INFO)) {
+        state =
+            YarnApplicationAttemptState.valueOf(entityInfo.get(
+                AppAttemptMetricsConstants.STATE_INFO)
+                .toString());
+      }
+      if (entityInfo
+          .containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)) {
+        amContainerId =
+            ContainerId.fromString(entityInfo.get(
+                AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)
+                .toString());
+      }
+    }
+    NavigableSet<TimelineEvent> events = entity.getEvents();
+    if (events != null) {
+      for (TimelineEvent event : events) {
+        if (event.getId().equals(
+            AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) {
+          startTime = event.getTimestamp();
+        } else if (event.getId().equals(
+            AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) {
+          finishTime = event.getTimestamp();
+        }
+      }
+    }
+    return ApplicationAttemptReport.newInstance(
+        ApplicationAttemptId.fromString(entity.getId()),
+        host, rpcPort, trackingUrl, originalTrackingUrl, diagnosticsInfo,
+        state, amContainerId, startTime, finishTime);
+  }
+
+  public static ApplicationReport convertToApplicationReport(
+      TimelineEntity entity) {
+    String user = null;
+    String queue = null;
+    String name = null;
+    String type = null;
+    boolean unmanagedApplication = false;
+    long createdTime = 0;
+    long finishedTime = 0;
+    float progress = 0.0f;
+    int applicationPriority = 0;
+    ApplicationAttemptId latestApplicationAttemptId = null;
+    String diagnosticsInfo = null;
+    FinalApplicationStatus finalStatus = FinalApplicationStatus.UNDEFINED;
+    YarnApplicationState state = YarnApplicationState.ACCEPTED;
+    ApplicationResourceUsageReport appResources = null;
+    Set<String> appTags = null;
+    String appNodeLabelExpression = null;
+    String amNodeLabelExpression = null;
+    Map<String, Object> entityInfo = entity.getInfo();
+    if (entityInfo != null) {
+      if (entityInfo.containsKey(
+          ApplicationMetricsConstants.USER_ENTITY_INFO)) {
+        user =
+            entityInfo.get(ApplicationMetricsConstants.USER_ENTITY_INFO)
+                .toString();
+      }
+      if (entityInfo.containsKey(
+          ApplicationMetricsConstants.QUEUE_ENTITY_INFO)) {
+        queue =
+            entityInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)
+                .toString();
+      }
+      if (entityInfo.containsKey(
+          ApplicationMetricsConstants.NAME_ENTITY_INFO)) {
+        name =
+            entityInfo.get(ApplicationMetricsConstants.NAME_ENTITY_INFO)
+                .toString();
+      }
+      if (entityInfo.containsKey(
+          ApplicationMetricsConstants.TYPE_ENTITY_INFO)) {
+        type =
+            entityInfo.get(ApplicationMetricsConstants.TYPE_ENTITY_INFO)
+                .toString();
+      }
+      if (entityInfo.containsKey(
+          ApplicationMetricsConstants.TYPE_ENTITY_INFO)) {
+        type =
+            entityInfo.get(ApplicationMetricsConstants.TYPE_ENTITY_INFO)
+                .toString();
+      }
+      if (entityInfo
+          .containsKey(
+              ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO)) {
+        unmanagedApplication =
+            Boolean.parseBoolean(entityInfo.get(
+                ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO)
+                .toString());
+      }
+      if (entityInfo
+          .containsKey(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO)) {
+        applicationPriority = Integer.parseInt(entityInfo.get(
+            ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO).toString());
+      }
+      if (entityInfo.containsKey(ApplicationMetricsConstants.APP_TAGS_INFO)) {
+        appTags = new HashSet<>();
+        Object obj = entityInfo.get(ApplicationMetricsConstants.APP_TAGS_INFO);
+        if (obj != null && obj instanceof Collection<?>) {
+          for(Object o : (Collection<?>)obj) {
+            if (o != null) {
+              appTags.add(o.toString());
+            }
+          }
+        }
+      }
+      if (entityInfo
+          .containsKey(
+              ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO)) {
+        latestApplicationAttemptId = ApplicationAttemptId.fromString(
+            entityInfo.get(
+                ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO)
+                .toString());
+      }
+      if (entityInfo.containsKey(
+          ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) {
+        diagnosticsInfo =
+            entityInfo.get(
+                ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)
+                .toString();
+      }
+      if (entityInfo
+          .containsKey(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)) {
+        finalStatus =
+            FinalApplicationStatus.valueOf(entityInfo.get(
+                ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)
+                .toString());
+      }
+      if (entityInfo
+          .containsKey(ApplicationMetricsConstants.STATE_EVENT_INFO)) {
+        state =
+            YarnApplicationState.valueOf(entityInfo.get(
+                ApplicationMetricsConstants.STATE_EVENT_INFO).toString());
+      }
+    }
+
+    Map<String, String> configs = entity.getConfigs();
+    if (configs
+        .containsKey(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION)) {
+      appNodeLabelExpression = configs
+          .get(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION);
+    }
+    if (configs
+        .containsKey(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION)) {
+      amNodeLabelExpression =
+          configs.get(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION);
+    }
+
+    Set<TimelineMetric> metrics = entity.getMetrics();
+    if (metrics != null) {
+      long vcoreSeconds = 0;
+      long memorySeconds = 0;
+      long preemptedVcoreSeconds = 0;
+      long preemptedMemorySeconds = 0;
+
+      for (TimelineMetric metric : metrics) {
+        switch (metric.getId()) {
+        case ApplicationMetricsConstants.APP_CPU_METRICS:
+          vcoreSeconds = getAverageValue(metric.getValues().values());
+          break;
+        case ApplicationMetricsConstants.APP_MEM_METRICS:
+          memorySeconds = getAverageValue(metric.getValues().values());
+          break;
+        case ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS:
+          preemptedVcoreSeconds = getAverageValue(metric.getValues().values());
+          break;
+        case ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS:
+          preemptedVcoreSeconds = getAverageValue(metric.getValues().values());
+          break;
+        default:
+          // Should not happen..
+          break;
+        }
+      }
+      Map<String, Long> resourceSecondsMap = new HashMap<>();
+      Map<String, Long> preemptedResoureSecondsMap = new HashMap<>();
+      resourceSecondsMap
+          .put(ResourceInformation.MEMORY_MB.getName(), memorySeconds);
+      resourceSecondsMap
+          .put(ResourceInformation.VCORES.getName(), vcoreSeconds);
+      preemptedResoureSecondsMap.put(ResourceInformation.MEMORY_MB.getName(),
+          preemptedMemorySeconds);
+      preemptedResoureSecondsMap
+          .put(ResourceInformation.VCORES.getName(), preemptedVcoreSeconds);
+
+      appResources = ApplicationResourceUsageReport
+          .newInstance(0, 0, null, null, null, resourceSecondsMap, 0, 0,
+              preemptedResoureSecondsMap);
+    }
+
+    NavigableSet<TimelineEvent> events = entity.getEvents();
+    long updatedTimeStamp = 0L;
+    if (events != null) {
+      for (TimelineEvent event : events) {
+        if (event.getId().equals(
+            ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
+          createdTime = event.getTimestamp();
+        } else if (event.getId().equals(
+            ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) {
+          // This type of events are parsed in time-stamp descending order
+          // which means the previous event could override the information
+          // from the later same type of event. Hence compare timestamp
+          // before over writing.
+          if (event.getTimestamp() > updatedTimeStamp) {
+            updatedTimeStamp = event.getTimestamp();
+          }
+        } else if (event.getId().equals(
+            ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE)) {
+          Map<String, Object> eventInfo = event.getInfo();
+          if (eventInfo == null) {
+            continue;
+          }
+          if (eventInfo.containsKey(
+              ApplicationMetricsConstants.STATE_EVENT_INFO)) {
+            if (state == YarnApplicationState.ACCEPTED) {
+              state = YarnApplicationState.valueOf(eventInfo.get(
+                  ApplicationMetricsConstants.STATE_EVENT_INFO).toString());
+            }
+          }
+        } else if (event.getId().equals(
+            ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+          progress=1.0F;
+          state = YarnApplicationState.FINISHED;
+          finishedTime = event.getTimestamp();
+        }
+      }
+    }
+    return ApplicationReport.newInstance(
+        ApplicationId.fromString(entity.getId()),
+        latestApplicationAttemptId, user, queue, name, null, -1, null, state,
+        diagnosticsInfo, null, createdTime, finishedTime, finalStatus,
+        appResources, null, progress, type, null, appTags, unmanagedApplication,
+        Priority.newInstance(applicationPriority), appNodeLabelExpression,
+        amNodeLabelExpression);
+  }
+
+  private static long getAverageValue(Collection<Number> values) {
+    if (values == null || values.isEmpty()) {
+      return 0;
+    }
+    long sum = 0;
+    for (Number value : values) {
+      sum += value.longValue();
+    }
+    return sum/values.size();
+  }
+}