YARN-4218. Metric for resource*time that was preempted. Contributed by Chang Li.
This commit is contained in:
parent
3a98419532
commit
93eeb13164
@ -36,7 +36,8 @@ public abstract class ApplicationResourceUsageReport {
|
||||
public static ApplicationResourceUsageReport newInstance(
|
||||
int numUsedContainers, int numReservedContainers, Resource usedResources,
|
||||
Resource reservedResources, Resource neededResources, long memorySeconds,
|
||||
long vcoreSeconds, float queueUsagePerc, float clusterUsagePerc) {
|
||||
long vcoreSeconds, float queueUsagePerc, float clusterUsagePerc,
|
||||
long preemptedMemorySeconds, long preemptedVcoresSeconds) {
|
||||
ApplicationResourceUsageReport report =
|
||||
Records.newRecord(ApplicationResourceUsageReport.class);
|
||||
report.setNumUsedContainers(numUsedContainers);
|
||||
@ -48,6 +49,8 @@ public static ApplicationResourceUsageReport newInstance(
|
||||
report.setVcoreSeconds(vcoreSeconds);
|
||||
report.setQueueUsagePercentage(queueUsagePerc);
|
||||
report.setClusterUsagePercentage(clusterUsagePerc);
|
||||
report.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
report.setPreemptedVcoreSeconds(preemptedVcoresSeconds);
|
||||
return report;
|
||||
}
|
||||
|
||||
@ -188,4 +191,42 @@ public static ApplicationResourceUsageReport newInstance(
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setClusterUsagePercentage(float clusterUsagePerc);
|
||||
|
||||
/**
|
||||
* Set the aggregated amount of memory (in megabytes) preempted from
|
||||
* the application, multiplied by the number of seconds each
|
||||
* preempted container had existed before it was preempted.
|
||||
* @param memorySeconds the aggregated amount of preempted memory seconds
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setPreemptedMemorySeconds(long memorySeconds);
|
||||
|
||||
/**
|
||||
* Get the aggregated amount of memory (in megabytes) preempted from
|
||||
* the application, multiplied by the number of seconds each
|
||||
* preempted container had existed before it was preempted.
|
||||
* @return the aggregated amount of preempted memory seconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract long getPreemptedMemorySeconds();
|
||||
|
||||
/**
|
||||
* Set the aggregated number of vcores preempted from the application,
|
||||
* multiplied by the number of seconds each preempted container had existed.
|
||||
* @param vcoreSeconds the aggregated number of preempted vcore seconds
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setPreemptedVcoreSeconds(long vcoreSeconds);
|
||||
|
||||
/**
|
||||
* Get the aggregated number of vcores preempted from the application,
|
||||
* multiplied by the number of seconds each preempted container had existed.
|
||||
* @return the aggregated number of preempted vcore seconds
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract long getPreemptedVcoreSeconds();
|
||||
}
|
||||
|
@ -184,6 +184,8 @@ message ApplicationResourceUsageReportProto {
|
||||
optional int64 vcore_seconds = 7;
|
||||
optional float queue_usage_percentage = 8;
|
||||
optional float cluster_usage_percentage = 9;
|
||||
optional int64 preempted_memory_seconds = 10;
|
||||
optional int64 preempted_vcore_seconds = 11;
|
||||
}
|
||||
|
||||
message ApplicationReportProto {
|
||||
|
@ -658,8 +658,15 @@ private int printApplicationReport(String applicationId)
|
||||
//completed app report in the timeline server doesn't have usage report
|
||||
appReportStr.print(usageReport.getMemorySeconds() + " MB-seconds, ");
|
||||
appReportStr.println(usageReport.getVcoreSeconds() + " vcore-seconds");
|
||||
appReportStr.print("\tAggregate Resource Preempted : ");
|
||||
appReportStr.print(usageReport.getPreemptedMemorySeconds() +
|
||||
" MB-seconds, ");
|
||||
appReportStr.println(usageReport.getPreemptedVcoreSeconds() +
|
||||
" vcore-seconds");
|
||||
} else {
|
||||
appReportStr.println("N/A");
|
||||
appReportStr.print("\tAggregate Resource Preempted : ");
|
||||
appReportStr.println("N/A");
|
||||
}
|
||||
appReportStr.print("\tLog Aggregation Status : ");
|
||||
appReportStr.println(appReport.getLogAggregationStatus() == null ? "N/A"
|
||||
|
@ -115,7 +115,7 @@ public void testGetApplicationReport() throws Exception {
|
||||
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
|
||||
ApplicationResourceUsageReport usageReport = i == 0 ? null :
|
||||
ApplicationResourceUsageReport.newInstance(
|
||||
2, 0, null, null, null, 123456, 4567, 0, 0);
|
||||
2, 0, null, null, null, 123456, 4567, 0, 0, 1111, 2222);
|
||||
ApplicationReport newApplicationReport = ApplicationReport.newInstance(
|
||||
applicationId, ApplicationAttemptId.newInstance(applicationId, 1),
|
||||
"user", "queue", "appname", "host", 124, null,
|
||||
@ -148,6 +148,8 @@ public void testGetApplicationReport() throws Exception {
|
||||
pw.println("\tAM Host : host");
|
||||
pw.println("\tAggregate Resource Allocation : " +
|
||||
(i == 0 ? "N/A" : "123456 MB-seconds, 4567 vcore-seconds"));
|
||||
pw.println("\tAggregate Resource Preempted : " +
|
||||
(i == 0 ? "N/A" : "1111 MB-seconds, 2222 vcore-seconds"));
|
||||
pw.println("\tLog Aggregation Status : SUCCEEDED");
|
||||
pw.println("\tDiagnostics : diagnostics");
|
||||
pw.println("\tUnmanaged Application : false");
|
||||
|
@ -224,6 +224,34 @@ public synchronized long getVcoreSeconds() {
|
||||
return (p.getVcoreSeconds());
|
||||
}
|
||||
|
||||
/**
 * Writes the preempted memory-seconds value into the protobuf builder.
 * Calls {@code maybeInitBuilder()} before touching {@code builder}
 * (presumably to make sure the builder is initialised; that helper is not
 * visible in this hunk).
 *
 * @param preemptedMemorySeconds value passed to the builder's
 *     {@code setPreemptedMemorySeconds}
 */
@Override
|
||||
public synchronized void setPreemptedMemorySeconds(
|
||||
long preemptedMemorySeconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
}
|
||||
|
||||
/**
 * Reads the preempted memory-seconds value from {@code proto} when
 * {@code viaProto} is set, otherwise from {@code builder}.
 *
 * @return the preempted memory-seconds held by the selected source
 */
@Override
|
||||
public synchronized long getPreemptedMemorySeconds() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return p.getPreemptedMemorySeconds();
|
||||
}
|
||||
|
||||
/**
 * Writes the preempted vcore-seconds value into the protobuf builder.
 * Calls {@code maybeInitBuilder()} before touching {@code builder}
 * (presumably to make sure the builder is initialised; that helper is not
 * visible in this hunk).
 *
 * @param vcoreSeconds value passed to the builder's
 *     {@code setPreemptedVcoreSeconds}
 */
@Override
|
||||
public synchronized void setPreemptedVcoreSeconds(
|
||||
long vcoreSeconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setPreemptedVcoreSeconds(vcoreSeconds);
|
||||
}
|
||||
|
||||
/**
 * Reads the preempted vcore-seconds value from {@code proto} when
 * {@code viaProto} is set, otherwise from {@code builder}.
 *
 * @return the preempted vcore-seconds held by the selected source
 */
@Override
|
||||
public synchronized long getPreemptedVcoreSeconds() {
|
||||
ApplicationResourceUsageReportProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
return (p.getPreemptedVcoreSeconds());
|
||||
}
|
||||
|
||||
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
|
||||
return new ResourcePBImpl(p);
|
||||
}
|
||||
|
@ -334,9 +334,15 @@ private static ApplicationReportExt convertToApplicationReport(
|
||||
ApplicationMetricsConstants.APP_CPU_METRICS).toString());
|
||||
long memorySeconds=Long.parseLong(entityInfo.get(
|
||||
ApplicationMetricsConstants.APP_MEM_METRICS).toString());
|
||||
long preemptedMemorySeconds = Long.parseLong(entityInfo.get(
|
||||
ApplicationMetricsConstants
|
||||
.APP_MEM_PREEMPT_METRICS).toString());
|
||||
long preemptedVcoreSeconds = Long.parseLong(entityInfo.get(
|
||||
ApplicationMetricsConstants
|
||||
.APP_CPU_PREEMPT_METRICS).toString());
|
||||
appResources = ApplicationResourceUsageReport
|
||||
.newInstance(0, 0, null, null, null, memorySeconds, vcoreSeconds, 0,
|
||||
0);
|
||||
0, preemptedMemorySeconds, preemptedVcoreSeconds);
|
||||
}
|
||||
if (entityInfo.containsKey(ApplicationMetricsConstants.APP_TAGS_INFO)) {
|
||||
appTags = new HashSet<String>();
|
||||
|
@ -245,6 +245,11 @@ public ApplicationReport run() throws Exception {
|
||||
applicationResourceUsageReport.getMemorySeconds());
|
||||
Assert
|
||||
.assertEquals(345, applicationResourceUsageReport.getVcoreSeconds());
|
||||
Assert.assertEquals(456,
|
||||
applicationResourceUsageReport.getPreemptedMemorySeconds());
|
||||
Assert
|
||||
.assertEquals(789, applicationResourceUsageReport
|
||||
.getPreemptedVcoreSeconds());
|
||||
Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
|
||||
app.getFinalApplicationStatus());
|
||||
Assert.assertEquals(YarnApplicationState.FINISHED,
|
||||
@ -503,8 +508,10 @@ private static TimelineEntity createApplicationTimelineEntity(
|
||||
Priority.newInstance(0));
|
||||
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
|
||||
Integer.MAX_VALUE + 1L);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_MEM_METRICS,123);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_CPU_METRICS,345);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_MEM_METRICS, 123);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_CPU_METRICS, 345);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS,456);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS,789);
|
||||
if (emptyACLs) {
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, "");
|
||||
} else {
|
||||
|
@ -88,6 +88,12 @@ public class ApplicationMetricsConstants {
|
||||
public static final String APP_AM_CONTAINER_PREEMPTED =
|
||||
"YARN_APPLICATION_AM_CONTAINER_PREEMPTED";
|
||||
|
||||
public static final String APP_CPU_PREEMPT_METRICS =
|
||||
"YARN_APPLICATION_CPU_PREEMPT_METRIC";
|
||||
|
||||
public static final String APP_MEM_PREEMPT_METRICS =
|
||||
"YARN_APPLICATION_MEM_PREEMPT_METRIC";
|
||||
|
||||
public static final String LATEST_APP_ATTEMPT_EVENT_INFO =
|
||||
"YARN_APPLICATION_LATEST_APP_ATTEMPT";
|
||||
|
||||
|
@ -446,7 +446,8 @@ public static ApplicationSubmissionContext newApplicationSubmissionContext(
|
||||
public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
|
||||
int numUsedContainers, int numReservedContainers, Resource usedResources,
|
||||
Resource reservedResources, Resource neededResources, long memorySeconds,
|
||||
long vcoreSeconds) {
|
||||
long vcoreSeconds, long preemptedMemorySeconds,
|
||||
long preemptedVcoreSeconds) {
|
||||
ApplicationResourceUsageReport report =
|
||||
recordFactory.newRecordInstance(ApplicationResourceUsageReport.class);
|
||||
report.setNumUsedContainers(numUsedContainers);
|
||||
@ -456,6 +457,8 @@ public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
|
||||
report.setNeededResources(neededResources);
|
||||
report.setMemorySeconds(memorySeconds);
|
||||
report.setVcoreSeconds(vcoreSeconds);
|
||||
report.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
report.setPreemptedVcoreSeconds(preemptedVcoreSeconds);
|
||||
return report;
|
||||
}
|
||||
|
||||
|
@ -182,6 +182,8 @@ public static SummaryBuilder createAppSummary(RMApp app) {
|
||||
.add("finalStatus", app.getFinalApplicationStatus())
|
||||
.add("memorySeconds", metrics.getMemorySeconds())
|
||||
.add("vcoreSeconds", metrics.getVcoreSeconds())
|
||||
.add("preemptedMemorySeconds", metrics.getPreemptedMemorySeconds())
|
||||
.add("preemptedVcoreSeconds", metrics.getPreemptedVcoreSeconds())
|
||||
.add("preemptedAMContainers", metrics.getNumAMContainersPreempted())
|
||||
.add("preemptedNonAMContainers", metrics.getNumNonAMContainersPreempted())
|
||||
.add("preemptedResources", metrics.getResourcePreempted())
|
||||
|
@ -453,7 +453,7 @@ public static YarnApplicationAttemptState createApplicationAttemptState(
|
||||
DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
|
||||
BuilderUtils.newApplicationResourceUsageReport(-1, -1,
|
||||
Resources.createResource(-1, -1), Resources.createResource(-1, -1),
|
||||
Resources.createResource(-1, -1), 0, 0);
|
||||
Resources.createResource(-1, -1), 0, 0, 0, 0);
|
||||
|
||||
|
||||
/**
|
||||
|
@ -144,6 +144,10 @@ public void appFinished(RMApp app, RMAppState state, long finishedTime) {
|
||||
appMetrics.getVcoreSeconds());
|
||||
entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
|
||||
appMetrics.getMemorySeconds());
|
||||
entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS,
|
||||
appMetrics.getPreemptedMemorySeconds());
|
||||
entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS,
|
||||
appMetrics.getPreemptedVcoreSeconds());
|
||||
tEvent.setEventInfo(eventInfo);
|
||||
|
||||
entity.addEvent(tEvent);
|
||||
|
@ -193,6 +193,12 @@ private Set<TimelineMetric> getTimelinelineAppMetrics(
|
||||
entityMetrics.add(getTimelineMetric(
|
||||
ApplicationMetricsConstants.APP_MEM_METRICS, timestamp,
|
||||
appMetrics.getMemorySeconds()));
|
||||
entityMetrics.add(getTimelineMetric(
|
||||
ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS, timestamp,
|
||||
appMetrics.getPreemptedMemorySeconds()));
|
||||
entityMetrics.add(getTimelineMetric(
|
||||
ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS, timestamp,
|
||||
appMetrics.getPreemptedVcoreSeconds()));
|
||||
entityMetrics.add(getTimelineMetric(
|
||||
ApplicationMetricsConstants.APP_RESOURCE_PREEMPTED_CPU, timestamp,
|
||||
appMetrics.getResourcePreempted().getVirtualCores()));
|
||||
|
@ -69,6 +69,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
||||
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
|
||||
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.StateMachine;
|
||||
@ -826,16 +827,19 @@ protected abstract void updateApplicationStateInternal(ApplicationId appId,
|
||||
*/
|
||||
public void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
|
||||
Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
|
||||
|
||||
RMAppAttemptMetrics attempMetrics = appAttempt.getRMAppAttemptMetrics();
|
||||
AggregateAppResourceUsage resUsage =
|
||||
appAttempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
|
||||
attempMetrics.getAggregateAppResourceUsage();
|
||||
ApplicationAttemptStateData attemptState =
|
||||
ApplicationAttemptStateData.newInstance(
|
||||
appAttempt.getAppAttemptId(),
|
||||
appAttempt.getMasterContainer(),
|
||||
credentials, appAttempt.getStartTime(),
|
||||
resUsage.getMemorySeconds(),
|
||||
resUsage.getVcoreSeconds());
|
||||
resUsage.getVcoreSeconds(),
|
||||
attempMetrics.getPreemptedMemory(),
|
||||
attempMetrics.getPreemptedVcore()
|
||||
);
|
||||
|
||||
dispatcher.getEventHandler().handle(
|
||||
new RMStateStoreAppAttemptEvent(attemptState));
|
||||
|
@ -40,7 +40,8 @@ public static ApplicationAttemptStateData newInstance(
|
||||
Credentials attemptTokens, long startTime, RMAppAttemptState finalState,
|
||||
String finalTrackingUrl, String diagnostics,
|
||||
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
|
||||
long finishTime, long memorySeconds, long vcoreSeconds) {
|
||||
long finishTime, long memorySeconds, long vcoreSeconds,
|
||||
long preemptedMemorySeconds, long preemptedVcoreSeconds) {
|
||||
ApplicationAttemptStateData attemptStateData =
|
||||
Records.newRecord(ApplicationAttemptStateData.class);
|
||||
attemptStateData.setAttemptId(attemptId);
|
||||
@ -55,16 +56,20 @@ public static ApplicationAttemptStateData newInstance(
|
||||
attemptStateData.setFinishTime(finishTime);
|
||||
attemptStateData.setMemorySeconds(memorySeconds);
|
||||
attemptStateData.setVcoreSeconds(vcoreSeconds);
|
||||
attemptStateData.setPreemptedMemorySeconds(preemptedMemorySeconds);
|
||||
attemptStateData.setPreemptedVcoreSeconds(preemptedVcoreSeconds);
|
||||
return attemptStateData;
|
||||
}
|
||||
|
||||
public static ApplicationAttemptStateData newInstance(
|
||||
ApplicationAttemptId attemptId, Container masterContainer,
|
||||
Credentials attemptTokens, long startTime, long memorySeconds,
|
||||
long vcoreSeconds) {
|
||||
long vcoreSeconds, long preemptedMemorySeconds,
|
||||
long preemptedVcoreSeconds) {
|
||||
return newInstance(attemptId, masterContainer, attemptTokens,
|
||||
startTime, null, "N/A", "", null, ContainerExitStatus.INVALID, 0,
|
||||
memorySeconds, vcoreSeconds);
|
||||
memorySeconds, vcoreSeconds,
|
||||
preemptedMemorySeconds, preemptedVcoreSeconds);
|
||||
}
|
||||
|
||||
|
||||
@ -182,4 +187,32 @@ public abstract void setFinalApplicationStatus(
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setVcoreSeconds(long vcoreSeconds);
|
||||
|
||||
/**
|
||||
* Get the <em>preempted memory seconds</em>
|
||||
* (in MB seconds) of the application attempt.
|
||||
* @return <em>preempted memory seconds</em>
|
||||
* (in MB seconds) of the application attempt
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract long getPreemptedMemorySeconds();
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setPreemptedMemorySeconds(long memorySeconds);
|
||||
|
||||
/**
|
||||
* Get the <em>preempted vcore seconds</em>
|
||||
* of the application attempt.
|
||||
* @return <em>preempted vcore seconds</em>
|
||||
* of the application attempt
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract long getPreemptedVcoreSeconds();
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setPreemptedVcoreSeconds(long vcoreSeconds);
|
||||
}
|
||||
|
@ -262,6 +262,30 @@ public void setVcoreSeconds(long vcoreSeconds) {
|
||||
builder.setVcoreSeconds(vcoreSeconds);
|
||||
}
|
||||
|
||||
/**
 * Reads the attempt's preempted memory-seconds from {@code proto} when
 * {@code viaProto} is set, otherwise from {@code builder}.
 *
 * @return the preempted memory-seconds held by the selected source
 */
@Override
|
||||
public long getPreemptedMemorySeconds() {
|
||||
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getPreemptedMemorySeconds();
|
||||
}
|
||||
|
||||
/**
 * Reads the attempt's preempted vcore-seconds from {@code proto} when
 * {@code viaProto} is set, otherwise from {@code builder}.
 *
 * @return the preempted vcore-seconds held by the selected source
 */
@Override
|
||||
public long getPreemptedVcoreSeconds() {
|
||||
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getPreemptedVcoreSeconds();
|
||||
}
|
||||
|
||||
/**
 * Writes the attempt's preempted memory-seconds into the protobuf builder,
 * calling {@code maybeInitBuilder()} first.
 *
 * @param memorySeconds value passed to the builder's
 *     {@code setPreemptedMemorySeconds}
 */
@Override
|
||||
public void setPreemptedMemorySeconds(long memorySeconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setPreemptedMemorySeconds(memorySeconds);
|
||||
}
|
||||
|
||||
/**
 * Writes the attempt's preempted vcore-seconds into the protobuf builder,
 * calling {@code maybeInitBuilder()} first.
 *
 * @param vcoreSeconds value passed to the builder's
 *     {@code setPreemptedVcoreSeconds}
 */
@Override
|
||||
public void setPreemptedVcoreSeconds(long vcoreSeconds) {
|
||||
maybeInitBuilder();
|
||||
builder.setPreemptedVcoreSeconds(vcoreSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FinalApplicationStatus getFinalApplicationStatus() {
|
||||
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
@ -757,6 +757,12 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
|
||||
RMAppMetrics rmAppMetrics = getRMAppMetrics();
|
||||
appUsageReport.setMemorySeconds(rmAppMetrics.getMemorySeconds());
|
||||
appUsageReport.setVcoreSeconds(rmAppMetrics.getVcoreSeconds());
|
||||
appUsageReport.
|
||||
setPreemptedMemorySeconds(rmAppMetrics.
|
||||
getPreemptedMemorySeconds());
|
||||
appUsageReport.
|
||||
setPreemptedVcoreSeconds(rmAppMetrics.
|
||||
getPreemptedVcoreSeconds());
|
||||
}
|
||||
|
||||
if (currentApplicationAttemptId == null) {
|
||||
@ -1630,6 +1636,8 @@ public RMAppMetrics getRMAppMetrics() {
|
||||
int numNonAMContainerPreempted = 0;
|
||||
long memorySeconds = 0;
|
||||
long vcoreSeconds = 0;
|
||||
long preemptedMemorySeconds = 0;
|
||||
long preemptedVcoreSeconds = 0;
|
||||
for (RMAppAttempt attempt : attempts.values()) {
|
||||
if (null != attempt) {
|
||||
RMAppAttemptMetrics attemptMetrics =
|
||||
@ -1645,12 +1653,15 @@ public RMAppMetrics getRMAppMetrics() {
|
||||
attempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
|
||||
memorySeconds += resUsage.getMemorySeconds();
|
||||
vcoreSeconds += resUsage.getVcoreSeconds();
|
||||
preemptedMemorySeconds += attemptMetrics.getPreemptedMemory();
|
||||
preemptedVcoreSeconds += attemptMetrics.getPreemptedVcore();
|
||||
}
|
||||
}
|
||||
|
||||
return new RMAppMetrics(resourcePreempted,
|
||||
numNonAMContainerPreempted, numAMContainerPreempted,
|
||||
memorySeconds, vcoreSeconds);
|
||||
memorySeconds, vcoreSeconds,
|
||||
preemptedMemorySeconds, preemptedVcoreSeconds);
|
||||
}
|
||||
|
||||
@Private
|
||||
|
@ -26,15 +26,20 @@ public class RMAppMetrics {
|
||||
final int numAMContainersPreempted;
|
||||
final long memorySeconds;
|
||||
final long vcoreSeconds;
|
||||
private final long preemptedMemorySeconds;
|
||||
private final long preemptedVcoreSeconds;
|
||||
|
||||
public RMAppMetrics(Resource resourcePreempted,
|
||||
int numNonAMContainersPreempted, int numAMContainersPreempted,
|
||||
long memorySeconds, long vcoreSeconds) {
|
||||
long memorySeconds, long vcoreSeconds, long preemptedMemorySeconds,
|
||||
long preemptedVcoreSeconds) {
|
||||
this.resourcePreempted = resourcePreempted;
|
||||
this.numNonAMContainersPreempted = numNonAMContainersPreempted;
|
||||
this.numAMContainersPreempted = numAMContainersPreempted;
|
||||
this.memorySeconds = memorySeconds;
|
||||
this.vcoreSeconds = vcoreSeconds;
|
||||
this.preemptedMemorySeconds = preemptedMemorySeconds;
|
||||
this.preemptedVcoreSeconds = preemptedVcoreSeconds;
|
||||
}
|
||||
|
||||
public Resource getResourcePreempted() {
|
||||
@ -56,4 +61,13 @@ public long getMemorySeconds() {
|
||||
public long getVcoreSeconds() {
|
||||
return vcoreSeconds;
|
||||
}
|
||||
|
||||
/**
 * @return the preempted memory-seconds value supplied to the constructor
 */
public long getPreemptedMemorySeconds() {
|
||||
return preemptedMemorySeconds;
|
||||
}
|
||||
|
||||
/**
 * @return the preempted vcore-seconds value supplied to the constructor
 */
public long getPreemptedVcoreSeconds() {
|
||||
return preemptedVcoreSeconds;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -889,6 +889,10 @@ public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
|
||||
this.attemptMetrics.getAggregateAppResourceUsage();
|
||||
report.setMemorySeconds(resUsage.getMemorySeconds());
|
||||
report.setVcoreSeconds(resUsage.getVcoreSeconds());
|
||||
report.setPreemptedMemorySeconds(
|
||||
this.attemptMetrics.getPreemptedMemory());
|
||||
report.setPreemptedVcoreSeconds(
|
||||
this.attemptMetrics.getPreemptedVcore());
|
||||
return report;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
@ -921,6 +925,9 @@ public void recover(RMState state) {
|
||||
this.finishTime = attemptState.getFinishTime();
|
||||
this.attemptMetrics.updateAggregateAppResourceUsage(
|
||||
attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
|
||||
this.attemptMetrics.updateAggregatePreemptedAppResourceUsage(
|
||||
attemptState.getPreemptedMemorySeconds(),
|
||||
attemptState.getPreemptedVcoreSeconds());
|
||||
}
|
||||
|
||||
public void transferStateFromAttempt(RMAppAttempt attempt) {
|
||||
@ -1296,7 +1303,9 @@ applicationAttemptId, getMasterContainer(),
|
||||
startTime, stateToBeStored, finalTrackingUrl, diags,
|
||||
finalStatus, exitStatus,
|
||||
getFinishTime(), resUsage.getMemorySeconds(),
|
||||
resUsage.getVcoreSeconds());
|
||||
resUsage.getVcoreSeconds(),
|
||||
this.attemptMetrics.getPreemptedMemory(),
|
||||
this.attemptMetrics.getPreemptedVcore());
|
||||
LOG.info("Updating application attempt " + applicationAttemptId
|
||||
+ " with final state: " + targetedFinalState + ", and exit status: "
|
||||
+ exitStatus);
|
||||
|
@ -50,6 +50,8 @@ public class RMAppAttemptMetrics {
|
||||
private WriteLock writeLock;
|
||||
private AtomicLong finishedMemorySeconds = new AtomicLong(0);
|
||||
private AtomicLong finishedVcoreSeconds = new AtomicLong(0);
|
||||
private AtomicLong preemptedMemorySeconds = new AtomicLong(0);
|
||||
private AtomicLong preemptedVcoreSeconds = new AtomicLong(0);
|
||||
private RMContext rmContext;
|
||||
|
||||
private int[][] localityStatistics =
|
||||
@ -98,6 +100,14 @@ public Resource getResourcePreempted() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Returns the current value of the {@code preemptedMemorySeconds} counter.
 * Despite the method name, the value is an accumulated memory-seconds
 * total, not an amount of memory.
 *
 * @return accumulated preempted memory-seconds for this attempt
 */
public long getPreemptedMemory() {
|
||||
return preemptedMemorySeconds.get();
|
||||
}
|
||||
|
||||
/**
 * Returns the current value of the {@code preemptedVcoreSeconds} counter.
 * Despite the method name, the value is an accumulated vcore-seconds
 * total, not a vcore count.
 *
 * @return accumulated preempted vcore-seconds for this attempt
 */
public long getPreemptedVcore() {
|
||||
return preemptedVcoreSeconds.get();
|
||||
}
|
||||
|
||||
public int getNumNonAMContainersPreempted() {
|
||||
return numNonAMContainersPreempted.get();
|
||||
}
|
||||
@ -134,6 +144,12 @@ public void updateAggregateAppResourceUsage(long finishedMemorySeconds,
|
||||
this.finishedVcoreSeconds.addAndGet(finishedVcoreSeconds);
|
||||
}
|
||||
|
||||
/**
 * Adds the given amounts to the running preempted-resource totals.
 * Each counter is an {@code AtomicLong} updated with {@code addAndGet};
 * the two additions are separate operations, not one combined update.
 *
 * @param preemptedMemorySeconds memory-seconds to add to the total
 * @param preemptedVcoreSeconds vcore-seconds to add to the total
 */
public void updateAggregatePreemptedAppResourceUsage(
|
||||
long preemptedMemorySeconds, long preemptedVcoreSeconds) {
|
||||
this.preemptedMemorySeconds.addAndGet(preemptedMemorySeconds);
|
||||
this.preemptedVcoreSeconds.addAndGet(preemptedVcoreSeconds);
|
||||
}
|
||||
|
||||
public void incNumAllocatedContainers(NodeType containerType,
|
||||
NodeType requestType) {
|
||||
localityStatistics[containerType.index][requestType.index]++;
|
||||
|
@ -729,19 +729,12 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
|
||||
}
|
||||
|
||||
private static void updateAttemptMetrics(RMContainerImpl container) {
|
||||
// If this is a preempted container, update preemption metrics
|
||||
Resource resource = container.getContainer().getResource();
|
||||
RMAppAttempt rmAttempt = container.rmContext.getRMApps()
|
||||
.get(container.getApplicationAttemptId().getApplicationId())
|
||||
.getCurrentAppAttempt();
|
||||
|
||||
if (rmAttempt != null) {
|
||||
if (ContainerExitStatus.PREEMPTED == container.finishedStatus
|
||||
.getExitStatus()) {
|
||||
rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
|
||||
container);
|
||||
}
|
||||
|
||||
long usedMillis = container.finishTime - container.creationTime;
|
||||
long memorySeconds = resource.getMemorySize()
|
||||
* usedMillis / DateUtils.MILLIS_PER_SECOND;
|
||||
@ -749,6 +742,15 @@ private static void updateAttemptMetrics(RMContainerImpl container) {
|
||||
* usedMillis / DateUtils.MILLIS_PER_SECOND;
|
||||
rmAttempt.getRMAppAttemptMetrics()
|
||||
.updateAggregateAppResourceUsage(memorySeconds,vcoreSeconds);
|
||||
// If this is a preempted container, update preemption metrics
|
||||
if (ContainerExitStatus.PREEMPTED == container.finishedStatus
|
||||
.getExitStatus()) {
|
||||
rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
|
||||
container);
|
||||
rmAttempt.getRMAppAttemptMetrics()
|
||||
.updateAggregatePreemptedAppResourceUsage(memorySeconds,
|
||||
vcoreSeconds);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -914,7 +914,7 @@ public ApplicationResourceUsageReport getResourceUsageReport() {
|
||||
Resources.add(usedResourceClone, reservedResourceClone),
|
||||
runningResourceUsage.getMemorySeconds(),
|
||||
runningResourceUsage.getVcoreSeconds(), queueUsagePerc,
|
||||
clusterUsagePerc);
|
||||
clusterUsagePerc, 0, 0);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
@ -101,7 +101,13 @@ protected void createApplicationMetricsTable(Block html){
|
||||
._("Aggregate Resource Allocation:",
|
||||
String.format("%d MB-seconds, %d vcore-seconds",
|
||||
appMetrics == null ? "N/A" : appMetrics.getMemorySeconds(),
|
||||
appMetrics == null ? "N/A" : appMetrics.getVcoreSeconds()));
|
||||
appMetrics == null ? "N/A" : appMetrics.getVcoreSeconds()))
|
||||
._("Aggregate Preempted Resource Allocation:",
|
||||
String.format("%d MB-seconds, %d vcore-seconds",
|
||||
appMetrics == null ? "N/A" : appMetrics.getPreemptedMemorySeconds(),
|
||||
appMetrics == null ? "N/A" :
|
||||
appMetrics.getPreemptedVcoreSeconds()));
|
||||
|
||||
pdiv._();
|
||||
}
|
||||
|
||||
|
@ -100,6 +100,8 @@ public class AppInfo {
|
||||
protected long preemptedResourceVCores;
|
||||
protected int numNonAMContainerPreempted;
|
||||
protected int numAMContainerPreempted;
|
||||
private long preemptedMemorySeconds;
|
||||
private long preemptedVcoreSeconds;
|
||||
|
||||
// list of resource requests
|
||||
@XmlElement(name = "resourceRequests")
|
||||
@ -216,6 +218,8 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess,
|
||||
appMetrics.getResourcePreempted().getVirtualCores();
|
||||
memorySeconds = appMetrics.getMemorySeconds();
|
||||
vcoreSeconds = appMetrics.getVcoreSeconds();
|
||||
preemptedMemorySeconds = appMetrics.getPreemptedMemorySeconds();
|
||||
preemptedVcoreSeconds = appMetrics.getPreemptedVcoreSeconds();
|
||||
unmanagedApplication =
|
||||
appSubmissionContext.getUnmanagedAM();
|
||||
appNodeLabelExpression =
|
||||
@ -382,6 +386,13 @@ public long getVcoreSeconds() {
|
||||
return vcoreSeconds;
|
||||
}
|
||||
|
||||
/**
 * @return the preempted memory-seconds copied from the application's
 *     metrics when this object was built
 */
public long getPreemptedMemorySeconds() {
|
||||
return preemptedMemorySeconds;
|
||||
}
|
||||
|
||||
/**
 * @return the preempted vcore-seconds copied from the application's
 *     metrics when this object was built
 */
public long getPreemptedVcoreSeconds() {
|
||||
return preemptedVcoreSeconds;
|
||||
}
|
||||
public List<ResourceRequestInfo> getResourceRequests() {
|
||||
return this.resourceRequests;
|
||||
}
|
||||
|
@ -85,6 +85,8 @@ message ApplicationAttemptStateDataProto {
|
||||
optional int64 memory_seconds = 10;
|
||||
optional int64 vcore_seconds = 11;
|
||||
optional int64 finish_time = 12;
|
||||
optional int64 preempted_memory_seconds = 13;
|
||||
optional int64 preempted_vcore_seconds = 14;
|
||||
}
|
||||
|
||||
message EpochProto {
|
||||
|
@ -689,7 +689,8 @@ public void testEscapeApplicationSummary() {
|
||||
when(app.getState()).thenReturn(RMAppState.RUNNING);
|
||||
when(app.getApplicationType()).thenReturn("MAPREDUCE");
|
||||
RMAppMetrics metrics =
|
||||
new RMAppMetrics(Resource.newInstance(1234, 56), 10, 1, 16384, 64);
|
||||
new RMAppMetrics(Resource.newInstance(1234, 56),
|
||||
10, 1, 16384, 64, 0, 0);
|
||||
when(app.getRMAppMetrics()).thenReturn(metrics);
|
||||
|
||||
RMAppManager.ApplicationSummary.SummaryBuilder summary =
|
||||
|
@ -194,7 +194,7 @@ public Set<NodeId> getRanNodes() {
|
||||
|
||||
@Override
|
||||
public RMAppMetrics getRMAppMetrics() {
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0);
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0, 0, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -328,7 +328,7 @@ public ApplicationReport createAndGetApplicationReport(
|
||||
String clientUserName, boolean allowAccess) {
|
||||
ApplicationResourceUsageReport usageReport =
|
||||
ApplicationResourceUsageReport.newInstance(0, 0, null, null, null,
|
||||
0, 0, 0, 0);
|
||||
0, 0, 0, 0, 0, 0);
|
||||
ApplicationReport report = ApplicationReport.newInstance(
|
||||
getApplicationId(), appAttemptId, getUser(), getQueue(),
|
||||
getName(), null, 0, null, null, getDiagnostics().toString(),
|
||||
|
@ -223,6 +223,16 @@ public void testPublishApplicationMetrics() throws Exception {
|
||||
app.getRMAppMetrics().getVcoreSeconds(),
|
||||
Long.parseLong(entity.getOtherInfo()
|
||||
.get(ApplicationMetricsConstants.APP_CPU_METRICS).toString()));
|
||||
Assert.assertEquals(
|
||||
app.getRMAppMetrics().getPreemptedMemorySeconds(),
|
||||
Long.parseLong(entity.getOtherInfo()
|
||||
.get(ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS)
|
||||
.toString()));
|
||||
Assert.assertEquals(
|
||||
app.getRMAppMetrics().getPreemptedVcoreSeconds(),
|
||||
Long.parseLong(entity.getOtherInfo()
|
||||
.get(ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS)
|
||||
.toString()));
|
||||
}
|
||||
Assert.assertEquals("context", entity.getOtherInfo()
|
||||
.get(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT));
|
||||
@ -496,7 +506,8 @@ private static RMApp createRMApp(ApplicationId appId) {
|
||||
when(app.getFinalApplicationStatus()).thenReturn(
|
||||
FinalApplicationStatus.UNDEFINED);
|
||||
when(app.getRMAppMetrics()).thenReturn(
|
||||
new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE));
|
||||
new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
|
||||
Integer.MAX_VALUE, Long.MAX_VALUE));
|
||||
Set<String> appTags = new HashSet<String>();
|
||||
appTags.add("test");
|
||||
appTags.add("tags");
|
||||
|
@ -211,7 +211,7 @@ public void testPublishApplicationMetrics() throws Exception {
|
||||
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||
File appFile = new File(outputDirApp, timelineServiceFileName);
|
||||
Assert.assertTrue(appFile.exists());
|
||||
verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 6);
|
||||
verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 8);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
@ -355,7 +355,7 @@ private static RMApp createRMApp(ApplicationId appId) {
|
||||
FinalApplicationStatus.UNDEFINED);
|
||||
when(app.getRMAppMetrics()).thenReturn(
|
||||
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, Integer.MAX_VALUE,
|
||||
Long.MAX_VALUE));
|
||||
Long.MAX_VALUE, 0, 0));
|
||||
when(app.getApplicationTags()).thenReturn(Collections.<String> emptySet());
|
||||
ApplicationSubmissionContext appSubmissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
|
@ -361,7 +361,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper,
|
||||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics",
|
||||
FinalApplicationStatus.SUCCEEDED, 100,
|
||||
oldAttemptState.getFinishTime(), 0, 0);
|
||||
oldAttemptState.getFinishTime(), 0, 0, 0, 0);
|
||||
store.updateApplicationAttemptState(newAttemptState);
|
||||
|
||||
// test updating the state of an app/attempt whose initial state was not
|
||||
@ -385,7 +385,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper,
|
||||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics",
|
||||
FinalApplicationStatus.SUCCEEDED, 111,
|
||||
oldAttemptState.getFinishTime(), 0, 0);
|
||||
oldAttemptState.getFinishTime(), 0, 0, 0, 0);
|
||||
store.updateApplicationAttemptState(dummyAttempt);
|
||||
|
||||
// let things settle down
|
||||
|
@ -428,7 +428,7 @@ public void testFencedState() throws Exception {
|
||||
store.getCredentialsFromAppAttempt(mockAttempt),
|
||||
startTime, RMAppAttemptState.FINISHED, "testUrl",
|
||||
"test", FinalApplicationStatus.SUCCEEDED, 100,
|
||||
finishTime, 0, 0);
|
||||
finishTime, 0, 0, 0, 0);
|
||||
store.updateApplicationAttemptState(newAttemptState);
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
@ -64,7 +64,8 @@ public void testAppBlockRenderWithNullCurrentAppAttempt() throws Exception {
|
||||
when(app.getFinishTime()).thenReturn(0L);
|
||||
when(app.createApplicationState()).thenReturn(YarnApplicationState.FAILED);
|
||||
|
||||
RMAppMetrics appMetrics = new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0);
|
||||
RMAppMetrics appMetrics = new RMAppMetrics(
|
||||
Resource.newInstance(0, 0), 0, 0, 0, 0, 0, 0);
|
||||
when(app.getRMAppMetrics()).thenReturn(appMetrics);
|
||||
|
||||
// initialize RM Context, and create RMApp, without creating RMAppAttempt
|
||||
|
@ -142,7 +142,8 @@ private static RMContext mockRMContext(List<RMAppState> states) {
|
||||
MockRMApp app = new MockRMApp(i, i, state) {
|
||||
@Override
|
||||
public RMAppMetrics getRMAppMetrics() {
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, 0, 0);
|
||||
return new RMAppMetrics(Resource.newInstance(0, 0),
|
||||
0, 0, 0, 0, 0, 0);
|
||||
}
|
||||
@Override
|
||||
public YarnApplicationState createApplicationState() {
|
||||
|
@ -1419,7 +1419,7 @@ public void verifyAppsXML(NodeList nodes, RMApp app)
|
||||
public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
|
||||
Exception {
|
||||
|
||||
int expectedNumberOfElements = 32;
|
||||
int expectedNumberOfElements = 34;
|
||||
String appNodeLabelExpression = null;
|
||||
String amNodeLabelExpression = null;
|
||||
if (app.getApplicationSubmissionContext()
|
||||
|
Loading…
Reference in New Issue
Block a user