MAPREDUCE-2789. Complete schedulingInfo on CLI. Contributed by Eric Payne.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1182616 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
277e520579
commit
26d63ecdae
@ -1600,6 +1600,8 @@ Release 0.23.0 - Unreleased
|
||||
MAPREDUCE-2666. Retrieve shuffle port number from JobHistory on MR AM
|
||||
restart. (Jonathan Eagles via acmurthy)
|
||||
|
||||
MAPREDUCE-2789. Complete schedulingInfo on CLI. (Eric Payne via acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -425,6 +425,11 @@ public static JobStatus fromYarn(ApplicationReport application,
|
||||
jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
|
||||
jobStatus.setStartTime(application.getStartTime());
|
||||
jobStatus.setFailureInfo(application.getDiagnostics());
|
||||
jobStatus.setNeededMem(application.getApplicationResourceUsageReport().getNeededResources().getMemory());
|
||||
jobStatus.setNumReservedSlots(application.getApplicationResourceUsageReport().getNumReservedContainers());
|
||||
jobStatus.setNumUsedSlots(application.getApplicationResourceUsageReport().getNumUsedContainers());
|
||||
jobStatus.setReservedMem(application.getApplicationResourceUsageReport().getReservedResources().getMemory());
|
||||
jobStatus.setUsedMem(application.getApplicationResourceUsageReport().getUsedResources().getMemory());
|
||||
return jobStatus;
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,9 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.QueueInfoPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
@ -42,6 +44,15 @@ public void testFromYarn() throws Exception {
|
||||
applicationReport.setYarnApplicationState(state);
|
||||
applicationReport.setStartTime(appStartTime);
|
||||
applicationReport.setUser("TestTypeConverter-user");
|
||||
ApplicationResourceUsageReportPBImpl appUsageRpt = new ApplicationResourceUsageReportPBImpl();
|
||||
ResourcePBImpl r = new ResourcePBImpl();
|
||||
r.setMemory(2048);
|
||||
appUsageRpt.setNeededResources(r);
|
||||
appUsageRpt.setNumReservedContainers(1);
|
||||
appUsageRpt.setNumUsedContainers(3);
|
||||
appUsageRpt.setReservedResources(r);
|
||||
appUsageRpt.setUsedResources(r);
|
||||
applicationReport.setApplicationResourceUsageReport(appUsageRpt);
|
||||
JobStatus jobStatus = TypeConverter.fromYarn(applicationReport, "dummy-jobfile");
|
||||
Assert.assertEquals(appStartTime, jobStatus.getStartTime());
|
||||
Assert.assertEquals(state.toString(), jobStatus.getState().toString());
|
||||
@ -60,6 +71,15 @@ public void testFromYarnApplicationReport() {
|
||||
when(mockReport.getUser()).thenReturn("dummy-user");
|
||||
when(mockReport.getQueue()).thenReturn("dummy-queue");
|
||||
String jobFile = "dummy-path/job.xml";
|
||||
ApplicationResourceUsageReportPBImpl appUsageRpt = new ApplicationResourceUsageReportPBImpl();
|
||||
ResourcePBImpl r = new ResourcePBImpl();
|
||||
r.setMemory(2048);
|
||||
appUsageRpt.setNeededResources(r);
|
||||
appUsageRpt.setNumReservedContainers(1);
|
||||
appUsageRpt.setNumUsedContainers(3);
|
||||
appUsageRpt.setReservedResources(r);
|
||||
appUsageRpt.setUsedResources(r);
|
||||
when(mockReport.getApplicationResourceUsageReport()).thenReturn(appUsageRpt);
|
||||
JobStatus status = TypeConverter.fromYarn(mockReport, jobFile);
|
||||
Assert.assertNotNull("fromYarn returned null status", status);
|
||||
Assert.assertEquals("jobFile set incorrectly", "dummy-path/job.xml", status.getJobFile());
|
||||
@ -69,6 +89,11 @@ public void testFromYarnApplicationReport() {
|
||||
Assert.assertEquals("schedulingInfo set incorrectly", "dummy-tracking-url", status.getSchedulingInfo());
|
||||
Assert.assertEquals("jobId set incorrectly", 6789, status.getJobID().getId());
|
||||
Assert.assertEquals("state set incorrectly", JobStatus.State.KILLED, status.getState());
|
||||
Assert.assertEquals("needed mem info set incorrectly", 2048, status.getNeededMem());
|
||||
Assert.assertEquals("num rsvd slots info set incorrectly", 1, status.getNumReservedSlots());
|
||||
Assert.assertEquals("num used slots info set incorrectly", 3, status.getNumUsedSlots());
|
||||
Assert.assertEquals("rsvd mem info set incorrectly", 2048, status.getReservedMem());
|
||||
Assert.assertEquals("used mem info set incorrectly", 2048, status.getUsedMem());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -92,6 +92,11 @@ public int getValue() {
|
||||
private boolean isRetired;
|
||||
private String historyFile = "";
|
||||
private String trackingUrl ="";
|
||||
private int numUsedSlots;
|
||||
private int numReservedSlots;
|
||||
private int usedMem;
|
||||
private int reservedMem;
|
||||
private int neededMem;
|
||||
|
||||
|
||||
/**
|
||||
@ -487,6 +492,76 @@ public synchronized String getHistoryFile() {
|
||||
return historyFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return number of used mapred slots
|
||||
*/
|
||||
public int getNumUsedSlots() {
|
||||
return numUsedSlots;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param n number of used mapred slots
|
||||
*/
|
||||
public void setNumUsedSlots(int n) {
|
||||
numUsedSlots = n;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of reserved slots
|
||||
*/
|
||||
public int getNumReservedSlots() {
|
||||
return numReservedSlots;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param n the number of reserved slots
|
||||
*/
|
||||
public void setNumReservedSlots(int n) {
|
||||
this.numReservedSlots = n;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the used memory
|
||||
*/
|
||||
public int getUsedMem() {
|
||||
return usedMem;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param m the used memory
|
||||
*/
|
||||
public void setUsedMem(int m) {
|
||||
this.usedMem = m;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the reserved memory
|
||||
*/
|
||||
public int getReservedMem() {
|
||||
return reservedMem;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param r the reserved memory
|
||||
*/
|
||||
public void setReservedMem(int r) {
|
||||
this.reservedMem = r;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the needed memory
|
||||
*/
|
||||
public int getNeededMem() {
|
||||
return neededMem;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param n the needed memory
|
||||
*/
|
||||
public void setNeededMem(int n) {
|
||||
this.neededMem = n;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
StringBuffer buffer = new StringBuffer();
|
||||
buffer.append("job-id : " + jobid);
|
||||
@ -499,6 +574,11 @@ public String toString() {
|
||||
buffer.append("user-name : " + user);
|
||||
buffer.append("priority : " + priority);
|
||||
buffer.append("scheduling-info : " + schedulingInfo);
|
||||
buffer.append("num-used-slots" + numUsedSlots);
|
||||
buffer.append("num-reserved-slots" + numReservedSlots);
|
||||
buffer.append("used-mem" + usedMem);
|
||||
buffer.append("reserved-mem" + reservedMem);
|
||||
buffer.append("needed-mem" + neededMem);
|
||||
return buffer.toString();
|
||||
}
|
||||
}
|
||||
|
@ -53,6 +53,7 @@
|
||||
@InterfaceStability.Stable
|
||||
public class CLI extends Configured implements Tool {
|
||||
private static final Log LOG = LogFactory.getLog(CLI.class);
|
||||
private Cluster cluster;
|
||||
|
||||
public CLI() {
|
||||
}
|
||||
@ -210,7 +211,7 @@ public int run(String[] argv) throws Exception {
|
||||
}
|
||||
|
||||
// initialize cluster
|
||||
Cluster cluster = new Cluster(getConf());
|
||||
cluster = new Cluster(getConf());
|
||||
|
||||
// Submit the request
|
||||
try {
|
||||
@ -527,12 +528,26 @@ public void displayJobList(JobStatus[] jobs)
|
||||
throws IOException, InterruptedException {
|
||||
System.out.println("Total jobs:" + jobs.length);
|
||||
System.out.println("JobId\tState\tStartTime\t" +
|
||||
"UserName\tQueue\tPriority\tSchedulingInfo");
|
||||
"UserName\tQueue\tPriority\tMaps\tReduces\tUsedContainers\t" +
|
||||
"RsvdContainers\tUsedMem\tRsvdMem\tNeededMem\tAM info");
|
||||
for (JobStatus job : jobs) {
|
||||
System.out.printf("%s\t%s\t%d\t%s\t%s\t%s\t%s\n", job.getJobID().toString(),
|
||||
job.getState(), job.getStartTime(),
|
||||
TaskReport[] mapReports =
|
||||
cluster.getJob(job.getJobID()).getTaskReports(TaskType.MAP);
|
||||
TaskReport[] reduceReports =
|
||||
cluster.getJob(job.getJobID()).getTaskReports(TaskType.REDUCE);
|
||||
|
||||
System.out.printf("%s\t%s\t%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%dM\t%dM\t%dM\t%s\n",
|
||||
job.getJobID().toString(), job.getState(), job.getStartTime(),
|
||||
job.getUsername(), job.getQueue(),
|
||||
job.getPriority().name(), job.getSchedulingInfo());
|
||||
job.getPriority().name(),
|
||||
mapReports.length,
|
||||
reduceReports.length,
|
||||
job.getNumUsedSlots(),
|
||||
job.getNumReservedSlots(),
|
||||
job.getUsedMem(),
|
||||
job.getReservedMem(),
|
||||
job.getNeededMem(),
|
||||
job.getSchedulingInfo());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -77,7 +77,7 @@ private ApplicationReport getUnknownApplicationReport() {
|
||||
// Setting AppState to NEW and finalStatus to UNDEFINED as they are never used
|
||||
// for a non running job
|
||||
return BuilderUtils.newApplicationReport(unknownAppId, "N/A", "N/A", "N/A", "N/A", 0, "",
|
||||
YarnApplicationState.NEW, "N/A", "N/A", 0, 0, FinalApplicationStatus.UNDEFINED);
|
||||
YarnApplicationState.NEW, "N/A", "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null);
|
||||
}
|
||||
|
||||
NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
|
||||
|
@ -211,4 +211,19 @@ public interface ApplicationReport {
|
||||
@Unstable
|
||||
void setFinalApplicationStatus(FinalApplicationStatus finishState);
|
||||
|
||||
/**
|
||||
* Retrieve the structure containing the job resources for this application
|
||||
* @return the job resources structure for this application
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
ApplicationResourceUsageReport getApplicationResourceUsageReport();
|
||||
|
||||
/**
|
||||
* Store the structure containing the job resources for this application
|
||||
* @param appResources structure for this application
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
void setApplicationResourceUsageReport(ApplicationResourceUsageReport appResources);
|
||||
}
|
||||
|
@ -0,0 +1,75 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
|
||||
/**
|
||||
* Contains various scheduling metrics to be reported by UI and CLI.
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public interface ApplicationResourceUsageReport {
|
||||
@Public
|
||||
@Stable
|
||||
int getNumUsedContainers();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setNumUsedContainers(int num_containers);
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
int getNumReservedContainers();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setNumReservedContainers(int num_reserved_containers);
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
Resource getUsedResources();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setUsedResources(Resource resources);
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
Resource getReservedResources();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setReservedResources(Resource reserved_resources);
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
Resource getNeededResources();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setNeededResources(Resource needed_resources);
|
||||
}
|
@ -23,10 +23,12 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
|
||||
@ -61,6 +63,24 @@ public ApplicationId getApplicationId() {
|
||||
return this.applicationId;
|
||||
}
|
||||
|
||||
public void setApplicationResourceUsageReport(ApplicationResourceUsageReport appInfo) {
|
||||
maybeInitBuilder();
|
||||
if (appInfo == null) {
|
||||
builder.clearAppResourceUsage();
|
||||
return;
|
||||
}
|
||||
builder.setAppResourceUsage(convertToProtoFormat(appInfo));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
|
||||
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasAppResourceUsage()) {
|
||||
return null;
|
||||
}
|
||||
return convertFromProtoFormat(p.getAppResourceUsage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTrackingUrl() {
|
||||
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
@ -312,6 +332,14 @@ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
||||
return ((ApplicationIdPBImpl) t).getProto();
|
||||
}
|
||||
|
||||
private ApplicationResourceUsageReport convertFromProtoFormat(ApplicationResourceUsageReportProto s) {
|
||||
return ProtoUtils.convertFromProtoFormat(s);
|
||||
}
|
||||
|
||||
private ApplicationResourceUsageReportProto convertToProtoFormat(ApplicationResourceUsageReport s) {
|
||||
return ProtoUtils.convertToProtoFormat(s);
|
||||
}
|
||||
|
||||
private ApplicationIdPBImpl convertFromProtoFormat(
|
||||
ApplicationIdProto applicationId) {
|
||||
return new ApplicationIdPBImpl(applicationId);
|
||||
|
@ -0,0 +1,186 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
|
||||
public class ApplicationResourceUsageReportPBImpl
|
||||
extends ProtoBase<ApplicationResourceUsageReportProto>
|
||||
implements ApplicationResourceUsageReport {
|
||||
ApplicationResourceUsageReportProto proto =
|
||||
ApplicationResourceUsageReportProto.getDefaultInstance();
|
||||
ApplicationResourceUsageReportProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
Resource usedResources;
|
||||
Resource reservedResources;
|
||||
Resource neededResources;
|
||||
|
||||
public ApplicationResourceUsageReportPBImpl() {
|
||||
builder = ApplicationResourceUsageReportProto.newBuilder();
|
||||
}
|
||||
|
||||
public ApplicationResourceUsageReportPBImpl(
|
||||
ApplicationResourceUsageReportProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public synchronized ApplicationResourceUsageReportProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (this.usedResources != null
|
||||
&& !((ResourcePBImpl) this.usedResources).getProto().equals(
|
||||
builder.getUsedResources())) {
|
||||
builder.setUsedResources(convertToProtoFormat(this.usedResources));
|
||||
}
|
||||
if (this.reservedResources != null
|
||||
&& !((ResourcePBImpl) this.reservedResources).getProto().equals(
|
||||
builder.getReservedResources())) {
|
||||
builder.setReservedResources(
|
||||
convertToProtoFormat(this.reservedResources));
|
||||
}
|
||||
if (this.neededResources != null
|
||||
&& !((ResourcePBImpl) this.neededResources).getProto().equals(
|
||||
builder.getNeededResources())) {
|
||||
builder.setNeededResources(convertToProtoFormat(this.neededResources));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
if (viaProto)
|
||||
maybeInitBuilder();
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private synchronized void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = ApplicationResourceUsageReportProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getNumUsedContainers() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return (p.getNumUsedContainers());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setNumUsedContainers(int num_containers) {
|
||||
maybeInitBuilder();
|
||||
builder.setNumUsedContainers((num_containers));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getNumReservedContainers() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return (p.getNumReservedContainers());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setNumReservedContainers(
|
||||
int num_reserved_containers) {
|
||||
maybeInitBuilder();
|
||||
builder.setNumReservedContainers((num_reserved_containers));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Resource getUsedResources() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.usedResources != null) {
|
||||
return this.usedResources;
|
||||
}
|
||||
if (!p.hasUsedResources()) {
|
||||
return null;
|
||||
}
|
||||
this.usedResources = convertFromProtoFormat(p.getUsedResources());
|
||||
return this.usedResources;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setUsedResources(Resource resources) {
|
||||
maybeInitBuilder();
|
||||
if (resources == null)
|
||||
builder.clearUsedResources();
|
||||
this.usedResources = resources;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Resource getReservedResources() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.reservedResources != null) {
|
||||
return this.reservedResources;
|
||||
}
|
||||
if (!p.hasReservedResources()) {
|
||||
return null;
|
||||
}
|
||||
this.reservedResources = convertFromProtoFormat(p.getReservedResources());
|
||||
return this.reservedResources;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setReservedResources(Resource reserved_resources) {
|
||||
maybeInitBuilder();
|
||||
if (reserved_resources == null)
|
||||
builder.clearReservedResources();
|
||||
this.reservedResources = reserved_resources;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Resource getNeededResources() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.neededResources != null) {
|
||||
return this.neededResources;
|
||||
}
|
||||
if (!p.hasNeededResources()) {
|
||||
return null;
|
||||
}
|
||||
this.neededResources = convertFromProtoFormat(p.getNeededResources());
|
||||
return this.neededResources;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setNeededResources(Resource reserved_resources) {
|
||||
maybeInitBuilder();
|
||||
if (reserved_resources == null)
|
||||
builder.clearNeededResources();
|
||||
this.neededResources = reserved_resources;
|
||||
}
|
||||
|
||||
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
|
||||
return new ResourcePBImpl(p);
|
||||
}
|
||||
|
||||
private ResourceProto convertToProtoFormat(Resource t) {
|
||||
return ((ResourcePBImpl)t).getProto();
|
||||
}
|
||||
}
|
@ -26,7 +26,10 @@
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
|
||||
@ -62,6 +65,17 @@ public static YarnApplicationState convertFromProtoFormat(YarnApplicationStatePr
|
||||
return YarnApplicationState.valueOf(e.name());
|
||||
}
|
||||
|
||||
/*
|
||||
* ApplicationResourceUsageReport
|
||||
*/
|
||||
public static ApplicationResourceUsageReportProto convertToProtoFormat(ApplicationResourceUsageReport e) {
|
||||
return ((ApplicationResourceUsageReportPBImpl)e).getProto();
|
||||
}
|
||||
|
||||
public static ApplicationResourceUsageReport convertFromProtoFormat(ApplicationResourceUsageReportProto e) {
|
||||
return new ApplicationResourceUsageReportPBImpl(e);
|
||||
}
|
||||
|
||||
/*
|
||||
* FinalApplicationStatus
|
||||
*/
|
||||
|
@ -137,6 +137,14 @@ message LocalResourceProto {
|
||||
optional LocalResourceVisibilityProto visibility= 5;
|
||||
}
|
||||
|
||||
message ApplicationResourceUsageReportProto {
|
||||
optional int32 num_used_containers = 1;
|
||||
optional int32 num_reserved_containers = 2;
|
||||
optional ResourceProto used_resources = 3;
|
||||
optional ResourceProto reserved_resources = 4;
|
||||
optional ResourceProto needed_resources = 5;
|
||||
}
|
||||
|
||||
message ApplicationReportProto {
|
||||
optional ApplicationIdProto applicationId = 1;
|
||||
optional string user = 2;
|
||||
@ -153,6 +161,7 @@ message ApplicationReportProto {
|
||||
optional int64 startTime = 13;
|
||||
optional int64 finishTime = 14;
|
||||
optional FinalApplicationStatusProto final_application_status = 15;
|
||||
optional ApplicationResourceUsageReportProto app_resource_Usage = 16;
|
||||
}
|
||||
|
||||
message NodeIdProto {
|
||||
|
@ -39,6 +39,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
@ -245,7 +246,7 @@ public static ApplicationReport newApplicationReport(
|
||||
ApplicationId applicationId, String user, String queue, String name,
|
||||
String host, int rpcPort, String clientToken, YarnApplicationState state,
|
||||
String diagnostics, String url, long startTime, long finishTime,
|
||||
FinalApplicationStatus finalStatus) {
|
||||
FinalApplicationStatus finalStatus, ApplicationResourceUsageReport appResources) {
|
||||
ApplicationReport report = recordFactory
|
||||
.newRecordInstance(ApplicationReport.class);
|
||||
report.setApplicationId(applicationId);
|
||||
@ -261,6 +262,7 @@ public static ApplicationReport newApplicationReport(
|
||||
report.setStartTime(startTime);
|
||||
report.setFinishTime(finishTime);
|
||||
report.setFinalApplicationStatus(finalStatus);
|
||||
report.setApplicationResourceUsageReport(appResources);
|
||||
return report;
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import com.google.common.collect.Iterators;
|
||||
@ -81,6 +82,7 @@ public static ApplicationReport newApp(int i) {
|
||||
final String queue = newQueue();
|
||||
final FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
|
||||
return new ApplicationReport() {
|
||||
private ApplicationResourceUsageReport appUsageReport;
|
||||
@Override public ApplicationId getApplicationId() { return id; }
|
||||
@Override public String getUser() { return user; }
|
||||
@Override public String getName() { return name; }
|
||||
@ -88,6 +90,10 @@ public static ApplicationReport newApp(int i) {
|
||||
@Override public String getQueue() { return queue; }
|
||||
@Override public String getTrackingUrl() { return ""; }
|
||||
@Override public FinalApplicationStatus getFinalApplicationStatus() { return finishState; }
|
||||
@Override
|
||||
public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
|
||||
return this.appUsageReport;
|
||||
}
|
||||
public void setApplicationId(ApplicationId applicationId) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
@ -98,6 +104,10 @@ public void setTrackingUrl(String url) {
|
||||
|
||||
}
|
||||
@Override
|
||||
public void setApplicationResourceUsageReport(ApplicationResourceUsageReport appResources) {
|
||||
this.appUsageReport = appResources;
|
||||
}
|
||||
@Override
|
||||
public void setName(String name) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
|
@ -32,6 +32,7 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
@ -325,18 +326,20 @@ public ApplicationReport createAndGetApplicationReport() {
|
||||
String trackingUrl = "N/A";
|
||||
String host = "N/A";
|
||||
int rpcPort = -1;
|
||||
ApplicationResourceUsageReport appUsageReport = null;
|
||||
FinalApplicationStatus finishState = getFinalApplicationStatus();
|
||||
if (this.currentAttempt != null) {
|
||||
trackingUrl = this.currentAttempt.getTrackingUrl();
|
||||
clientToken = this.currentAttempt.getClientToken();
|
||||
host = this.currentAttempt.getHost();
|
||||
rpcPort = this.currentAttempt.getRpcPort();
|
||||
appUsageReport = currentAttempt.getApplicationResourceUsageReport();
|
||||
}
|
||||
return BuilderUtils.newApplicationReport(this.applicationId, this.user,
|
||||
this.queue, this.name, host, rpcPort, clientToken,
|
||||
createApplicationState(this.stateMachine.getCurrentState()),
|
||||
this.diagnostics.toString(), trackingUrl,
|
||||
this.startTime, this.finishTime, finishState);
|
||||
this.startTime, this.finishTime, finishState, appUsageReport);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
|
@ -22,6 +22,7 @@
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
@ -127,4 +128,10 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
||||
* @return the application submission context for this Application.
|
||||
*/
|
||||
ApplicationSubmissionContext getSubmissionContext();
|
||||
|
||||
/*
|
||||
* Get application container and resource usage information.
|
||||
* @return an ApplicationResourceUsageReport object.
|
||||
*/
|
||||
ApplicationResourceUsageReport getApplicationResourceUsageReport();
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
@ -31,6 +32,7 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
@ -47,6 +49,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
|
||||
@ -58,7 +61,9 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
@ -428,6 +433,52 @@ public void handle(RMAppAttemptEvent event) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
|
||||
this.readLock.lock();
|
||||
|
||||
try {
|
||||
int numUsedContainers = 0;
|
||||
int numReservedContainers = 0;
|
||||
int reservedResources = 0;
|
||||
int currentConsumption = 0;
|
||||
SchedulerAppReport schedApp =
|
||||
scheduler.getSchedulerAppInfo(this.getAppAttemptId());
|
||||
Collection<RMContainer> liveContainers;
|
||||
Collection<RMContainer> reservedContainers;
|
||||
if (schedApp != null) {
|
||||
liveContainers = schedApp.getLiveContainers();
|
||||
reservedContainers = schedApp.getReservedContainers();
|
||||
if (liveContainers != null) {
|
||||
numUsedContainers = liveContainers.size();
|
||||
for (RMContainer lc : liveContainers) {
|
||||
currentConsumption += lc.getContainer().getResource().getMemory();
|
||||
}
|
||||
}
|
||||
if (reservedContainers != null) {
|
||||
numReservedContainers = reservedContainers.size();
|
||||
for (RMContainer rc : reservedContainers) {
|
||||
reservedResources += rc.getContainer().getResource().getMemory();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ApplicationResourceUsageReport appResources =
|
||||
recordFactory.newRecordInstance(ApplicationResourceUsageReport.class);
|
||||
appResources.setNumUsedContainers(numUsedContainers);
|
||||
appResources.setNumReservedContainers(numReservedContainers);
|
||||
appResources.setUsedResources(
|
||||
Resources.createResource(currentConsumption));
|
||||
appResources.setReservedResources(
|
||||
Resources.createResource(reservedResources));
|
||||
appResources.setNeededResources(
|
||||
Resources.createResource(currentConsumption + reservedResources));
|
||||
return appResources;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private static class BaseTransition implements
|
||||
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user