From bcc15c6290b3912a054323695a6a931b0de163bd Mon Sep 17 00:00:00 2001 From: Jian He Date: Wed, 9 Nov 2016 14:33:58 -0800 Subject: [PATCH] YARN-5611. Provide an API to update lifetime of an application. Contributed by Rohith Sharma K S --- .../hadoop/mapred/TestClientRedirect.java | 9 + .../yarn/api/ApplicationClientProtocol.java | 23 ++ .../UpdateApplicationTimeoutsRequest.java | 81 +++++++ .../UpdateApplicationTimeoutsResponse.java | 46 ++++ .../records/ApplicationSubmissionContext.java | 4 + .../hadoop/yarn/conf/YarnConfiguration.java | 6 +- .../proto/applicationclient_protocol.proto | 1 + .../src/main/proto/yarn_protos.proto | 5 + .../src/main/proto/yarn_service_protos.proto | 9 + ...ApplicationClientProtocolPBClientImpl.java | 21 +- ...pplicationClientProtocolPBServiceImpl.java | 22 ++ ...pdateApplicationTimeoutsRequestPBImpl.java | 220 ++++++++++++++++++ ...dateApplicationTimeoutsResponsePBImpl.java | 73 ++++++ .../yarn/util/AbstractLivelinessMonitor.java | 17 +- .../org/apache/hadoop/yarn/util/Times.java | 33 +++ .../src/main/resources/yarn-default.xml | 4 +- .../amrmproxy/MockResourceManagerFacade.java | 9 + .../resourcemanager/ClientRMService.java | 137 ++++++++--- .../server/resourcemanager/RMAppManager.java | 37 +++ .../server/resourcemanager/RMAuditLogger.java | 4 +- .../server/resourcemanager/RMServerUtils.java | 48 +++- .../recovery/RMStateStore.java | 28 ++- .../recovery/RMStateUpdateAppEvent.java | 15 +- .../records/ApplicationStateData.java | 27 +++ .../impl/pb/ApplicationStateDataPBImpl.java | 86 +++++++ .../server/resourcemanager/rmapp/RMApp.java | 3 + .../resourcemanager/rmapp/RMAppImpl.java | 64 ++++- .../rmapp/monitor/RMAppLifetimeMonitor.java | 72 +++--- .../scheduler/capacity/CapacityScheduler.java | 3 +- ...yarn_server_resourcemanager_recovery.proto | 1 + .../applicationsmanager/MockAsm.java | 6 + .../resourcemanager/rmapp/MockRMApp.java | 6 + .../rmapp/TestApplicationLifetimeMonitor.java | 150 +++++++++++- 33 files changed, 1149 
insertions(+), 121 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 255f998df1..65eac65484 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -124,6 +124,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -485,6 
+487,13 @@ public SignalContainerResponse signalToContainer( SignalContainerRequest request) throws IOException { return null; } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return null; + } } class HistoryService extends AMService implements HSClientProtocol { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 8ee43fbcbc..394454f20b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -566,4 +568,25 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( SignalContainerResponse signalToContainer( SignalContainerRequest request) throws YarnException, IOException; + + /** + *

+ * The interface used by clients to update the ApplicationTimeouts of an + * application. Each timeout value in the UpdateApplicationTimeoutsRequest + * must be an absolute time in ISO8601 format yyyy-MM-dd'T'HH:mm:ss.SSSZ. + *

+ * Note: If application timeout value is less than or equal to current + * time then update application throws YarnException. + * @param request to set ApplicationTimeouts of an application + * @return an empty response that the update has completed successfully. + * @throws YarnException if update request has empty values or application is + * in completing states. + * @throws IOException on IO failures + */ + @Public + @Unstable + @Idempotent + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java new file mode 100644 index 0000000000..0e81e7e34c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The request sent by the client to the ResourceManager to set or + * update the application timeout. + *

+ *

+ * The request includes the {@link ApplicationId} of the application and the + * timeouts to be set for that application. + *

+ */ +@Public +@Unstable +public abstract class UpdateApplicationTimeoutsRequest { + public static UpdateApplicationTimeoutsRequest newInstance( + ApplicationId applicationId, + Map applicationTimeouts) { + UpdateApplicationTimeoutsRequest request = + Records.newRecord(UpdateApplicationTimeoutsRequest.class); + request.setApplicationId(applicationId); + request.setApplicationTimeouts(applicationTimeouts); + return request; + } + + /** + * Get the ApplicationId of the application. + * @return ApplicationId of the application + */ + public abstract ApplicationId getApplicationId(); + + /** + * Set the ApplicationId of the application. + * @param applicationId ApplicationId of the application + */ + public abstract void setApplicationId(ApplicationId applicationId); + + /** + * Get ApplicationTimeouts of the application. Timeout value is + * in ISO8601 standard with format yyyy-MM-dd'T'HH:mm:ss.SSSZ. + * @return all ApplicationTimeouts of the application. + */ + public abstract Map getApplicationTimeouts(); + + /** + * Set the ApplicationTimeouts for the application. Timeout value + * is absolute. Timeout value should meet ISO8601 format. Support ISO8601 + * format is yyyy-MM-dd'T'HH:mm:ss.SSSZ. All pre-existing Map entries + * are cleared before adding the new Map. 
+ * @param applicationTimeouts ApplicationTimeoutss for the + * application + */ + public abstract void setApplicationTimeouts( + Map applicationTimeouts); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java new file mode 100644 index 0000000000..bd02bb85e8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The response sent by the ResourceManager to the client for an + * application timeout update request. + *

+ *

+ * A response without exception means that the update has completed + * successfully. + *

+ */ +@Public +@Unstable +public abstract class UpdateApplicationTimeoutsResponse { + + public static UpdateApplicationTimeoutsResponse newInstance() { + UpdateApplicationTimeoutsResponse response = + Records.newRecord(UpdateApplicationTimeoutsResponse.class); + return response; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 83f601a5f5..e562aaae5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -549,6 +549,10 @@ public abstract void setLogAggregationContext( /** * Set the ApplicationTimeouts for the application in seconds. * All pre-existing Map entries are cleared before adding the new Map. + *

+ * Note: If an application timeout value is less than or equal to zero, + * then application submission will throw an exception. + *

* @param applicationTimeouts ApplicationTimeoutss for the * application */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1fd25a79d8..b95bd1a256 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1540,10 +1540,10 @@ public static boolean isAclEnabled(Configuration conf) { // Configurations for applicaiton life time monitor feature - public static final String RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS = - RM_PREFIX + "application-timeouts.lifetime-monitor.interval-ms"; + public static final String RM_APPLICATION_MONITOR_INTERVAL_MS = + RM_PREFIX + "application-timeouts.monitor.interval-ms"; - public static final long DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS = + public static final long DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS = 60000; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto index f1c3839428..ba79db09a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto @@ -60,4 +60,5 @@ service ApplicationClientProtocolService { rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto); rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto); + rpc 
updateApplicationTimeouts (UpdateApplicationTimeoutsRequestProto) returns (UpdateApplicationTimeoutsResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 9c746fde30..b59d02b4fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -377,6 +377,11 @@ message ApplicationTimeoutMapProto { optional int64 timeout = 2; } +message ApplicationUpdateTimeoutMapProto { + optional ApplicationTimeoutTypeProto application_timeout_type = 1; + optional string expire_time = 2; +} + message LogAggregationContextProto { optional string include_pattern = 1 [default = ".*"]; optional string exclude_pattern = 2 [default = ""]; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 6526bf97a3..d9230d4707 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -267,6 +267,15 @@ message SignalContainerRequestProto { message SignalContainerResponseProto { } +message UpdateApplicationTimeoutsRequestProto { + required ApplicationIdProto applicationId = 1; + repeated ApplicationUpdateTimeoutMapProto application_timeouts = 2; +} + +message UpdateApplicationTimeoutsResponseProto { + repeated ApplicationUpdateTimeoutMapProto application_timeouts = 1; +} + ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// ////////////////////////////////////////////////////// diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java index 2d755a2f78..ad7cb29608 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java @@ -83,6 +83,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -139,6 +141,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; 
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; @@ -165,7 +169,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import com.google.protobuf.ServiceException; @@ -600,4 +604,19 @@ public SignalContainerResponse signalToContainer( return null; } } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + UpdateApplicationTimeoutsRequestProto requestProto = + ((UpdateApplicationTimeoutsRequestPBImpl) request).getProto(); + try { + return new UpdateApplicationTimeoutsResponsePBImpl( + proxy.updateApplicationTimeouts(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java index 300ef57cf6..93ce6a343c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java @@ -55,6 +55,7 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; @@ -111,6 +112,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -162,6 +165,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; @@ -609,4 +614,21 @@ public 
SignalContainerResponseProto signalToContainer( throw new ServiceException(e); } } + + @Override + public UpdateApplicationTimeoutsResponseProto updateApplicationTimeouts( + RpcController controller, UpdateApplicationTimeoutsRequestProto proto) + throws ServiceException { + UpdateApplicationTimeoutsRequestPBImpl request = + new UpdateApplicationTimeoutsRequestPBImpl(proto); + try { + UpdateApplicationTimeoutsResponse response = + real.updateApplicationTimeouts(request); + return ((UpdateApplicationTimeoutsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java new file mode 100644 index 0000000000..1f86c5524b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationUpdateTimeoutMapProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class UpdateApplicationTimeoutsRequestPBImpl + extends UpdateApplicationTimeoutsRequest { + + UpdateApplicationTimeoutsRequestProto proto = + UpdateApplicationTimeoutsRequestProto.getDefaultInstance(); + UpdateApplicationTimeoutsRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + private Map applicationTimeouts = null; + + public UpdateApplicationTimeoutsRequestPBImpl() { + builder = UpdateApplicationTimeoutsRequestProto.newBuilder(); + } + + public UpdateApplicationTimeoutsRequestPBImpl( + UpdateApplicationTimeoutsRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public UpdateApplicationTimeoutsRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UpdateApplicationTimeoutsRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + if (this.applicationTimeouts != null) { + addApplicationTimeouts(); + } + } + + @Override + public ApplicationId getApplicationId() { + UpdateApplicationTimeoutsRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (this.applicationId != null) { + return applicationId; + } // Else via proto + if (!p.hasApplicationId()) { + return null; + } + applicationId = convertFromProtoFormat(p.getApplicationId()); + return applicationId; + } + + @Override + public void setApplicationId(ApplicationId applicationId) { + maybeInitBuilder(); + if (applicationId == null) { + builder.clearApplicationId(); + } + this.applicationId = applicationId; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public Map getApplicationTimeouts() { + initApplicationTimeout(); + return this.applicationTimeouts; + } + + private void 
initApplicationTimeout() { + if (this.applicationTimeouts != null) { + return; + } + UpdateApplicationTimeoutsRequestProtoOrBuilder p = + viaProto ? proto : builder; + List lists = + p.getApplicationTimeoutsList(); + this.applicationTimeouts = + new HashMap(lists.size()); + for (ApplicationUpdateTimeoutMapProto timeoutProto : lists) { + this.applicationTimeouts.put( + ProtoUtils + .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()), + timeoutProto.getExpireTime()); + } + } + + @Override + public void setApplicationTimeouts( + Map appTimeouts) { + if (appTimeouts == null) { + return; + } + initApplicationTimeout(); + this.applicationTimeouts.clear(); + this.applicationTimeouts.putAll(appTimeouts); + } + + private void addApplicationTimeouts() { + maybeInitBuilder(); + builder.clearApplicationTimeouts(); + if (applicationTimeouts == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iterator = + applicationTimeouts.keySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ApplicationUpdateTimeoutMapProto next() { + ApplicationTimeoutType key = iterator.next(); + return ApplicationUpdateTimeoutMapProto.newBuilder() + .setExpireTime(applicationTimeouts.get(key)) + .setApplicationTimeoutType( + ProtoUtils.convertToProtoFormat(key)) + .build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationTimeouts(values); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java new file mode 100644 index 0000000000..74f17155f4 --- /dev/null 
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class UpdateApplicationTimeoutsResponsePBImpl + extends UpdateApplicationTimeoutsResponse { + UpdateApplicationTimeoutsResponseProto proto = + UpdateApplicationTimeoutsResponseProto.getDefaultInstance(); + UpdateApplicationTimeoutsResponseProto.Builder builder = null; + boolean viaProto = false; + + public UpdateApplicationTimeoutsResponsePBImpl() { + builder = UpdateApplicationTimeoutsResponseProto.newBuilder(); + } + + public UpdateApplicationTimeoutsResponsePBImpl( + UpdateApplicationTimeoutsResponseProto proto) { + this.proto = 
proto; + viaProto = true; + } + + public UpdateApplicationTimeoutsResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java index b605026151..638128e4bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java @@ -46,6 +46,7 @@ public abstract class AbstractLivelinessMonitor extends AbstractService { public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins private long expireInterval = DEFAULT_EXPIRE; private long monitorInterval = expireInterval / 3; + private volatile boolean resetTimerOnStart = true; private final Clock clock; @@ -105,8 +106,8 @@ public synchronized void register(O ob) { register(ob, clock.getTime()); } - public synchronized void register(O ob, long monitorStartTime) { - running.put(ob, monitorStartTime); + public synchronized void register(O ob, long expireTime) { + running.put(ob, expireTime); } public synchronized void unregister(O ob) { @@ -114,12 +115,18 @@ public synchronized void unregister(O ob) { } public synchronized void resetTimer() { - long time = clock.getTime(); - for (O ob : 
running.keySet()) { - running.put(ob, time); + if (resetTimerOnStart) { + long time = clock.getTime(); + for (O ob : running.keySet()) { + running.put(ob, time); + } } } + protected void setResetTimeOnStart(boolean resetTimeOnStart) { + this.resetTimerOnStart = resetTimeOnStart; + } + private class PingChecker implements Runnable { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java index 8ae3842283..f113bd31c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.util; +import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; @@ -29,6 +30,8 @@ public class Times { private static final Log LOG = LogFactory.getLog(Times.class); + static final String ISO8601DATEFORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; + // This format should match the one used in yarn.dt.plugins.js static final ThreadLocal dateFormat = new ThreadLocal() { @@ -37,6 +40,14 @@ public class Times { } }; + static final ThreadLocal isoFormat = + new ThreadLocal() { + @Override + protected SimpleDateFormat initialValue() { + return new SimpleDateFormat(ISO8601DATEFORMAT); + } + }; + public static long elapsed(long started, long finished) { return Times.elapsed(started, finished, true); } @@ -74,4 +85,26 @@ public static String format(long ts) { return ts > 0 ? String.valueOf(dateFormat.get().format(new Date(ts))) : "N/A"; } + + /** + * Given a time stamp returns ISO-8601 formatted string in format + * "yyyy-MM-dd'T'HH:mm:ss.SSSZ". + * @param ts to be formatted in ISO format. + * @return ISO 8601 formatted string.
+ */ + public static String formatISO8601(long ts) { + return isoFormat.get().format(new Date(ts)); + } + + /** + * Given ISO formatted string with format "yyyy-MM-dd'T'HH:mm:ss.SSSZ", return + * epoch time for local Time zone. + * @param isoString in format of "yyyy-MM-dd'T'HH:mm:ss.SSSZ". + * @return epoch time for local time zone. + * @throws ParseException if given ISO formatted string can not be parsed. + */ + public static long parseISO8601ToLocalTimeInMillis(String isoString) + throws ParseException { + return isoFormat.get().parse(isoString).getTime(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 834ead74e9..019166b0b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3048,9 +3048,9 @@ - The RMAppLifetimeMonitor Service uses this value as lifetime monitor interval + The RMAppLifetimeMonitor Service uses this value as monitor interval - yarn.resourcemanager.application-timeouts.lifetime-monitor.interval-ms + yarn.resourcemanager.application-timeouts.monitor.interval-ms 60000 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index f02e306960..c69313f66a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -91,6 +91,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.AMCommand; @@ -497,4 +499,11 @@ public FailApplicationAttemptResponse failApplicationAttempt( FailApplicationAttemptRequest request) throws YarnException, IOException { throw new NotImplementedException(); } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index e9bd230696..c8af526029 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -108,12 +108,15 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -1589,37 +1592,11 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( ApplicationId applicationId = request.getApplicationId(); Priority newAppPriority = request.getApplicationPriority(); - UserGroupInformation callerUGI; - try { - callerUGI = UserGroupInformation.getCurrentUser(); - } catch (IOException ie) { - LOG.info("Error getting UGI ", ie); - RMAuditLogger.logFailure("UNKNOWN", AuditConstants.UPDATE_APP_PRIORITY, - "UNKNOWN", "ClientRMService", "Error getting UGI", applicationId); - throw RPCUtil.getRemoteException(ie); - } - RMApp application = this.rmContext.getRMApps().get(applicationId); - if (application == null) { - RMAuditLogger.logFailure(callerUGI.getUserName(), - AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", - "Trying to update priority of an absent application", applicationId); - throw new ApplicationNotFoundException( - "Trying to update priority of an absent application " - + applicationId); 
- } - - if (!checkAccess(callerUGI, application.getUser(), - ApplicationAccessType.MODIFY_APP, application)) { - RMAuditLogger.logFailure(callerUGI.getShortUserName(), - AuditConstants.UPDATE_APP_PRIORITY, - "User doesn't have permissions to " - + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", - AuditConstants.UNAUTHORIZED_USER, applicationId); - throw RPCUtil.getRemoteException(new AccessControlException("User " - + callerUGI.getShortUserName() + " cannot perform operation " - + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); - } + UserGroupInformation callerUGI = + getCallerUgi(applicationId, AuditConstants.UPDATE_APP_PRIORITY); + RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, + AuditConstants.UPDATE_APP_PRIORITY); UpdateApplicationPriorityResponse response = recordFactory .newRecordInstance(UpdateApplicationPriorityResponse.class); @@ -1724,4 +1701,104 @@ public SignalContainerResponse signalToContainer( .newRecordInstance(SignalContainerResponse.class); } + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + + ApplicationId applicationId = request.getApplicationId(); + Map applicationTimeouts = + request.getApplicationTimeouts(); + + UserGroupInformation callerUGI = + getCallerUgi(applicationId, AuditConstants.UPDATE_APP_TIMEOUTS); + RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, + AuditConstants.UPDATE_APP_TIMEOUTS); + + if (applicationTimeouts.isEmpty()) { + String message = + "At least one ApplicationTimeoutType should be configured" + + " for updating timeouts."; + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", + message, applicationId); + throw RPCUtil.getRemoteException(message); + } + + UpdateApplicationTimeoutsResponse response = recordFactory + 
.newRecordInstance(UpdateApplicationTimeoutsResponse.class); + + RMAppState state = application.getState(); + if (!EnumSet + .of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppState.RUNNING) + .contains(state)) { + if (COMPLETED_APP_STATES.contains(state)) { + // If Application is in any of the final states, update timeout + // can be skipped rather than throwing an exception. + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", + applicationId); + return response; + } + String msg = + "Application is in " + state + " state can not update timeout."; + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", + msg); + throw RPCUtil.getRemoteException(msg); + } + + try { + rmAppManager.updateApplicationTimeout(application, applicationTimeouts); + } catch (YarnException ex) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", + ex.getMessage()); + throw RPCUtil.getRemoteException(ex); + } + + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId); + return response; + } + + private UserGroupInformation getCallerUgi(ApplicationId applicationId, + String operation) throws YarnException { + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + RMAuditLogger.logFailure("UNKNOWN", operation, "UNKNOWN", + "ClientRMService", "Error getting UGI", applicationId); + throw RPCUtil.getRemoteException(ie); + } + return callerUGI; + } + + private RMApp verifyUserAccessForRMApp(ApplicationId applicationId, + UserGroupInformation callerUGI, String operation) throws YarnException { + RMApp application = this.rmContext.getRMApps().get(applicationId); + if (application == null) {
RMAuditLogger.logFailure(callerUGI.getUserName(), operation, "UNKNOWN", + "ClientRMService", + "Trying to " + operation + " of an absent application", + applicationId); + throw new ApplicationNotFoundException("Trying to " + operation + + " of an absent application " + applicationId); + } + + if (!checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.MODIFY_APP, application)) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), operation, + "User doesn't have permissions to " + + ApplicationAccessType.MODIFY_APP.toString(), + "ClientRMService", AuditConstants.UNAUTHORIZED_USER, applicationId); + throw RPCUtil.getRemoteException(new AccessControlException("User " + + callerUGI.getShortUserName() + " cannot perform operation " + + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); + } + return application; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index c065b60779..7144421a51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import 
org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -66,6 +67,8 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; /** * This class manages the list of applications for the resource manager. @@ -509,4 +512,38 @@ public void handle(RMAppManagerEvent event) { LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } } + + // transaction method. + public void updateApplicationTimeout(RMApp app, + Map newTimeoutInISO8601Format) + throws YarnException { + ApplicationId applicationId = app.getApplicationId(); + synchronized (applicationId) { + Map newExpireTime = RMServerUtils + .validateISO8601AndConvertToLocalTimeEpoch(newTimeoutInISO8601Format); + + SettableFuture future = SettableFuture.create(); + + Map currentExpireTimeouts = + app.getApplicationTimeouts(); + currentExpireTimeouts.putAll(newExpireTime); + + ApplicationStateData appState = + ApplicationStateData.newInstance(app.getSubmitTime(), + app.getStartTime(), app.getApplicationSubmissionContext(), + app.getUser(), app.getCallerContext()); + appState.setApplicationTimeouts(currentExpireTimeouts); + + // update to state store. Though it is a synchronous call, update via + // future to know whether any exception has been set. It is required + // because in non-HA mode, state-store errors are skipped.
+ this.rmContext.getStateStore() + .updateApplicationStateSynchronously(appState, false, future); + + Futures.get(future, YarnException.class); + + // update in-memory + ((RMAppImpl) app).updateApplicationTimeout(newExpireTime); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index 0361059118..d52e002f86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -63,7 +63,9 @@ public static class AuditConstants { public static final String ALLOC_CONTAINER = "AM Allocated Container"; public static final String RELEASE_CONTAINER = "AM Released Container"; public static final String UPDATE_APP_PRIORITY = - "Update Application Priority Request"; + "Update Application Priority"; + public static final String UPDATE_APP_TIMEOUTS = + "Update Application Timeouts"; public static final String CHANGE_CONTAINER_RESOURCE = "AM Changed Container Resource"; public static final String SIGNAL_CONTAINER = "Signal Container Request"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index b2a085a479..7e31e70bdb 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.text.ParseException; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; @@ -69,6 +70,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -89,6 +93,8 @@ public class RMServerUtils { protected static final RecordFactory RECORD_FACTORY = RecordFactoryProvider.getRecordFactory(null); + private static Clock clock = SystemClock.getInstance(); + public static List queryRMNodes(RMContext context, EnumSet acceptedStates) { // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING. 
@@ -398,6 +404,7 @@ public static YarnApplicationState createApplicationState( case FINISHING: case FINISHED: return YarnApplicationState.FINISHED; + case KILLING: case KILLED: return YarnApplicationState.KILLED; case FAILED: @@ -475,7 +482,7 @@ public static void validateApplicationTimeouts( if (timeouts != null) { for (Map.Entry timeout : timeouts .entrySet()) { - if (timeout.getValue() < 0) { + if (timeout.getValue() <= 0) { String message = "Invalid application timeout, value=" + timeout.getValue() + " for type=" + timeout.getKey(); throw new YarnException(message); @@ -483,4 +490,43 @@ public static void validateApplicationTimeouts( } } } + + /** + * Validate ISO8601 format with epoch time. + * @param timeoutsInISO8601 format + * @return expire time in local epoch + * @throws YarnException if given application timeout value is lesser than + * current time. + */ + public static Map validateISO8601AndConvertToLocalTimeEpoch( + Map timeoutsInISO8601) + throws YarnException { + long currentTimeMillis = clock.getTime(); + Map newApplicationTimeout = + new HashMap(); + if (timeoutsInISO8601 != null) { + for (Map.Entry timeout : timeoutsInISO8601 + .entrySet()) { + long expireTime = 0L; + try { + expireTime = + Times.parseISO8601ToLocalTimeInMillis(timeout.getValue()); + } catch (ParseException ex) { + String message = + "Expire time is not in ISO8601 format. 
ISO8601 supported " + + "format is yyyy-MM-dd'T'HH:mm:ss.SSSZ"; + throw new YarnException(message); + } + if (expireTime < currentTimeMillis) { + String message = + "Expire time is less than current time, current-time=" + + Times.formatISO8601(currentTimeMillis) + " expire-time=" + + Times.formatISO8601(expireTime); + throw new YarnException(message); + } + newApplicationTimeout.put(timeout.getKey(), expireTime); + } + } + return newApplicationTimeout; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index de273c492f..d1f8b40ec1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -31,13 +31,14 @@ import javax.crypto.SecretKey; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.SettableFuture; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.AbstractService; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import 
org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; @@ -237,6 +239,8 @@ public RMStateStoreState transition(RMStateStore store, boolean isFenced = false; ApplicationStateData appState = ((RMStateUpdateAppEvent) event).getAppState(); + SettableFuture result = + ((RMStateUpdateAppEvent) event).getResult(); ApplicationId appId = appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Updating info for app: " + appId); @@ -246,9 +250,18 @@ public RMStateStoreState transition(RMStateStore store, store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED)); } + + if (result != null) { + result.set(null); + } + } catch (Exception e) { - LOG.error("Error updating app: " + appId, e); + String msg = "Error updating app: " + appId; + LOG.error(msg, e); isFenced = store.notifyStoreOperationFailedInternal(e); + if (result != null) { + result.setException(new YarnException(msg, e)); + } } return finalState(isFenced); }; @@ -774,18 +787,19 @@ public void storeNewApplication(RMApp app) { ApplicationStateData appState = ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), context, app.getUser(), app.getCallerContext()); + appState.setApplicationTimeouts(app.getApplicationTimeouts()); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } @SuppressWarnings("unchecked") - public void updateApplicationState( - ApplicationStateData appState) { + public void updateApplicationState(ApplicationStateData appState) { dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); } - public void updateApplicationStateSynchronously( - ApplicationStateData appState, boolean notifyApp) { - 
handleStoreEvent(new RMStateUpdateAppEvent(appState, notifyApp)); + public void updateApplicationStateSynchronously(ApplicationStateData appState, + boolean notifyApp, SettableFuture resultFuture) { + handleStoreEvent( + new RMStateUpdateAppEvent(appState, notifyApp, resultFuture)); } public void updateFencedState() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java index 69169dd92b..0a6220bb67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java @@ -20,21 +20,28 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import com.google.common.util.concurrent.SettableFuture; + public class RMStateUpdateAppEvent extends RMStateStoreEvent { private final ApplicationStateData appState; // After application state is updated in state store, // should notify back to application or not private boolean notifyApplication; + private SettableFuture future; public RMStateUpdateAppEvent(ApplicationStateData appState) { super(RMStateStoreEventType.UPDATE_APP); this.appState = appState; this.notifyApplication = true; + this.future = null; } - public RMStateUpdateAppEvent(ApplicationStateData appState, boolean notifyApp) { - this(appState); + public RMStateUpdateAppEvent(ApplicationStateData appState, boolean notifyApp, + SettableFuture future) { + super(RMStateStoreEventType.UPDATE_APP); + 
this.appState = appState; this.notifyApplication = notifyApp; + this.future = future; } public ApplicationStateData getAppState() { @@ -44,4 +51,8 @@ public ApplicationStateData getAppState() { public boolean isNotifyApplication() { return notifyApplication; } + + public SettableFuture getResult() { + return future; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 234838084f..79a5de2584 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.Records; @@ -59,6 +60,25 @@ public static ApplicationStateData newInstance(long submitTime, return appState; } + public static ApplicationStateData newInstance(long submitTime, + long startTime, String user, + ApplicationSubmissionContext submissionContext, RMAppState state, + String diagnostics, long finishTime, CallerContext callerContext, + Map 
applicationTimeouts) { + ApplicationStateData appState = + Records.newRecord(ApplicationStateData.class); + appState.setSubmitTime(submitTime); + appState.setStartTime(startTime); + appState.setUser(user); + appState.setApplicationSubmissionContext(submissionContext); + appState.setState(state); + appState.setDiagnostics(diagnostics); + appState.setFinishTime(finishTime); + appState.setCallerContext(callerContext); + appState.setApplicationTimeouts(applicationTimeouts); + return appState; + } + public static ApplicationStateData newInstance(long submitTime, long startTime, ApplicationSubmissionContext context, String user, CallerContext callerContext) { @@ -168,4 +188,11 @@ public abstract void setApplicationSubmissionContext( public abstract CallerContext getCallerContext(); public abstract void setCallerContext(CallerContext callerContext); + + @Public + public abstract Map getApplicationTimeouts(); + + @Public + public abstract void setApplicationTimeouts( + Map applicationTimeouts); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index 15ed770840..d037e68a12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -18,10 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; +import 
java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutMapProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAppStateProto; @@ -38,6 +46,7 @@ public class ApplicationStateDataPBImpl extends ApplicationStateData { boolean viaProto = false; private ApplicationSubmissionContext applicationSubmissionContext = null; + private Map applicationTimeouts = null; public ApplicationStateDataPBImpl() { builder = ApplicationStateDataProto.newBuilder(); @@ -63,6 +72,10 @@ private void mergeLocalToBuilder() { ((ApplicationSubmissionContextPBImpl)applicationSubmissionContext) .getProto()); } + + if (this.applicationTimeouts != null) { + addApplicationTimeouts(); + } } private void mergeLocalToProto() { @@ -256,4 +269,77 @@ public static RMAppStateProto convertToProtoFormat(RMAppState e) { public static RMAppState convertFromProtoFormat(RMAppStateProto e) { return RMAppState.valueOf(e.name().replace(RM_APP_PREFIX, "")); } + + @Override + public Map getApplicationTimeouts() { + initApplicationTimeout(); + return this.applicationTimeouts; + } + + private void initApplicationTimeout() { + if (this.applicationTimeouts != null) { + return; + } + ApplicationStateDataProtoOrBuilder p = viaProto ? 
proto : builder; + List lists = p.getApplicationTimeoutsList(); + this.applicationTimeouts = + new HashMap(lists.size()); + for (ApplicationTimeoutMapProto timeoutProto : lists) { + this.applicationTimeouts.put( + ProtoUtils + .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()), + timeoutProto.getTimeout()); + } + } + + @Override + public void setApplicationTimeouts( + Map appTimeouts) { + if (appTimeouts == null) { + return; + } + initApplicationTimeout(); + this.applicationTimeouts.clear(); + this.applicationTimeouts.putAll(appTimeouts); + } + + private void addApplicationTimeouts() { + maybeInitBuilder(); + builder.clearApplicationTimeouts(); + if (applicationTimeouts == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iterator = + applicationTimeouts.keySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ApplicationTimeoutMapProto next() { + ApplicationTimeoutType key = iterator.next(); + return ApplicationTimeoutMapProto.newBuilder() + .setTimeout(applicationTimeouts.get(key)) + .setApplicationTimeoutType( + ProtoUtils.convertToProtoFormat(key)) + .build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationTimeouts(values); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 98cbd92180..cd08743625 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -280,4 +281,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, String getAppNodeLabelExpression(); CallerContext getCallerContext(); + + Map getApplicationTimeouts(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 0fdc3113fa..74e641d6bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -121,6 +121,9 @@ public class RMAppImpl implements RMApp, Recoverable { private static final Log LOG = LogFactory.getLog(RMAppImpl.class); private static final String UNAVAILABLE = "N/A"; + private static final EnumSet COMPLETED_APP_STATES = + EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING); // Immutable fields private final ApplicationId applicationId; 
@@ -179,6 +182,8 @@ public class RMAppImpl implements RMApp, Recoverable { private Map> logAggregationFailureMessagesForNMs = new HashMap>(); private final int maxLogAggregationDiagnosticsInMemory; + private Map applicationTimeouts = + new HashMap(); // These states stored are only valid when app is at killing or final_saving. private RMAppState stateBeforeKilling; @@ -897,6 +902,7 @@ public void recover(RMState state) { this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); this.callerContext = appState.getCallerContext(); + this.applicationTimeouts = appState.getApplicationTimeouts(); // If interval > 0, some attempts might have been deleted. if (this.attemptFailuresValidityInterval > 0) { this.firstAttemptIdInStateStore = appState.getFirstAttemptId(); @@ -1109,17 +1115,16 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } } - long applicationLifetime = - app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); - if (applicationLifetime > 0) { + for (Map.Entry timeout : + app.applicationTimeouts.entrySet()) { app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, - ApplicationTimeoutType.LIFETIME, app.submitTime, - applicationLifetime * 1000); + timeout.getKey(), timeout.getValue()); if (LOG.isDebugEnabled()) { + long remainingTime = timeout.getValue() - app.systemClock.getTime(); LOG.debug("Application " + app.applicationId - + " is registered for timeout monitor, type=" - + ApplicationTimeoutType.LIFETIME + " value=" - + applicationLifetime + " seconds"); + + " is registered for timeout monitor, type=" + timeout.getKey() + + " remaining timeout=" + + (remainingTime > 0 ? 
remainingTime / 1000 : 0) + " seconds"); } } @@ -1235,10 +1240,17 @@ public void transition(RMAppImpl app, RMAppEvent event) { long applicationLifetime = app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); if (applicationLifetime > 0) { + // calculate next timeout value + Long newTimeout = + Long.valueOf(app.submitTime + (applicationLifetime * 1000)); app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, - ApplicationTimeoutType.LIFETIME, app.submitTime, - applicationLifetime * 1000); - LOG.debug("Application " + app.applicationId + ApplicationTimeoutType.LIFETIME, newTimeout); + + // update applicationTimeouts with new absolute value. + app.applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, + newTimeout); + + LOG.info("Application " + app.applicationId + " is registered for timeout monitor, type=" + ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime + " seconds"); @@ -1292,6 +1304,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event, ApplicationStateData.newInstance(this.submitTime, this.startTime, this.user, this.submissionContext, stateToBeStored, diags, this.storedFinishTime, this.callerContext); + appState.setApplicationTimeouts(this.applicationTimeouts); this.rmContext.getStateStore().updateApplicationState(appState); } @@ -1967,4 +1980,31 @@ private long getApplicationLifetime(ApplicationTimeoutType type) { } return applicationLifetime; } -} + + @Override + public Map getApplicationTimeouts() { + this.readLock.lock(); + try { + return new HashMap(this.applicationTimeouts); + } finally { + this.readLock.unlock(); + } + } + + public void updateApplicationTimeout( + Map updateTimeout) { + this.writeLock.lock(); + try { + if (COMPLETED_APP_STATES.contains(getState())) { + return; + } + // update monitoring service + this.rmContext.getRMAppLifetimeMonitor() + .updateApplicationTimeouts(getApplicationId(), updateTimeout); + this.applicationTimeouts.putAll(updateTimeout); + + } finally { + 
this.writeLock.unlock(); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java index e550c972ae..d194204c92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java @@ -18,9 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor; -import java.util.EnumSet; -import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.commons.logging.Log; @@ -33,7 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; import org.apache.hadoop.yarn.util.SystemClock; @@ -47,12 +45,6 @@ public class RMAppLifetimeMonitor private static final Log LOG = LogFactory.getLog(RMAppLifetimeMonitor.class); private RMContext rmContext; - private Map monitoredApps = - new HashMap(); - - private static final EnumSet COMPLETED_APP_STATES = - EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED, - RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING); public RMAppLifetimeMonitor(RMContext rmContext) { 
super(RMAppLifetimeMonitor.class.getName(), SystemClock.getInstance()); @@ -61,14 +53,16 @@ public RMAppLifetimeMonitor(RMContext rmContext) { @Override protected void serviceInit(Configuration conf) throws Exception { - long monitorInterval = conf.getLong( - YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS, - YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS); + long monitorInterval = + conf.getLong(YarnConfiguration.RM_APPLICATION_MONITOR_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS); if (monitorInterval <= 0) { monitorInterval = - YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS; + YarnConfiguration.DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS; } setMonitorInterval(monitorInterval); + setExpireInterval(0); // No need of expire interval for App. + setResetTimeOnStart(false); // do not reset expire time on restart LOG.info("Application lifelime monitor interval set to " + monitorInterval + " ms."); super.serviceInit(conf); @@ -77,54 +71,42 @@ protected void serviceInit(Configuration conf) throws Exception { @SuppressWarnings("unchecked") @Override protected synchronized void expire(RMAppToMonitor monitoredAppKey) { - Long remove = monitoredApps.remove(monitoredAppKey); ApplicationId appId = monitoredAppKey.getApplicationId(); RMApp app = rmContext.getRMApps().get(appId); if (app == null) { return; } - // Don't trigger a KILL event if application is in completed states - if (!COMPLETED_APP_STATES.contains(app.getState())) { - String diagnostics = - "Application killed due to exceeding its lifetime period " + remove - + " milliseconds"; - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics)); - } else { - LOG.info("Application " + appId - + " is about to complete. 
So not killing the application."); - } + String diagnostics = + "Application killed due to exceeding its lifetime period"; + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics)); } - public synchronized void registerApp(ApplicationId appId, - ApplicationTimeoutType timeoutType, long monitorStartTime, long timeout) { + public void registerApp(ApplicationId appId, + ApplicationTimeoutType timeoutType, long expireTime) { RMAppToMonitor appToMonitor = new RMAppToMonitor(appId, timeoutType); - register(appToMonitor, monitorStartTime); - monitoredApps.putIfAbsent(appToMonitor, timeout); + register(appToMonitor, expireTime); } - @Override - protected synchronized long getExpireInterval( - RMAppToMonitor monitoredAppKey) { - return monitoredApps.get(monitoredAppKey); - } - - public synchronized void unregisterApp(ApplicationId appId, + public void unregisterApp(ApplicationId appId, ApplicationTimeoutType timeoutType) { - RMAppToMonitor appToRemove = new RMAppToMonitor(appId, timeoutType); - unregister(appToRemove); - monitoredApps.remove(appToRemove); + RMAppToMonitor remove = new RMAppToMonitor(appId, timeoutType); + unregister(remove); } - public synchronized void unregisterApp(ApplicationId appId, - Set types) { - for (ApplicationTimeoutType type : types) { - unregisterApp(appId, type); + public void unregisterApp(ApplicationId appId, + Set timeoutTypes) { + for (ApplicationTimeoutType timeoutType : timeoutTypes) { + unregisterApp(appId, timeoutType); } } - public synchronized void updateApplicationTimeouts(ApplicationId appId, + public void updateApplicationTimeouts(ApplicationId appId, Map timeouts) { - // TODO in YARN-5611 + for (Entry entry : timeouts.entrySet()) { + ApplicationTimeoutType timeoutType = entry.getKey(); + RMAppToMonitor update = new RMAppToMonitor(appId, timeoutType); + register(update, entry.getValue()); + } } } \ No newline at end of file diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 7e98f10bb9..af51f3c862 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2418,8 +2418,9 @@ public void updateApplicationPriority(Priority newPriority, ApplicationStateData.newInstance(rmApp.getSubmitTime(), rmApp.getStartTime(), rmApp.getApplicationSubmissionContext(), rmApp.getUser(), rmApp.getCallerContext()); + appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); rmContext.getStateStore().updateApplicationStateSynchronously(appState, - false); + false, null); // As we use iterator over a TreeSet for OrderingPolicy, once we change // priority then reinsert back to make order correct. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto index 6e2398a560..4693818eca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto @@ -69,6 +69,7 @@ message ApplicationStateDataProto { optional string diagnostics = 6 [default = "N/A"]; optional int64 finish_time = 7; optional hadoop.common.RPCCallerContextProto caller_context = 8; + repeated ApplicationTimeoutMapProto application_timeouts = 9; } message ApplicationAttemptStateDataProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 19ee0b17c4..e5b166d737 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import 
org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -229,6 +230,11 @@ public String getAppNodeLabelExpression() { public CallerContext getCallerContext() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Map getApplicationTimeouts() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 62a5c5282a..bbfa60fc19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -315,4 +316,9 @@ public void removeCollectorAddr() { public void setCollectorAddr(String collectorAddr) { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Map getApplicationTimeouts() { + throw new 
UnsupportedOperationException("Not supported yet."); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java index 3f2db1dd77..e803a8899c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.Arrays; @@ -27,11 +28,14 @@ import java.util.Set; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -39,8 +43,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart; import 
org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.util.Times; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -60,15 +66,11 @@ public void setup() throws IOException { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); UserGroupInformation.setConfiguration(conf); - conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); - conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, - true); - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); - conf.setLong(YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS, + conf.setLong(YarnConfiguration.RM_APPLICATION_MONITOR_INTERVAL_MS, 3000L); } - @Test(timeout = 90000) + @Test(timeout = 60000) public void testApplicationLifetimeMonitor() throws Exception { MockRM rm = null; try { @@ -81,22 +83,64 @@ public void testApplicationLifetimeMonitor() throws Exception { new HashMap(); timeouts.put(ApplicationTimeoutType.LIFETIME, 10L); RMApp app1 = rm.submitApp(1024, appPriority, timeouts); + + // 20L seconds + timeouts.put(ApplicationTimeoutType.LIFETIME, 20L); + RMApp app2 = rm.submitApp(1024, appPriority, timeouts); + nm1.nodeHeartbeat(true); // Send launch Event MockAM am1 = rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId()); am1.registerAppAttempt(); rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); - Assert.assertTrue("Applicaiton killed before lifetime value", + Assert.assertTrue("Application killed before lifetime value", (System.currentTimeMillis() - app1.getSubmitTime()) > 10000); + + Map 
updateTimeout = + new HashMap(); + long newLifetime = 10L; + // update 10L seconds more to timeout + updateTimeout.put(ApplicationTimeoutType.LIFETIME, + Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000)); + UpdateApplicationTimeoutsRequest request = + UpdateApplicationTimeoutsRequest.newInstance(app2.getApplicationId(), + updateTimeout); + + Map applicationTimeouts = + app2.getApplicationTimeouts(); + // has old timeout time + long beforeUpdate = + applicationTimeouts.get(ApplicationTimeoutType.LIFETIME); + + // update app2 lifetime to new time i.e now + timeout + rm.getRMContext().getClientRMService().updateApplicationTimeouts(request); + + applicationTimeouts = + app2.getApplicationTimeouts(); + long afterUpdate = + applicationTimeouts.get(ApplicationTimeoutType.LIFETIME); + + Assert.assertTrue("Application lifetime value not updated", + afterUpdate > beforeUpdate); + + rm.waitForState(app2.getApplicationId(), RMAppState.KILLED); + // verify for app killed with updated lifetime + Assert.assertTrue("Application killed before lifetime value", + app2.getFinishTime() > afterUpdate); + } finally { stopRM(rm); } } - @SuppressWarnings("rawtypes") @Test(timeout = 180000) public void testApplicationLifetimeOnRMRestart() throws Exception { + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); MockRM rm1 = new MockRM(conf, memStore); @@ -115,6 +159,12 @@ public void testApplicationLifetimeOnRMRestart() throws Exception { // Re-start RM MockRM rm2 = new MockRM(conf, memStore); + + // make sure app has been unregistered with old RM else both will trigger + // Expire event + rm1.getRMContext().getRMAppLifetimeMonitor().unregisterApp( + app1.getApplicationId(), ApplicationTimeoutType.LIFETIME); + rm2.start(); 
nm1.setResourceTrackerService(rm2.getResourceTrackerService()); @@ -152,9 +202,87 @@ public void testApplicationLifetimeOnRMRestart() throws Exception { // wait for app life time and application to be in killed state. rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED); - Assert.assertTrue("Applicaiton killed before lifetime value", - (System.currentTimeMillis() - - recoveredApp1.getSubmitTime()) > appLifetime); + Assert.assertTrue("Application killed before lifetime value", + recoveredApp1.getFinishTime() > (recoveredApp1.getSubmitTime() + + appLifetime * 1000)); + } + + @Test(timeout = 60000) + public void testUpdateApplicationTimeoutForStateStoreUpdateFail() + throws Exception { + MockRM rm1 = null; + try { + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + + MemoryRMStateStore memStore = new MemoryRMStateStore() { + private int count = 0; + + @Override + public synchronized void updateApplicationStateInternal( + ApplicationId appId, ApplicationStateData appState) + throws Exception { + // fail only 1 time. 
+ if (count++ == 0) { + throw new Exception("State-store update failed"); + } + super.updateApplicationStateInternal(appId, appState); + } + }; + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); + + long appLifetime = 30L; + Map timeouts = + new HashMap(); + timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime); + RMApp app1 = rm1.submitApp(200, Priority.newInstance(0), timeouts); + + Map updateTimeout = + new HashMap(); + long newLifetime = 10L; + // update 10L seconds more to timeout i.e 30L seconds overall + updateTimeout.put(ApplicationTimeoutType.LIFETIME, + Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000)); + UpdateApplicationTimeoutsRequest request = + UpdateApplicationTimeoutsRequest.newInstance(app1.getApplicationId(), + updateTimeout); + + Map applicationTimeouts = + app1.getApplicationTimeouts(); + // has old timeout time + long beforeUpdate = + applicationTimeouts.get(ApplicationTimeoutType.LIFETIME); + + try { + // update app2 lifetime to new time i.e now + timeout + rm1.getRMContext().getClientRMService() + .updateApplicationTimeouts(request); + fail("Update application should fail."); + } catch (YarnException e) { + // expected + assertTrue("State-store exception does not containe appId", + e.getMessage().contains(app1.getApplicationId().toString())); + } + + applicationTimeouts = app1.getApplicationTimeouts(); + // has old timeout time + long afterUpdate = + applicationTimeouts.get(ApplicationTimeoutType.LIFETIME); + + Assert.assertEquals("Application timeout is updated", beforeUpdate, + afterUpdate); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + // verify for app killed with updated lifetime + Assert.assertTrue("Application killed before lifetime value", + app1.getFinishTime() > afterUpdate); + } finally { + stopRM(rm1); + } } private void 
stopRM(MockRM rm) {