diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java index b590a51f10..6da8e66b8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.client.api.impl.AHSClientImpl; +import org.apache.hadoop.yarn.client.api.impl.AHSv2ClientImpl; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -46,8 +47,13 @@ public abstract class AHSClient extends AbstractService { */ @Public public static AHSClient createAHSClient() { - AHSClient client = new AHSClientImpl(); - return client; + return new AHSClientImpl(); + } + + @InterfaceStability.Evolving + @Public + public static AHSClient createAHSv2Client() { + return new AHSv2ClientImpl(); } @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSv2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSv2ClientImpl.java new file mode 100644 index 0000000000..e797c28162 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSv2ClientImpl.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.client.api.AHSClient; +import org.apache.hadoop.yarn.client.api.TimelineReaderClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.timeline.TimelineEntityV2Converter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This class provides Application History client implementation which uses + * ATS v2 as backend. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class AHSv2ClientImpl extends AHSClient { + private TimelineReaderClient readerClient; + + public AHSv2ClientImpl() { + super(AHSv2ClientImpl.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) { + readerClient = TimelineReaderClient.createTimelineReaderClient(); + readerClient.init(conf); + } + + @VisibleForTesting + protected void setReaderClient(TimelineReaderClient readerClient) { + this.readerClient = readerClient; + } + + @Override + public void serviceStart() { + readerClient.start(); + } + + @Override + public void serviceStop() { + readerClient.stop(); + } + + @Override + public ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + TimelineEntity entity = readerClient.getApplicationEntity( + appId, "ALL", null); + return TimelineEntityV2Converter.convertToApplicationReport(entity); + } + + @Override + public List getApplications() + throws YarnException, IOException { + throw new UnsupportedOperationException("ATSv2.0 doesn't support retrieving" + + " ALL application entities."); + } + + @Override + public ApplicationAttemptReport getApplicationAttemptReport( + ApplicationAttemptId applicationAttemptId) + throws YarnException, IOException { + TimelineEntity entity = readerClient.getApplicationAttemptEntity( + applicationAttemptId, "ALL", null); + return TimelineEntityV2Converter.convertToApplicationAttemptReport(entity); + } + + @Override + public List getApplicationAttempts( + ApplicationId applicationId) throws YarnException, IOException { + List entities = readerClient.getApplicationAttemptEntities( + applicationId, "ALL", null, 0, null); + List appAttemptReports = + new ArrayList<>(); + if (entities != null && !entities.isEmpty()) { + for (TimelineEntity entity : entities) { + ApplicationAttemptReport container = + TimelineEntityV2Converter.convertToApplicationAttemptReport( + entity); + 
appAttemptReports.add(container); + } + } + return appAttemptReports; + } + + @Override + public ContainerReport getContainerReport(ContainerId containerId) + throws YarnException, IOException { + TimelineEntity entity = readerClient.getContainerEntity(containerId, + "ALL", null); + return TimelineEntityV2Converter.convertToContainerReport(entity); + } + + @Override + public List getContainers(ApplicationAttemptId + applicationAttemptId) throws YarnException, IOException { + ApplicationId appId = applicationAttemptId.getApplicationId(); + Map filters = new HashMap<>(); + filters.put("infofilters", "SYSTEM_INFO_PARENT_ENTITY eq {\"id\":\"" + + applicationAttemptId.toString() + + "\",\"type\":\"YARN_APPLICATION_ATTEMPT\"}"); + List entities = readerClient.getContainerEntities( + appId, "ALL", filters, 0, null); + List containers = + new ArrayList<>(); + if (entities != null && !entities.isEmpty()) { + for (TimelineEntity entity : entities) { + ContainerReport container = + TimelineEntityV2Converter.convertToContainerReport( + entity); + containers.add(container); + } + } + return containers; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index acfc3ff70b..227f7ed70a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -149,6 +149,7 @@ public class YarnClientImpl extends YarnClient { private long asyncApiPollIntervalMillis; private long asyncApiPollTimeoutMillis; protected AHSClient historyClient; + private AHSClient ahsV2Client; private boolean historyServiceEnabled; protected volatile TimelineClient timelineClient; @VisibleForTesting 
@@ -159,6 +160,8 @@ public class YarnClientImpl extends YarnClient { protected boolean timelineServiceBestEffort; private boolean loadResourceTypesFromServer; + private boolean timelineV2ServiceEnabled; + private static final String ROOT = "root"; public YarnClientImpl() { @@ -188,6 +191,10 @@ protected void serviceInit(Configuration conf) throws Exception { timelineService = TimelineUtils.buildTimelineTokenService(conf); } + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + timelineV2ServiceEnabled = true; + } + // The AHSClientService is enabled by default when we start the // TimelineServer which means we are able to get history information // for applications/applicationAttempts/containers by using ahsClient @@ -200,6 +207,11 @@ protected void serviceInit(Configuration conf) throws Exception { historyClient.init(conf); } + if (timelineV2ServiceEnabled) { + ahsV2Client = AHSClient.createAHSv2Client(); + ahsV2Client.init(conf); + } + timelineServiceBestEffort = conf.getBoolean( YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_BEST_EFFORT); @@ -223,6 +235,9 @@ protected void serviceStart() throws Exception { if (historyServiceEnabled) { historyClient.start(); } + if (timelineV2ServiceEnabled) { + ahsV2Client.start(); + } } catch (IOException e) { throw new YarnRuntimeException(e); } @@ -244,6 +259,9 @@ protected void serviceStop() throws Exception { if (historyServiceEnabled) { historyClient.stop(); } + if (timelineV2ServiceEnabled) { + ahsV2Client.stop(); + } if (timelineClient != null) { timelineClient.stop(); } @@ -516,6 +534,14 @@ public ApplicationReport getApplicationReport(ApplicationId appId) request.setApplicationId(appId); response = rmClient.getApplicationReport(request); } catch (ApplicationNotFoundException e) { + if (timelineV2ServiceEnabled) { + try { + return ahsV2Client.getApplicationReport(appId); + } catch (Exception ex) { + LOG.warn("Failed to fetch application report from " + + 
"ATS v2", ex); + } + } if (!historyServiceEnabled) { // Just throw it as usual if historyService is not enabled. throw e; @@ -726,15 +752,24 @@ public ApplicationAttemptReport getApplicationAttemptReport( .getApplicationAttemptReport(request); return response.getApplicationAttemptReport(); } catch (YarnException e) { - if (!historyServiceEnabled) { - // Just throw it as usual if historyService is not enabled. - throw e; - } + // Even if history-service is enabled, treat all exceptions still the same // except the following if (e.getClass() != ApplicationNotFoundException.class) { throw e; } + if (timelineV2ServiceEnabled) { + try { + return ahsV2Client.getApplicationAttemptReport(appAttemptId); + } catch (Exception ex) { + LOG.warn("Failed to fetch application attempt report from " + + "ATS v2", ex); + } + } + if (!historyServiceEnabled) { + // Just throw it as usual if historyService is not enabled. + throw e; + } return historyClient.getApplicationAttemptReport(appAttemptId); } } @@ -750,15 +785,23 @@ public List getApplicationAttempts( .getApplicationAttempts(request); return response.getApplicationAttemptList(); } catch (YarnException e) { - if (!historyServiceEnabled) { - // Just throw it as usual if historyService is not enabled. - throw e; - } // Even if history-service is enabled, treat all exceptions still the same // except the following if (e.getClass() != ApplicationNotFoundException.class) { throw e; } + if (timelineV2ServiceEnabled) { + try { + return ahsV2Client.getApplicationAttempts(appId); + } catch (Exception ex) { + LOG.warn("Failed to fetch application attempts from " + + "ATS v2", ex); + } + } + if (!historyServiceEnabled) { + // Just throw it as usual if historyService is not enabled. 
+ throw e; + } return historyClient.getApplicationAttempts(appId); } } @@ -774,16 +817,24 @@ public ContainerReport getContainerReport(ContainerId containerId) .getContainerReport(request); return response.getContainerReport(); } catch (YarnException e) { - if (!historyServiceEnabled) { - // Just throw it as usual if historyService is not enabled. - throw e; - } // Even if history-service is enabled, treat all exceptions still the same // except the following if (e.getClass() != ApplicationNotFoundException.class && e.getClass() != ContainerNotFoundException.class) { throw e; } + if (timelineV2ServiceEnabled) { + try { + return ahsV2Client.getContainerReport(containerId); + } catch (Exception ex) { + LOG.warn("Failed to fetch container report from " + + "ATS v2", ex); + } + } + if (!historyServiceEnabled) { + // Just throw it as usual if historyService is not enabled. + throw e; + } return historyClient.getContainerReport(containerId); } } @@ -802,71 +853,88 @@ public List getContainers( GetContainersResponse response = rmClient.getContainers(request); containersForAttempt.addAll(response.getContainerList()); } catch (YarnException e) { - if (e.getClass() != ApplicationNotFoundException.class - || !historyServiceEnabled) { - // If Application is not in RM and history service is enabled then we - // need to check with history service else throw exception. + // Even if history-service is enabled, treat all exceptions still the same + // except the following + if (e.getClass() != ApplicationNotFoundException.class) { + throw e; + } + if (!historyServiceEnabled && !timelineV2ServiceEnabled) { + // if both history server and ATSv2 are not enabled throw exception. 
throw e; } appNotFoundInRM = true; } - - if (historyServiceEnabled) { - // Check with AHS even if found in RM because to capture info of finished - // containers also - List containersListFromAHS = null; - try { - containersListFromAHS = - historyClient.getContainers(applicationAttemptId); - } catch (IOException e) { - // History service access might be enabled but system metrics publisher - // is disabled hence app not found exception is possible - if (appNotFoundInRM) { - // app not found in bothM and RM then propagate the exception. - throw e; - } + // Check with AHS even if found in RM because to capture info of finished + // containers also + List containersListFromAHS = null; + try { + containersListFromAHS = + getContainerReportFromHistory(applicationAttemptId); + } catch (IOException e) { + if (appNotFoundInRM) { + throw e; + } + } + if (null != containersListFromAHS && containersListFromAHS.size() > 0) { + // remove duplicates + Set containerIdsToBeKeptFromAHS = + new HashSet(); + Iterator tmpItr = containersListFromAHS.iterator(); + while (tmpItr.hasNext()) { + containerIdsToBeKeptFromAHS.add(tmpItr.next().getContainerId()); } - if (null != containersListFromAHS && containersListFromAHS.size() > 0) { - // remove duplicates + Iterator rmContainers = + containersForAttempt.iterator(); + while (rmContainers.hasNext()) { + ContainerReport tmp = rmContainers.next(); + containerIdsToBeKeptFromAHS.remove(tmp.getContainerId()); + // Remove containers from AHS as container from RM will have latest + // information + } - Set containerIdsToBeKeptFromAHS = - new HashSet(); - Iterator tmpItr = containersListFromAHS.iterator(); - while (tmpItr.hasNext()) { - containerIdsToBeKeptFromAHS.add(tmpItr.next().getContainerId()); - } - - Iterator rmContainers = - containersForAttempt.iterator(); - while (rmContainers.hasNext()) { - ContainerReport tmp = rmContainers.next(); - containerIdsToBeKeptFromAHS.remove(tmp.getContainerId()); - // Remove containers from AHS as container 
from RM will have latest - // information - } - - if (containerIdsToBeKeptFromAHS.size() > 0 - && containersListFromAHS.size() != containerIdsToBeKeptFromAHS - .size()) { - Iterator containersFromHS = - containersListFromAHS.iterator(); - while (containersFromHS.hasNext()) { - ContainerReport containerReport = containersFromHS.next(); - if (containerIdsToBeKeptFromAHS.contains(containerReport - .getContainerId())) { - containersForAttempt.add(containerReport); - } + if (containerIdsToBeKeptFromAHS.size() > 0 + && containersListFromAHS.size() != containerIdsToBeKeptFromAHS + .size()) { + Iterator containersFromHS = + containersListFromAHS.iterator(); + while (containersFromHS.hasNext()) { + ContainerReport containerReport = containersFromHS.next(); + if (containerIdsToBeKeptFromAHS.contains(containerReport + .getContainerId())) { + containersForAttempt.add(containerReport); } - } else if (containersListFromAHS.size() == containerIdsToBeKeptFromAHS - .size()) { - containersForAttempt.addAll(containersListFromAHS); } + } else if (containersListFromAHS.size() == containerIdsToBeKeptFromAHS + .size()) { + containersForAttempt.addAll(containersListFromAHS); } } return containersForAttempt; } + private List getContainerReportFromHistory( + ApplicationAttemptId applicationAttemptId) + throws IOException, YarnException { + List containersListFromAHS = null; + if (timelineV2ServiceEnabled) { + try { + containersListFromAHS = ahsV2Client.getContainers(applicationAttemptId); + } catch (Exception e) { + LOG.warn("Got an error while fetching container report from ATSv2", e); + if (historyServiceEnabled) { + containersListFromAHS = historyClient.getContainers( + applicationAttemptId); + } else { + throw e; + } + } + } else if (historyServiceEnabled) { + containersListFromAHS = historyClient.getContainers(applicationAttemptId); + } + return containersListFromAHS; + } + @Override public void moveApplicationAcrossQueues(ApplicationId appId, String queue) throws YarnException, 
IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index cfd4c79fe7..a1550a53ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -96,6 +96,7 @@ public class LogsCLI extends Configured implements Tool { private static final String CONTAINER_ID_OPTION = "containerId"; private static final String APPLICATION_ID_OPTION = "applicationId"; + private static final String CLUSTER_ID_OPTION = "clusterId"; private static final String NODE_ADDRESS_OPTION = "nodeAddress"; private static final String APP_OWNER_OPTION = "appOwner"; private static final String AM_CONTAINER_OPTION = "am"; @@ -134,7 +135,6 @@ public class LogsCLI extends Configured implements Tool { @Override public int run(String[] args) throws Exception { try { - yarnClient = createYarnClient(); webServiceClient = new Client(new URLConnectionClientHandler( new HttpURLConnectionFactory() { @Override @@ -171,6 +171,7 @@ private int runCommand(String[] args) throws Exception { } CommandLineParser parser = new GnuParser(); String appIdStr = null; + String clusterIdStr = null; String containerIdStr = null; String nodeAddress = null; String appOwner = null; @@ -207,6 +208,10 @@ private int runCommand(String[] args) throws Exception { return -1; } } + if (commandLine.hasOption(CLUSTER_ID_OPTION)) { + clusterIdStr = commandLine.getOptionValue(CLUSTER_ID_OPTION); + getConf().set(YarnConfiguration.RM_CLUSTER_ID, clusterIdStr); + } if (commandLine.hasOption(PER_CONTAINER_LOG_FILES_OPTION)) { logFiles = commandLine.getOptionValues(PER_CONTAINER_LOG_FILES_OPTION); } @@ -303,6 +308,8 @@ private int runCommand(String[] args) throws 
Exception { LogCLIHelpers logCliHelper = new LogCLIHelpers(); logCliHelper.setConf(getConf()); + yarnClient = createYarnClient(); + YarnApplicationState appState = YarnApplicationState.NEW; ApplicationReport appReport = null; try { @@ -824,6 +831,8 @@ private Options createCommandOpts() { + "By default, it will print all available logs." + " Work with -log_files to get only specific logs. If specified, the" + " applicationId can be omitted"); + opts.addOption(CLUSTER_ID_OPTION, true, "ClusterId. " + + "By default, it will take default cluster id from the RM"); opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress in the format " + "nodename:port"); opts.addOption(APP_OWNER_OPTION, true, @@ -892,6 +901,7 @@ private Options createCommandOpts() { + "and fetch all logs."); opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID"); opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID"); + opts.getOption(CLUSTER_ID_OPTION).setArgName("Cluster ID"); opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address"); opts.getOption(APP_OWNER_OPTION).setArgName("Application Owner"); opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers"); @@ -913,6 +923,7 @@ private Options createPrintOpts(Options commandOpts) { Options printOpts = new Options(); printOpts.addOption(commandOpts.getOption(HELP_CMD)); printOpts.addOption(commandOpts.getOption(CONTAINER_ID_OPTION)); + printOpts.addOption(commandOpts.getOption(CLUSTER_ID_OPTION)); printOpts.addOption(commandOpts.getOption(NODE_ADDRESS_OPTION)); printOpts.addOption(commandOpts.getOption(APP_OWNER_OPTION)); printOpts.addOption(commandOpts.getOption(AM_CONTAINER_OPTION)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSv2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSv2ClientImpl.java new file mode 100644 index 0000000000..99473a35d4 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSv2ClientImpl.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.client.api.TimelineReaderClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import 
org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * This class is to test class {@link AHSv2ClientImpl}. + */ +public class TestAHSv2ClientImpl { + + private AHSv2ClientImpl client; + private TimelineReaderClient spyTimelineReaderClient; + @Before + public void setup() { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + client = new AHSv2ClientImpl(); + spyTimelineReaderClient = mock(TimelineReaderClient.class); + client.setReaderClient(spyTimelineReaderClient); + } + + @Test + public void testGetContainerReport() throws IOException, YarnException { + final ApplicationId appId = ApplicationId.newInstance(0, 1); + final ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + final ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); + when(spyTimelineReaderClient.getContainerEntity(containerId, "ALL", null)) + .thenReturn(createContainerEntity(containerId)); + ContainerReport report = client.getContainerReport(containerId); + Assert.assertEquals(report.getContainerId(), containerId); + Assert.assertEquals(report.getAssignedNode().getHost(), "test host"); + Assert.assertEquals(report.getAssignedNode().getPort(), 100); + Assert.assertEquals(report.getAllocatedResource().getVirtualCores(), 8); + } + + @Test + public void testGetAppAttemptReport() throws IOException, YarnException { + final ApplicationId appId = ApplicationId.newInstance(0, 1); + final 
ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + when(spyTimelineReaderClient.getApplicationAttemptEntity(appAttemptId, + "ALL", null)) + .thenReturn(createAppAttemptTimelineEntity(appAttemptId)); + ApplicationAttemptReport report = + client.getApplicationAttemptReport(appAttemptId); + Assert.assertEquals(report.getApplicationAttemptId(), appAttemptId); + Assert.assertEquals(report.getFinishTime(), Integer.MAX_VALUE + 2L); + Assert.assertEquals(report.getOriginalTrackingUrl(), + "test original tracking url"); + } + + @Test + public void testGetAppReport() throws IOException, YarnException { + final ApplicationId appId = ApplicationId.newInstance(0, 1); + when(spyTimelineReaderClient.getApplicationEntity(appId, "ALL", null)) + .thenReturn(createApplicationTimelineEntity(appId, false, false)); + ApplicationReport report = client.getApplicationReport(appId); + Assert.assertEquals(report.getApplicationId(), appId); + Assert.assertEquals(report.getAppNodeLabelExpression(), "test_node_label"); + Assert.assertTrue(report.getApplicationTags().contains("Test_APP_TAGS_1")); + Assert.assertEquals(report.getYarnApplicationState(), + YarnApplicationState.FINISHED); + } + + private static TimelineEntity createApplicationTimelineEntity( + ApplicationId appId, boolean emptyACLs, + boolean wrongAppId) { + TimelineEntity entity = new TimelineEntity(); + entity.setType(ApplicationMetricsConstants.ENTITY_TYPE); + if (wrongAppId) { + entity.setId("wrong_app_id"); + } else { + entity.setId(appId.toString()); + } + + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, "test app"); + entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, + "test app type"); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, "user1"); + entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + "test queue"); + entityInfo.put( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, "false"); + 
entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + Priority.newInstance(0)); + entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + Integer.MAX_VALUE + 1L); + entityInfo.put(ApplicationMetricsConstants.APP_MEM_METRICS, 123); + entityInfo.put(ApplicationMetricsConstants.APP_CPU_METRICS, 345); + + entityInfo.put(ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS, 456); + entityInfo.put(ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS, 789); + + if (emptyACLs) { + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, ""); + } else { + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, + "user2"); + } + + Set appTags = new HashSet(); + appTags.add("Test_APP_TAGS_1"); + appTags.add("Test_APP_TAGS_2"); + entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, appTags); + entity.setInfo(entityInfo); + + Map configs = new HashMap<>(); + configs.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, + "test_node_label"); + entity.setConfigs(configs); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 1L + appId.getId()); + entity.addEvent(tEvent); + + // send a YARN_APPLICATION_STATE_UPDATED event + // after YARN_APPLICATION_FINISHED + // The final YarnApplicationState should not be changed + tEvent = new TimelineEvent(); + tEvent.setId( + ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 2L + appId.getId()); + Map eventInfo = new HashMap<>(); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + YarnApplicationState.KILLED); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + + return entity; + } + + private static TimelineEntity createAppAttemptTimelineEntity( + ApplicationAttemptId appAttemptId) { + TimelineEntity entity = new TimelineEntity(); + entity.setType(AppAttemptMetricsConstants.ENTITY_TYPE); + 
entity.setId(appAttemptId.toString()); + + Map entityInfo = new HashMap(); + entityInfo.put(AppAttemptMetricsConstants.TRACKING_URL_INFO, + "test tracking url"); + entityInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO, + "test original tracking url"); + entityInfo.put(AppAttemptMetricsConstants.HOST_INFO, "test host"); + entityInfo.put(AppAttemptMetricsConstants.RPC_PORT_INFO, 100); + entityInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO, + ContainerId.newContainerId(appAttemptId, 1)); + entity.setInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 1L); + entity.addEvent(tEvent); + + tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(Integer.MAX_VALUE + 2L); + entity.addEvent(tEvent); + + return entity; + } + + private static TimelineEntity createContainerEntity(ContainerId containerId) { + TimelineEntity entity = new TimelineEntity(); + entity.setType(ContainerMetricsConstants.ENTITY_TYPE); + entity.setId(containerId.toString()); + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO, 1024); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO, 8); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO, + "test host"); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO, 100); + entityInfo + .put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO, -1); + entityInfo.put(ContainerMetricsConstants + .ALLOCATED_HOST_HTTP_ADDRESS_INFO, "http://test:1234"); + entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO, + "test diagnostics info"); + entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO, -1); + entityInfo.put(ContainerMetricsConstants.STATE_INFO, + ContainerState.COMPLETE.toString()); + entity.setInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + 
tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(123456); + entity.addEvent(tEvent); + + return entity; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index d8440b8820..5366769340 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -246,6 +246,9 @@ public void testHelpMessage() throws Exception { pw.println(" --client_max_retries to"); pw.println(" create a retry client. The"); pw.println(" default value is 1000."); + pw.println(" -clusterId ClusterId. By default, it"); + pw.println(" will take default cluster id"); + pw.println(" from the RM"); pw.println(" -containerId ContainerId. By default, it"); pw.println(" will print all available"); pw.println(" logs. 
Work with -log_files"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java similarity index 99% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java index 797aad50b9..2a62a79549 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/AppAttemptMetricsConstants.java @@ -36,7 +36,7 @@ public class AppAttemptMetricsConstants { public static final String PARENT_PRIMARY_FILTER = "YARN_APPLICATION_ATTEMPT_PARENT"; - + public static final String TRACKING_URL_INFO = "YARN_APPLICATION_ATTEMPT_TRACKING_URL"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java similarity index 99% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java index 4cec409bb4..93da565341 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ApplicationMetricsConstants.java @@ -69,10 +69,10 @@ public class ApplicationMetricsConstants { public static final String STATE_EVENT_INFO = "YARN_APPLICATION_STATE"; - + public static final String APP_CPU_METRICS = "YARN_APPLICATION_CPU"; - + public static final String APP_MEM_METRICS = "YARN_APPLICATION_MEMORY"; @@ -96,10 +96,10 @@ public class ApplicationMetricsConstants { public static final String LATEST_APP_ATTEMPT_EVENT_INFO = "YARN_APPLICATION_LATEST_APP_ATTEMPT"; - + public static final String YARN_APP_CALLER_CONTEXT = "YARN_APPLICATION_CALLER_CONTEXT"; - + public static final String YARN_APP_CALLER_SIGNATURE = "YARN_APPLICATION_CALLER_SIGNATURE"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java new file mode 100644 index 0000000000..b7ef6fb924 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Yarn Common Metrics package. **/ +@InterfaceAudience.Private +package org.apache.hadoop.yarn.server.metrics; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineEntityV2Converter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineEntityV2Converter.java new file mode 100644 index 0000000000..7e382a47d8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineEntityV2Converter.java @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util.timeline; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import 
org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; + +/** + * Utility class to generate reports from timeline entities. + */ +public final class TimelineEntityV2Converter { + private TimelineEntityV2Converter() { + } + + public static ContainerReport convertToContainerReport( + TimelineEntity entity) { + int allocatedMem = 0; + int allocatedVcore = 0; + String allocatedHost = null; + int allocatedPort = -1; + int allocatedPriority = 0; + long createdTime = 0; + long finishedTime = 0; + String diagnosticsInfo = null; + int exitStatus = ContainerExitStatus.INVALID; + ContainerState state = null; + String nodeHttpAddress = null; + Map entityInfo = entity.getInfo(); + if (entityInfo != null) { + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO)) { + allocatedMem = (Integer) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_MEMORY_INFO); + } + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_VCORE_INFO)) { + allocatedVcore = (Integer) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_VCORE_INFO); + } + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_HOST_INFO)) { + allocatedHost = + entityInfo + .get(ContainerMetricsConstants.ALLOCATED_HOST_INFO) + .toString(); + } + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_PORT_INFO)) { + allocatedPort = (Integer) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_PORT_INFO); + } + if (entityInfo + .containsKey(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO)) { + allocatedPriority = Integer.parseInt(entityInfo.get( + ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO).toString()); + } + if (entityInfo.containsKey( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO)) { + 
nodeHttpAddress = + (String) entityInfo.get( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO); + } + if (entityInfo.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)) { + diagnosticsInfo = + entityInfo.get( + ContainerMetricsConstants.DIAGNOSTICS_INFO) + .toString(); + } + if (entityInfo.containsKey(ContainerMetricsConstants.EXIT_STATUS_INFO)) { + exitStatus = (Integer) entityInfo.get( + ContainerMetricsConstants.EXIT_STATUS_INFO); + } + if (entityInfo.containsKey(ContainerMetricsConstants.STATE_INFO)) { + state = + ContainerState.valueOf(entityInfo.get( + ContainerMetricsConstants.STATE_INFO).toString()); + } + } + NavigableSet events = entity.getEvents(); + if (events != null) { + for (TimelineEvent event : events) { + if (event.getId().equals( + ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE)) { + createdTime = event.getTimestamp(); + } else if (event.getId().equals( + ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE)) { + finishedTime = event.getTimestamp(); + } + } + } + String logUrl = null; + NodeId allocatedNode = null; + if (allocatedHost != null) { + allocatedNode = NodeId.newInstance(allocatedHost, allocatedPort); + } + return ContainerReport.newInstance( + ContainerId.fromString(entity.getId()), + Resource.newInstance(allocatedMem, allocatedVcore), allocatedNode, + Priority.newInstance(allocatedPriority), + createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state, + nodeHttpAddress); + } + + public static ApplicationAttemptReport convertToApplicationAttemptReport( + TimelineEntity entity) { + String host = null; + int rpcPort = -1; + ContainerId amContainerId = null; + String trackingUrl = null; + String originalTrackingUrl = null; + String diagnosticsInfo = null; + YarnApplicationAttemptState state = null; + Map entityInfo = entity.getInfo(); + long startTime = 0; + long finishTime = 0; + + if (entityInfo != null) { + if (entityInfo.containsKey(AppAttemptMetricsConstants.HOST_INFO)) { + host = + 
entityInfo.get(AppAttemptMetricsConstants.HOST_INFO) + .toString(); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.RPC_PORT_INFO)) { + rpcPort = (Integer) entityInfo.get( + AppAttemptMetricsConstants.RPC_PORT_INFO); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)) { + amContainerId = + ContainerId.fromString(entityInfo.get( + AppAttemptMetricsConstants.MASTER_CONTAINER_INFO) + .toString()); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.TRACKING_URL_INFO)) { + trackingUrl = + entityInfo.get( + AppAttemptMetricsConstants.TRACKING_URL_INFO) + .toString(); + } + if (entityInfo + .containsKey( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO)) { + originalTrackingUrl = + entityInfo + .get( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO) + .toString(); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.DIAGNOSTICS_INFO)) { + diagnosticsInfo = + entityInfo.get( + AppAttemptMetricsConstants.DIAGNOSTICS_INFO) + .toString(); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.STATE_INFO)) { + state = + YarnApplicationAttemptState.valueOf(entityInfo.get( + AppAttemptMetricsConstants.STATE_INFO) + .toString()); + } + if (entityInfo + .containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)) { + amContainerId = + ContainerId.fromString(entityInfo.get( + AppAttemptMetricsConstants.MASTER_CONTAINER_INFO) + .toString()); + } + } + NavigableSet events = entity.getEvents(); + if (events != null) { + for (TimelineEvent event : events) { + if (event.getId().equals( + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) { + startTime = event.getTimestamp(); + } else if (event.getId().equals( + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) { + finishTime = event.getTimestamp(); + } + } + } + return ApplicationAttemptReport.newInstance( + ApplicationAttemptId.fromString(entity.getId()), + host, rpcPort, trackingUrl, originalTrackingUrl, diagnosticsInfo, + state, 
amContainerId, startTime, finishTime); + } + + public static ApplicationReport convertToApplicationReport( + TimelineEntity entity) { + String user = null; + String queue = null; + String name = null; + String type = null; + boolean unmanagedApplication = false; + long createdTime = 0; + long finishedTime = 0; + float progress = 0.0f; + int applicationPriority = 0; + ApplicationAttemptId latestApplicationAttemptId = null; + String diagnosticsInfo = null; + FinalApplicationStatus finalStatus = FinalApplicationStatus.UNDEFINED; + YarnApplicationState state = YarnApplicationState.ACCEPTED; + ApplicationResourceUsageReport appResources = null; + Set appTags = null; + String appNodeLabelExpression = null; + String amNodeLabelExpression = null; + Map entityInfo = entity.getInfo(); + if (entityInfo != null) { + if (entityInfo.containsKey( + ApplicationMetricsConstants.USER_ENTITY_INFO)) { + user = + entityInfo.get(ApplicationMetricsConstants.USER_ENTITY_INFO) + .toString(); + } + if (entityInfo.containsKey( + ApplicationMetricsConstants.QUEUE_ENTITY_INFO)) { + queue = + entityInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO) + .toString(); + } + if (entityInfo.containsKey( + ApplicationMetricsConstants.NAME_ENTITY_INFO)) { + name = + entityInfo.get(ApplicationMetricsConstants.NAME_ENTITY_INFO) + .toString(); + } + if (entityInfo.containsKey( + ApplicationMetricsConstants.TYPE_ENTITY_INFO)) { + type = + entityInfo.get(ApplicationMetricsConstants.TYPE_ENTITY_INFO) + .toString(); + } + if (entityInfo.containsKey( + ApplicationMetricsConstants.TYPE_ENTITY_INFO)) { + type = + entityInfo.get(ApplicationMetricsConstants.TYPE_ENTITY_INFO) + .toString(); + } + if (entityInfo + .containsKey( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO)) { + unmanagedApplication = + Boolean.parseBoolean(entityInfo.get( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO) + .toString()); + } + if (entityInfo + 
.containsKey(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO)) { + applicationPriority = Integer.parseInt(entityInfo.get( + ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO).toString()); + } + if (entityInfo.containsKey(ApplicationMetricsConstants.APP_TAGS_INFO)) { + appTags = new HashSet<>(); + Object obj = entityInfo.get(ApplicationMetricsConstants.APP_TAGS_INFO); + if (obj != null && obj instanceof Collection) { + for(Object o : (Collection)obj) { + if (o != null) { + appTags.add(o.toString()); + } + } + } + } + if (entityInfo + .containsKey( + ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO)) { + latestApplicationAttemptId = ApplicationAttemptId.fromString( + entityInfo.get( + ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO) + .toString()); + } + if (entityInfo.containsKey( + ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) { + diagnosticsInfo = + entityInfo.get( + ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO) + .toString(); + } + if (entityInfo + .containsKey(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)) { + finalStatus = + FinalApplicationStatus.valueOf(entityInfo.get( + ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO) + .toString()); + } + if (entityInfo + .containsKey(ApplicationMetricsConstants.STATE_EVENT_INFO)) { + state = + YarnApplicationState.valueOf(entityInfo.get( + ApplicationMetricsConstants.STATE_EVENT_INFO).toString()); + } + } + + Map configs = entity.getConfigs(); + if (configs + .containsKey(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION)) { + appNodeLabelExpression = configs + .get(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION); + } + if (configs + .containsKey(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION)) { + amNodeLabelExpression = + configs.get(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION); + } + + Set metrics = entity.getMetrics(); + if (metrics != null) { + long vcoreSeconds = 0; + long memorySeconds = 0; + long preemptedVcoreSeconds = 0; + 
long preemptedMemorySeconds = 0; + + for (TimelineMetric metric : metrics) { + switch (metric.getId()) { + case ApplicationMetricsConstants.APP_CPU_METRICS: + vcoreSeconds = getAverageValue(metric.getValues().values()); + break; + case ApplicationMetricsConstants.APP_MEM_METRICS: + memorySeconds = getAverageValue(metric.getValues().values()); + break; + case ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS: + preemptedMemorySeconds = getAverageValue(metric.getValues().values()); + break; + case ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS: + preemptedVcoreSeconds = getAverageValue(metric.getValues().values()); + break; + default: + // Should not happen.. + break; + } + } + Map resourceSecondsMap = new HashMap<>(); + Map preemptedResourceSecondsMap = new HashMap<>(); + resourceSecondsMap + .put(ResourceInformation.MEMORY_MB.getName(), memorySeconds); + resourceSecondsMap + .put(ResourceInformation.VCORES.getName(), vcoreSeconds); + preemptedResourceSecondsMap.put(ResourceInformation.MEMORY_MB.getName(), + preemptedMemorySeconds); + preemptedResourceSecondsMap + .put(ResourceInformation.VCORES.getName(), preemptedVcoreSeconds); + + appResources = ApplicationResourceUsageReport + .newInstance(0, 0, null, null, null, resourceSecondsMap, 0, 0, + preemptedResourceSecondsMap); + } + + NavigableSet events = entity.getEvents(); + long updatedTimeStamp = 0L; + if (events != null) { + for (TimelineEvent event : events) { + if (event.getId().equals( + ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { + createdTime = event.getTimestamp(); + } else if (event.getId().equals( + ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) { + // This type of events are parsed in time-stamp descending order + // which means the previous event could override the information + // from the later same type of event. Hence compare timestamp + // before over writing.
+ if (event.getTimestamp() > updatedTimeStamp) { + updatedTimeStamp = event.getTimestamp(); + } + } else if (event.getId().equals( + ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE)) { + Map eventInfo = event.getInfo(); + if (eventInfo == null) { + continue; + } + if (eventInfo.containsKey( + ApplicationMetricsConstants.STATE_EVENT_INFO)) { + if (state == YarnApplicationState.ACCEPTED) { + state = YarnApplicationState.valueOf(eventInfo.get( + ApplicationMetricsConstants.STATE_EVENT_INFO).toString()); + } + } + } else if (event.getId().equals( + ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { + progress=1.0F; + state = YarnApplicationState.FINISHED; + finishedTime = event.getTimestamp(); + } + } + } + return ApplicationReport.newInstance( + ApplicationId.fromString(entity.getId()), + latestApplicationAttemptId, user, queue, name, null, -1, null, state, + diagnosticsInfo, null, createdTime, finishedTime, finalStatus, + appResources, null, progress, type, null, appTags, unmanagedApplication, + Priority.newInstance(applicationPriority), appNodeLabelExpression, + amNodeLabelExpression); + } + + private static long getAverageValue(Collection values) { + if (values == null || values.isEmpty()) { + return 0; + } + long sum = 0; + for (Number value : values) { + sum += value.longValue(); + } + return sum/values.size(); + } +}