YARN-4206. Add Application timeouts in Application report and CLI. Contributed by Rohith Sharma K S.
This commit is contained in:
parent
e15c20edba
commit
eb0a483ed0
@ -25,6 +25,7 @@
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
@ -447,4 +448,13 @@ public abstract void setLogAggregationStatus(
|
||||
|
||||
@Unstable
|
||||
public abstract void setAmNodeLabelExpression(String amNodeLabelExpression);
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract List<ApplicationTimeout> getApplicationTimeouts();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setApplicationTimeouts(
|
||||
List<ApplicationTimeout> timeouts);
|
||||
}
|
||||
|
@ -0,0 +1,99 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* {@code ApplicationTimeout} is a report for configured application timeouts.
|
||||
* It includes details such as:
|
||||
* <ul>
|
||||
* <li>{@link ApplicationTimeoutType} of the timeout type.</li>
|
||||
* <li>Expiry time in ISO8601 standard with format
|
||||
* <b>yyyy-MM-dd'T'HH:mm:ss.SSSZ</b>.</li>
|
||||
* <li>Remaining time in seconds.</li>
|
||||
* </ul>
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract class ApplicationTimeout {
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public static ApplicationTimeout newInstance(ApplicationTimeoutType type,
|
||||
String expiryTime, long remainingTime) {
|
||||
ApplicationTimeout timeouts = Records.newRecord(ApplicationTimeout.class);
|
||||
timeouts.setTimeoutType(type);
|
||||
timeouts.setExpiryTime(expiryTime);
|
||||
timeouts.setRemainingTime(remainingTime);
|
||||
return timeouts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the application timeout type.
|
||||
* @return timeoutType of an application timeout.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract ApplicationTimeoutType getTimeoutType();
|
||||
|
||||
/**
|
||||
* Set the application timeout type.
|
||||
* @param timeoutType of an application timeout.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setTimeoutType(ApplicationTimeoutType timeoutType);
|
||||
|
||||
/**
|
||||
* Get <code>expiryTime</code> for given timeout type.
|
||||
* @return expiryTime in ISO8601 standard with format
|
||||
* <b>yyyy-MM-dd'T'HH:mm:ss.SSSZ</b>.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getExpiryTime();
|
||||
|
||||
/**
|
||||
* Set <code>expiryTime</code> for given timeout type.
|
||||
* @param expiryTime in ISO8601 standard with format
|
||||
* <b>yyyy-MM-dd'T'HH:mm:ss.SSSZ</b>.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setExpiryTime(String expiryTime);
|
||||
|
||||
/**
|
||||
* Get <code>Remaining Time</code> of an application for given timeout type.
|
||||
* @return Remaining Time in seconds.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract long getRemainingTime();
|
||||
|
||||
/**
|
||||
* Set <code>Remaining Time</code> of an application for given timeout type.
|
||||
* @param remainingTime in seconds.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setRemainingTime(long remainingTime);
|
||||
}
|
@ -214,6 +214,13 @@ message ApplicationReportProto {
|
||||
optional PriorityProto priority = 23;
|
||||
optional string appNodeLabelExpression = 24;
|
||||
optional string amNodeLabelExpression = 25;
|
||||
repeated ApplicationTimeoutProto application_timeouts = 26;
|
||||
}
|
||||
|
||||
message ApplicationTimeoutProto {
|
||||
required ApplicationTimeoutTypeProto application_timeout_type = 1;
|
||||
optional string expire_time = 2;
|
||||
optional int64 remaining_time = 3;
|
||||
}
|
||||
|
||||
enum LogAggregationStatusProto {
|
||||
|
@ -43,6 +43,8 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -844,4 +846,13 @@ public abstract Priority updateApplicationPriority(
|
||||
*/
|
||||
public abstract void signalToContainer(ContainerId containerId,
|
||||
SignalContainerCommand command) throws YarnException, IOException;
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
|
||||
UpdateApplicationTimeoutsRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new UnsupportedOperationException("The sub-class extending "
|
||||
+ YarnClient.class.getName() + " is expected to implement this !");
|
||||
}
|
||||
}
|
||||
|
@ -84,6 +84,8 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -917,4 +919,11 @@ public void signalToContainer(ContainerId containerId,
|
||||
SignalContainerRequest.newInstance(containerId, command);
|
||||
rmClient.signalToContainer(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
|
||||
UpdateApplicationTimeoutsRequest request)
|
||||
throws YarnException, IOException {
|
||||
return rmClient.updateApplicationTimeouts(request);
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
import java.nio.charset.Charset;
|
||||
import java.text.DecimalFormat;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -39,11 +40,14 @@
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
@ -53,7 +57,6 @@
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -84,6 +87,7 @@ public class ApplicationCLI extends YarnCLI {
|
||||
public static final String CONTAINER = "container";
|
||||
public static final String APP_ID = "appId";
|
||||
public static final String UPDATE_PRIORITY = "updatePriority";
|
||||
public static final String UPDATE_LIFETIME = "updateLifetime";
|
||||
|
||||
private boolean allAppStates;
|
||||
|
||||
@ -139,6 +143,9 @@ public int run(String[] args) throws Exception {
|
||||
opts.addOption(UPDATE_PRIORITY, true,
|
||||
"update priority of an application. ApplicationId can be"
|
||||
+ " passed using 'appId' option.");
|
||||
opts.addOption(UPDATE_LIFETIME, true,
|
||||
"update timeout of an application from NOW. ApplicationId can be"
|
||||
+ " passed using 'appId' option. Timeout value is in seconds.");
|
||||
Option killOpt = new Option(KILL_CMD, true, "Kills the application. "
|
||||
+ "Set of applications can be provided separated with space");
|
||||
killOpt.setValueSeparator(' ');
|
||||
@ -150,6 +157,7 @@ public int run(String[] args) throws Exception {
|
||||
opts.getOption(STATUS_CMD).setArgName("Application ID");
|
||||
opts.getOption(APP_ID).setArgName("Application ID");
|
||||
opts.getOption(UPDATE_PRIORITY).setArgName("Priority");
|
||||
opts.getOption(UPDATE_LIFETIME).setArgName("Timeout");
|
||||
} else if (args.length > 0 && args[0].equalsIgnoreCase(APPLICATION_ATTEMPT)) {
|
||||
title = APPLICATION_ATTEMPT;
|
||||
opts.addOption(STATUS_CMD, true,
|
||||
@ -296,6 +304,17 @@ public int run(String[] args) throws Exception {
|
||||
}
|
||||
updateApplicationPriority(cliParser.getOptionValue(APP_ID),
|
||||
cliParser.getOptionValue(UPDATE_PRIORITY));
|
||||
} else if (cliParser.hasOption(UPDATE_LIFETIME)) {
|
||||
if (!cliParser.hasOption(APP_ID)) {
|
||||
printUsage(title, opts);
|
||||
return exitCode;
|
||||
}
|
||||
|
||||
long timeoutInSec =
|
||||
Long.parseLong(cliParser.getOptionValue(UPDATE_LIFETIME));
|
||||
|
||||
updateApplicationTimeout(cliParser.getOptionValue(APP_ID),
|
||||
ApplicationTimeoutType.LIFETIME, timeoutInSec);
|
||||
} else if (cliParser.hasOption(SIGNAL_CMD)) {
|
||||
if (args.length < 3 || args.length > 4) {
|
||||
printUsage(title, opts);
|
||||
@ -316,6 +335,22 @@ public int run(String[] args) throws Exception {
|
||||
return 0;
|
||||
}
|
||||
|
||||
private void updateApplicationTimeout(String applicationId,
|
||||
ApplicationTimeoutType timeoutType, long timeoutInSec)
|
||||
throws YarnException, IOException {
|
||||
ApplicationId appId = ApplicationId.fromString(applicationId);
|
||||
String newTimeout =
|
||||
Times.formatISO8601(System.currentTimeMillis() + timeoutInSec * 1000);
|
||||
sysout.println("Updating timeout for given timeoutType: "
|
||||
+ timeoutType.toString() + " of an application " + applicationId);
|
||||
UpdateApplicationTimeoutsRequest request = UpdateApplicationTimeoutsRequest
|
||||
.newInstance(appId, Collections.singletonMap(timeoutType, newTimeout));
|
||||
client.updateApplicationTimeouts(request);
|
||||
sysout.println(
|
||||
"Successfully updated " + timeoutType.toString() + " of an application "
|
||||
+ applicationId + ". New expiry time is " + newTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Signals the containerId
|
||||
*
|
||||
@ -678,7 +713,13 @@ private int printApplicationReport(String applicationId)
|
||||
appReportStr.print("\tApplication Node Label Expression : ");
|
||||
appReportStr.println(appReport.getAppNodeLabelExpression());
|
||||
appReportStr.print("\tAM container Node Label Expression : ");
|
||||
appReportStr.print(appReport.getAmNodeLabelExpression());
|
||||
appReportStr.println(appReport.getAmNodeLabelExpression());
|
||||
for (ApplicationTimeout timeout : appReport.getApplicationTimeouts()) {
|
||||
appReportStr.print("\tTimeoutType : " + timeout.getTimeoutType());
|
||||
appReportStr.print("\tExpiryTime : " + timeout.getExpiryTime());
|
||||
appReportStr.println(
|
||||
"\tRemainingTime : " + timeout.getRemainingTime() + "seconds");
|
||||
}
|
||||
} else {
|
||||
appReportStr.print("Application with id '" + applicationId
|
||||
+ "' doesn't exist in RM.");
|
||||
|
@ -36,6 +36,7 @@
|
||||
import java.io.PrintWriter;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
@ -46,11 +47,14 @@
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.lang.time.DateFormatUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
@ -124,6 +128,11 @@ public void testGetApplicationReport() throws Exception {
|
||||
null, null, false, Priority.newInstance(0), "high-mem", "high-mem");
|
||||
newApplicationReport.setLogAggregationStatus(LogAggregationStatus.SUCCEEDED);
|
||||
newApplicationReport.setPriority(Priority.newInstance(0));
|
||||
ApplicationTimeout timeout = ApplicationTimeout
|
||||
.newInstance(ApplicationTimeoutType.LIFETIME, "UNLIMITED", -1);
|
||||
newApplicationReport
|
||||
.setApplicationTimeouts(Collections.singletonList(timeout));
|
||||
|
||||
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
|
||||
newApplicationReport);
|
||||
int result = cli.run(new String[] { "application", "-status", applicationId.toString() });
|
||||
@ -155,6 +164,10 @@ public void testGetApplicationReport() throws Exception {
|
||||
pw.println("\tUnmanaged Application : false");
|
||||
pw.println("\tApplication Node Label Expression : high-mem");
|
||||
pw.println("\tAM container Node Label Expression : high-mem");
|
||||
pw.print("\tTimeoutType : LIFETIME");
|
||||
pw.print("\tExpiryTime : UNLIMITED");
|
||||
pw.println("\tRemainingTime : -1seconds");
|
||||
pw.println();
|
||||
pw.close();
|
||||
String appReportStr = baos.toString("UTF-8");
|
||||
Assert.assertEquals(appReportStr, sysOutStream.toString());
|
||||
@ -1984,6 +1997,10 @@ private String createApplicationCLIHelpMessage() throws IOException {
|
||||
pw.println(" specify which queue to move an");
|
||||
pw.println(" application to.");
|
||||
pw.println(" -status <Application ID> Prints the status of the application.");
|
||||
pw.println(" -updateLifetime <Timeout> update timeout of an application from");
|
||||
pw.println(" NOW. ApplicationId can be passed using");
|
||||
pw.println(" 'appId' option. Timeout value is in");
|
||||
pw.println(" seconds.");
|
||||
pw.println(" -updatePriority <Priority> update priority of an application.");
|
||||
pw.println(" ApplicationId can be passed using 'appId'");
|
||||
pw.println(" option.");
|
||||
@ -2074,4 +2091,27 @@ public void testAppAttemptReportWhileContainerIsNotAssigned()
|
||||
applicationId.toString() });
|
||||
assertEquals(0, result);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testUpdateApplicationTimeout() throws Exception {
|
||||
ApplicationCLI cli = createAndGetAppCLI();
|
||||
ApplicationId applicationId = ApplicationId.newInstance(1234, 6);
|
||||
|
||||
ApplicationReport appReport = ApplicationReport.newInstance(applicationId,
|
||||
ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue",
|
||||
"appname", "host", 124, null, YarnApplicationState.RUNNING,
|
||||
"diagnostics", "url", 0, 0, FinalApplicationStatus.UNDEFINED, null,
|
||||
"N/A", 0.53789f, "YARN", null);
|
||||
ApplicationTimeout timeout = ApplicationTimeout
|
||||
.newInstance(ApplicationTimeoutType.LIFETIME, "N/A", -1);
|
||||
appReport.setApplicationTimeouts(Collections.singletonList(timeout));
|
||||
when(client.getApplicationReport(any(ApplicationId.class)))
|
||||
.thenReturn(appReport);
|
||||
|
||||
int result = cli.run(new String[] { "application", "-appId",
|
||||
applicationId.toString(), "-updateLifetime", "10" });
|
||||
Assert.assertEquals(result, 0);
|
||||
verify(client)
|
||||
.updateApplicationTimeouts(any(UpdateApplicationTimeoutsRequest.class));
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
@ -35,6 +36,7 @@
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
|
||||
@ -42,7 +44,10 @@
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
@Private
|
||||
@ -58,6 +63,7 @@ public class ApplicationReportPBImpl extends ApplicationReport {
|
||||
private Token amRmToken = null;
|
||||
private Set<String> applicationTags = null;
|
||||
private Priority priority = null;
|
||||
private List<ApplicationTimeout> applicationTimeoutList = null;
|
||||
|
||||
public ApplicationReportPBImpl() {
|
||||
builder = ApplicationReportProto.newBuilder();
|
||||
@ -492,6 +498,9 @@ private void mergeLocalToBuilder() {
|
||||
builder.getPriority())) {
|
||||
builder.setPriority(convertToProtoFormat(this.priority));
|
||||
}
|
||||
if (this.applicationTimeoutList != null) {
|
||||
addLocalApplicationTimeoutToProto();
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
@ -668,4 +677,78 @@ public void setAmNodeLabelExpression(String amNodeLabelExpression) {
|
||||
}
|
||||
builder.setAmNodeLabelExpression((amNodeLabelExpression));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApplicationTimeout> getApplicationTimeouts() {
|
||||
initLocalApplicationsList();
|
||||
return this.applicationTimeoutList;
|
||||
}
|
||||
|
||||
private void initLocalApplicationsList() {
|
||||
if (this.applicationTimeoutList != null) {
|
||||
return;
|
||||
}
|
||||
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<ApplicationTimeoutProto> list = p.getApplicationTimeoutsList();
|
||||
this.applicationTimeoutList = new ArrayList<ApplicationTimeout>();
|
||||
|
||||
for (ApplicationTimeoutProto a : list) {
|
||||
this.applicationTimeoutList.add(convertFromProtoFormat(a));
|
||||
}
|
||||
}
|
||||
|
||||
private void addLocalApplicationTimeoutToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearApplicationTimeouts();
|
||||
if (applicationTimeoutList == null) {
|
||||
return;
|
||||
}
|
||||
Iterable<ApplicationTimeoutProto> iterable =
|
||||
new Iterable<ApplicationTimeoutProto>() {
|
||||
@Override
|
||||
public Iterator<ApplicationTimeoutProto> iterator() {
|
||||
return new Iterator<ApplicationTimeoutProto>() {
|
||||
|
||||
private Iterator<ApplicationTimeout> iter =
|
||||
applicationTimeoutList.iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iter.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationTimeoutProto next() {
|
||||
return convertToProtoFormat(iter.next());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
};
|
||||
builder.addAllApplicationTimeouts(iterable);
|
||||
}
|
||||
|
||||
private ApplicationTimeoutPBImpl convertFromProtoFormat(
|
||||
ApplicationTimeoutProto p) {
|
||||
return new ApplicationTimeoutPBImpl(p);
|
||||
}
|
||||
|
||||
private ApplicationTimeoutProto convertToProtoFormat(ApplicationTimeout t) {
|
||||
return ((ApplicationTimeoutPBImpl) t).getProto();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationTimeouts(List<ApplicationTimeout> timeouts) {
|
||||
maybeInitBuilder();
|
||||
if (timeouts == null) {
|
||||
builder.clearApplicationTimeouts();
|
||||
}
|
||||
this.applicationTimeoutList = timeouts;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,130 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutProtoOrBuilder;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
/**
|
||||
* PB implementation for ApplicationTimeout class.
|
||||
*/
|
||||
public class ApplicationTimeoutPBImpl extends ApplicationTimeout {
|
||||
|
||||
private ApplicationTimeoutProto proto =
|
||||
ApplicationTimeoutProto.getDefaultInstance();
|
||||
private ApplicationTimeoutProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
public ApplicationTimeoutPBImpl() {
|
||||
builder = ApplicationTimeoutProto.newBuilder();
|
||||
}
|
||||
|
||||
public ApplicationTimeoutPBImpl(ApplicationTimeoutProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public ApplicationTimeoutProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = ApplicationTimeoutProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationTimeoutType getTimeoutType() {
|
||||
ApplicationTimeoutProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasApplicationTimeoutType()) {
|
||||
return null;
|
||||
}
|
||||
return ProtoUtils.convertFromProtoFormat(p.getApplicationTimeoutType());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeoutType(ApplicationTimeoutType type) {
|
||||
maybeInitBuilder();
|
||||
if (type == null) {
|
||||
builder.clearApplicationTimeoutType();
|
||||
return;
|
||||
}
|
||||
builder.setApplicationTimeoutType(ProtoUtils.convertToProtoFormat(type));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExpiryTime() {
|
||||
ApplicationTimeoutProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasExpireTime()) {
|
||||
return null;
|
||||
}
|
||||
return p.getExpireTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setExpiryTime(String expiryTime) {
|
||||
maybeInitBuilder();
|
||||
if (expiryTime == null) {
|
||||
builder.clearExpireTime();
|
||||
return;
|
||||
}
|
||||
builder.setExpireTime(expiryTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRemainingTime() {
|
||||
ApplicationTimeoutProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getRemainingTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRemainingTime(long remainingTime) {
|
||||
maybeInitBuilder();
|
||||
builder.setRemainingTime(remainingTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
}
|
@ -104,6 +104,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
@ -397,6 +398,7 @@ public static void setup() throws Exception {
|
||||
generateByNewInstance(RestartContainerResponse.class);
|
||||
generateByNewInstance(RollbackResponse.class);
|
||||
generateByNewInstance(CommitResponse.class);
|
||||
generateByNewInstance(ApplicationTimeout.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -56,6 +56,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
@ -112,6 +113,7 @@
|
||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
|
||||
@ -122,6 +124,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
|
||||
private static final String UNAVAILABLE = "N/A";
|
||||
private static final String UNLIMITED = "UNLIMITED";
|
||||
private static final long UNKNOWN = -1L;
|
||||
private static final EnumSet<RMAppState> COMPLETED_APP_STATES =
|
||||
EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
|
||||
RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
|
||||
@ -787,6 +791,19 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
|
||||
report.setUnmanagedApp(submissionContext.getUnmanagedAM());
|
||||
report.setAppNodeLabelExpression(getAppNodeLabelExpression());
|
||||
report.setAmNodeLabelExpression(getAmNodeLabelExpression());
|
||||
|
||||
ApplicationTimeout timeout = ApplicationTimeout
|
||||
.newInstance(ApplicationTimeoutType.LIFETIME, UNLIMITED, UNKNOWN);
|
||||
// Currently timeout type supported is LIFETIME. When more timeout types
|
||||
// are supported in YARN-5692, the below logic need to be changed.
|
||||
if (!this.applicationTimeouts.isEmpty()) {
|
||||
long timeoutInMillis = applicationTimeouts
|
||||
.get(ApplicationTimeoutType.LIFETIME).longValue();
|
||||
timeout.setExpiryTime(Times.formatISO8601(timeoutInMillis));
|
||||
timeout.setRemainingTime(
|
||||
Math.max((timeoutInMillis - systemClock.getTime()) / 1000, 0));
|
||||
}
|
||||
report.setApplicationTimeouts(Collections.singletonList(timeout));
|
||||
return report;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
|
@ -24,18 +24,23 @@
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
@ -101,8 +106,9 @@ public void testApplicationLifetimeMonitor() throws Exception {
|
||||
new HashMap<ApplicationTimeoutType, String>();
|
||||
long newLifetime = 10L;
|
||||
// update 10L seconds more to timeout
|
||||
updateTimeout.put(ApplicationTimeoutType.LIFETIME,
|
||||
Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000));
|
||||
String formatISO8601 =
|
||||
Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000);
|
||||
updateTimeout.put(ApplicationTimeoutType.LIFETIME, formatISO8601);
|
||||
UpdateApplicationTimeoutsRequest request =
|
||||
UpdateApplicationTimeoutsRequest.newInstance(app2.getApplicationId(),
|
||||
updateTimeout);
|
||||
@ -124,6 +130,26 @@ public void testApplicationLifetimeMonitor() throws Exception {
|
||||
Assert.assertTrue("Application lifetime value not updated",
|
||||
afterUpdate > beforeUpdate);
|
||||
|
||||
// verify for application report.
|
||||
RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
GetApplicationReportRequest appRequest =
|
||||
recordFactory.newRecordInstance(GetApplicationReportRequest.class);
|
||||
appRequest.setApplicationId(app2.getApplicationId());
|
||||
List<ApplicationTimeout> applicationTimeoutList = rm.getRMContext()
|
||||
.getClientRMService().getApplicationReport(appRequest)
|
||||
.getApplicationReport().getApplicationTimeouts();
|
||||
Assert.assertTrue("Application Timeout list are empty.",
|
||||
!applicationTimeoutList.isEmpty());
|
||||
ApplicationTimeout timeout = applicationTimeoutList.iterator().next();
|
||||
Assert.assertEquals("Application timeout Type is incorrect.",
|
||||
ApplicationTimeoutType.LIFETIME.toString(),
|
||||
timeout.getTimeoutType().toString());
|
||||
Assert.assertEquals("Application timeout string is incorrect.",
|
||||
formatISO8601, timeout.getExpiryTime());
|
||||
Assert.assertTrue("Application remaining time is incorrect",
|
||||
timeout.getRemainingTime() > 0);
|
||||
|
||||
rm.waitForState(app2.getApplicationId(), RMAppState.KILLED);
|
||||
// verify for app killed with updated lifetime
|
||||
Assert.assertTrue("Application killed before lifetime value",
|
||||
|
Loading…
Reference in New Issue
Block a user