From 8f08532bde153811368e1b8336446fba4743f9d2 Mon Sep 17 00:00:00 2001 From: Xuan Date: Fri, 2 Oct 2015 18:50:47 -0700 Subject: [PATCH] YARN-1897. CLI and core support for signal container functionality. Contributed by Ming Ma --- .../hadoop/mapred/ResourceMgrDelegate.java | 7 + .../hadoop/mapred/TestClientRedirect.java | 8 + hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/api/ApplicationClientProtocol.java | 30 ++++ .../SignalContainerRequest.java | 78 ++++++++ .../SignalContainerResponse.java | 36 ++++ .../api/records/SignalContainerCommand.java | 45 +++++ .../proto/applicationclient_protocol.proto | 1 + .../src/main/proto/yarn_protos.proto | 7 + .../src/main/proto/yarn_service_protos.proto | 8 + .../hadoop/yarn/client/api/YarnClient.java | 17 +- .../yarn/client/api/impl/YarnClientImpl.java | 13 +- .../yarn/client/cli/ApplicationCLI.java | 35 ++++ .../hadoop/yarn/client/cli/YarnCLI.java | 1 + .../yarn/client/api/impl/TestYarnClient.java | 25 +++ .../hadoop/yarn/client/cli/TestYarnCLI.java | 30 +++- ...ApplicationClientProtocolPBClientImpl.java | 18 ++ ...pplicationClientProtocolPBServiceImpl.java | 19 ++ .../impl/pb/SignalContainerRequestPBImpl.java | 169 ++++++++++++++++++ .../pb/SignalContainerResponsePBImpl.java | 44 +++++ .../NodeHeartbeatResponse.java | 3 + .../impl/pb/NodeHeartbeatResponsePBImpl.java | 78 +++++++- .../yarn_server_common_service_protos.proto | 2 + .../CMgrSignalContainersEvent.java | 37 ++++ .../ContainerManagerEventType.java | 3 +- .../nodemanager/NodeStatusUpdaterImpl.java | 11 ++ .../ContainerManagerImpl.java | 20 +++ .../launcher/ContainerLaunch.java | 95 ++++++++++ .../launcher/ContainersLauncher.java | 18 +- .../launcher/ContainersLauncherEventType.java | 1 + .../SignalContainersLauncherEvent.java | 38 ++++ .../TestContainerManagerWithLCE.java | 36 ++++ .../nodemanager/TestNodeStatusUpdater.java | 109 ++++++++++- .../amrmproxy/MockResourceManagerFacade.java | 8 + .../BaseContainerManagerTest.java | 4 +- .../TestContainerManager.java | 111 +++++++++++- .../resourcemanager/ClientRMService.java | 69 ++++++- .../server/resourcemanager/RMAuditLogger.java | 1 + .../rmnode/RMNodeEventType.java | 3 + .../resourcemanager/rmnode/RMNodeImpl.java | 21 +++ .../rmnode/RMNodeSignalContainerEvent.java | 38 ++++ .../yarn/server/resourcemanager/MockRM.java | 10 ++ .../resourcemanager/TestSignalContainer.java | 113 ++++++++++++ 43 files changed, 1403 insertions(+), 20 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 91c308615e..29266d5e77 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.ClientRMProxy; @@ -473,4 +474,10 @@ public void updateApplicationPriority(ApplicationId applicationId, Priority priority) throws YarnException, IOException { client.updateApplicationPriority(applicationId, priority); } + + @Override + public void signalContainer(ContainerId containerId, SignalContainerCommand command) + throws YarnException, IOException { + client.signalContainer(containerId, command); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 1bf14080c6..8febec6fcb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -112,6 +112,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; @@ -453,6 +455,12 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( IOException { return null; } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws IOException { + return null; + } } class HistoryService extends AMService implements HSClientProtocol { diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9e9522c4ee..bd38c2d2d5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -222,6 +222,9 @@ Release 2.8.0 - UNRELEASED YARN-1651. CapacityScheduler side changes to support container resize. (Wangda Tan via jianhe) + + YARN-1897. CLI and core support for signal container functionality. + (Ming Ma via xgong) IMPROVEMENTS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 08fc289d74..bcd3ef6d64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -53,6 +53,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -437,4 +439,32 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels( public UpdateApplicationPriorityResponse updateApplicationPriority( UpdateApplicationPriorityRequest request) throws YarnException, IOException; + + /** + *

The interface used by clients to request the + * ResourceManager to signal a container. For example, + * the client can send command OUTPUT_THREAD_DUMP to dump threads of the + * container.

+ * + *

The client, via {@link SignalContainerRequest} provides the + * id of the container and the signal command.

+ * + *

In secure mode,the ResourceManager verifies access to the + * application before signaling the container. + * The user needs to have MODIFY_APP permission.

+ * + *

Currently, the ResourceManager returns an empty response + * on success and throws an exception on rejecting the request.

+ * + * @param request request to signal a container + * @return ResourceManager returns an empty response + * on success and throws an exception on rejecting the request + * @throws YarnException + * @throws IOException + */ + @Public + @Unstable + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, + IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java new file mode 100644 index 0000000000..2a3861a773 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.util.Records; + +/** + *

The request sent by the client to the ResourceManager + * or by the ApplicationMaster to the NodeManager + * to signal a container. + * @see SignalContainerCommand

+ */ +@Public +@Evolving +public abstract class SignalContainerRequest { + + @Public + @Unstable + public static SignalContainerRequest newInstance(ContainerId containerId, + SignalContainerCommand signalContainerCommand) { + SignalContainerRequest request = + Records.newRecord(SignalContainerRequest.class); + request.setContainerId(containerId); + request.setCommand(signalContainerCommand); + return request; + } + + /** + * Get the ContainerId of the container to signal. + * @return ContainerId of the container to signal. + */ + @Public + @Unstable + public abstract ContainerId getContainerId(); + + /** + * Set the ContainerId of the container to signal. + */ + @Public + @Unstable + public abstract void setContainerId(ContainerId containerId); + + /** + * Get the SignalContainerCommand of the signal request. + * @return SignalContainerCommand of the signal request. + */ + @Public + @Unstable + public abstract SignalContainerCommand getCommand(); + + /** + * Set the SignalContainerCommand of the signal request. + */ + @Public + @Unstable + public abstract void setCommand(SignalContainerCommand command); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java new file mode 100644 index 0000000000..0d773b91ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; + +/** + *

The response sent by the ResourceManager to the client + * signalling a container.

+ * + *

Currently it's empty.

+ * + * @see ApplicationClientProtocol#signalContainer(SignalContainerRequest) + */ +@Public +@Evolving +public abstract class SignalContainerResponse { +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java new file mode 100644 index 0000000000..7be6dbdbf6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java @@ -0,0 +1,45 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Enumeration of various signal container commands. + */ +@Public +@Evolving +public enum SignalContainerCommand { + /** + * Used to capture thread dump. + * On Linux, it is equivalent to SIGQUIT. + */ + OUTPUT_THREAD_DUMP, + + /** Gracefully shutdown a container. + * On Linux, it is equivalent to SIGTERM. + */ + GRACEFUL_SHUTDOWN, + + /** Forcefully shutdown a container. + * On Linux, it is equivalent to SIGKILL. + */ + FORCEFUL_SHUTDOWN, +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto index 117c9301bc..a2ab9c2e65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto @@ -56,4 +56,5 @@ service ApplicationClientProtocolService { rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto); rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto); + rpc signalContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 057aeee1ca..687ee89647 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -417,6 +417,13 @@ message QueueUserACLInfoProto { repeated QueueACLProto userAcls = 2; } +enum SignalContainerCommandProto { + OUTPUT_THREAD_DUMP = 1; + GRACEFUL_SHUTDOWN = 2; + FORCEFUL_SHUTDOWN = 3; +} + + //////////////////////////////////////////////////////////////////////// ////// From reservation_protocol ///////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index ff5a12787a..15e99f91ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -225,6 +225,14 @@ message UpdateApplicationPriorityRequestProto { message UpdateApplicationPriorityResponseProto { } +message SignalContainerRequestProto { + required ContainerIdProto container_id = 1; + required SignalContainerCommandProto command = 2; +} + +message SignalContainerResponseProto { +} + ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// ////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index ff90da1a05..7f6a9fcb2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -38,8 +38,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -56,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -683,4 +682,18 @@ public abstract List getClusterNodeLabels() @Unstable public abstract void updateApplicationPriority(ApplicationId applicationId, Priority priority) throws YarnException, IOException; + + /** + *

+ * Signal a container identified by given ID. + *

+ * + * @param containerId + * {@link ContainerId} of the container that needs to be signaled + * @param command the signal container command + * @throws YarnException + * @throws IOException + */ + public abstract void signalContainer(ContainerId containerId, + SignalContainerCommand command) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index bc97a12da5..2bc6143a4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -76,9 +76,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -94,6 +94,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -826,4 +827,14 @@ public void updateApplicationPriority(ApplicationId applicationId, UpdateApplicationPriorityRequest.newInstance(applicationId, priority); rmClient.updateApplicationPriority(request); } + + @Override + public void signalContainer(ContainerId containerId, + SignalContainerCommand command) + throws YarnException, IOException { + LOG.info("Signalling container " + containerId + " with command " + command); + SignalContainerRequest request = + SignalContainerRequest.newInstance(containerId, command); + rmClient.signalContainer(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 55692f1b12..be89ce28e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -42,8 +42,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; @@ -148,6 +150,12 @@ public int run(String[] args) throws Exception { opts.addOption(HELP_CMD, false, "Displays help for all commands."); opts.getOption(STATUS_CMD).setArgName("Container ID"); opts.getOption(LIST_CMD).setArgName("Application Attempt ID"); + opts.addOption(SIGNAL_CMD, true, + "Signal the container. The available signal commands are " + + java.util.Arrays.asList(SignalContainerCommand.values()) + + " Default command is OUTPUT_THREAD_DUMP."); + opts.getOption(SIGNAL_CMD).setArgName("container ID [signal command]"); + opts.getOption(SIGNAL_CMD).setArgs(3); } int exitCode = -1; @@ -254,6 +262,19 @@ public int run(String[] args) throws Exception { } updateApplicationPriority(cliParser.getOptionValue(APP_ID), cliParser.getOptionValue(UPDATE_PRIORITY)); + } else if (cliParser.hasOption(SIGNAL_CMD)) { + if (args.length < 3 || args.length > 4) { + printUsage(title, opts); + return exitCode; + } + final String[] signalArgs = cliParser.getOptionValues(SIGNAL_CMD); + final String containerId = signalArgs[0]; + SignalContainerCommand command = + SignalContainerCommand.OUTPUT_THREAD_DUMP; + if (signalArgs.length == 2) { + command = SignalContainerCommand.valueOf(signalArgs[1]); + } + signalContainer(containerId, command); } else { syserr.println("Invalid Command Usage : "); printUsage(title, opts); @@ -261,6 +282,20 @@ public int run(String[] args) throws Exception { return 0; } + /** + * Signals the containerId + * + * @param containerIdStr the container id + * @param command the signal command + * @throws YarnException + */ + private void signalContainer(String containerIdStr, + SignalContainerCommand command) throws YarnException, IOException { + ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); + sysout.println("Signalling container " + containerIdStr); + client.signalContainer(containerId, command); + } + /** * It prints the usage of the command * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java index 26349fa81a..a0c0148753 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java @@ -35,6 +35,7 @@ public abstract class YarnCLI extends Configured implements Tool { public static final String KILL_CMD = "kill"; public static final String MOVE_TO_QUEUE_CMD = "movetoqueue"; public static final String HELP_CMD = "help"; + public static final String SIGNAL_CMD = "signal"; protected PrintStream sysout; protected PrintStream syserr; protected YarnClient client; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index e584cf9eff..5c2f23f693 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; @@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.AHSClient; @@ -125,6 +127,7 @@ import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; public class TestYarnClient { @@ -1299,4 +1302,26 @@ public void testShouldNotRetryForeverForNonNetworkExceptions() throws Exception } } } + + @Test + public void testSignalContainer() throws Exception { + Configuration conf = new Configuration(); + @SuppressWarnings("resource") + final YarnClient client = new MockYarnClient(); + client.init(conf); + client.start(); + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + applicationId, 1); + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); + SignalContainerCommand command = SignalContainerCommand.OUTPUT_THREAD_DUMP; + client.signalContainer(containerId, command); + final ArgumentCaptor signalReqCaptor = + ArgumentCaptor.forClass(SignalContainerRequest.class); + verify(((MockYarnClient) client).getRMClient()) + .signalContainer(signalReqCaptor.capture()); + SignalContainerRequest request = signalReqCaptor.getValue(); + Assert.assertEquals(containerId, request.getContainerId()); + Assert.assertEquals(command, request.getCommand()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index de50467fe5..b72fd2d62d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.io.PrintStream; import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -40,6 +41,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.regex.Pattern; import org.apache.commons.cli.Options; import org.apache.commons.lang.time.DateFormatUtils; @@ -61,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -82,6 +85,8 @@ public class TestYarnCLI { private PrintStream sysOut; ByteArrayOutputStream sysErrStream; private PrintStream sysErr; + private static final Pattern SPACES_PATTERN = + Pattern.compile("\\s+|\\n+|\\t+"); @Before public void setup() { @@ -785,7 +790,7 @@ public void testContainersHelpCommand() throws Exception { Assert.assertTrue(result == 0); verify(spyCli).printUsage(any(String.class), any(Options.class)); Assert.assertEquals(createContainerCLIHelpMessage(), - sysOutStream.toString()); + normalize(sysOutStream.toString())); sysOutStream.reset(); ApplicationId applicationId = ApplicationId.newInstance(1234, 5); @@ -795,7 +800,7 @@ public void testContainersHelpCommand() throws Exception { new String[] {"container", "-list", appAttemptId.toString(), "args" }); verify(spyCli).printUsage(any(String.class), any(Options.class)); Assert.assertEquals(createContainerCLIHelpMessage(), - sysOutStream.toString()); + normalize(sysOutStream.toString())); sysOutStream.reset(); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 7); @@ -803,7 +808,7 @@ public void testContainersHelpCommand() throws Exception { new String[] { "container", "-status", containerId.toString(), "args" }); verify(spyCli).printUsage(any(String.class), any(Options.class)); Assert.assertEquals(createContainerCLIHelpMessage(), - sysOutStream.toString()); + normalize(sysOutStream.toString())); } @Test (timeout = 5000) @@ -1256,8 +1261,8 @@ public void testMissingArguments() throws Exception { sysOutStream.reset(); result = cli.run(new String[] { "container", "-status" }); Assert.assertEquals(result, -1); - Assert.assertEquals(String.format("Missing argument for options%n%1s", - createContainerCLIHelpMessage()), sysOutStream.toString()); + Assert.assertEquals(String.format("Missing argument for options %1s", + createContainerCLIHelpMessage()), normalize(sysOutStream.toString())); sysOutStream.reset(); NodeCLI nodeCLI = new NodeCLI(); @@ -1538,10 +1543,17 @@ private String createContainerCLIHelpMessage() throws IOException { pw.println("usage: container"); pw.println(" -help Displays help for all commands."); pw.println(" -list List containers for application attempt."); + pw.println(" -signal Signal the container."); + pw.println("The available signal commands are "); + pw.println(java.util.Arrays.asList(SignalContainerCommand.values())); + pw.println(" Default command is OUTPUT_THREAD_DUMP."); pw.println(" -status Prints the status of the container."); pw.close(); - String appsHelpStr = baos.toString("UTF-8"); - return appsHelpStr; + try { + return normalize(baos.toString("UTF-8")); + } catch (UnsupportedEncodingException infeasible) { + return infeasible.toString(); + } } private String createNodeCLIHelpMessage() throws IOException { @@ -1560,4 +1572,8 @@ private String createNodeCLIHelpMessage() throws IOException { String nodesHelpStr = baos.toString("UTF-8"); return nodesHelpStr; } + + private static String normalize(String s) { + return SPACES_PATTERN.matcher(s).replaceAll(" "); // single space + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java index 9ccc326a1a..8d7351d465 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java @@ -77,6 +77,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; @@ -125,6 +127,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -527,4 +531,18 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( return null; } } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, IOException { + YarnServiceProtos.SignalContainerRequestProto requestProto = + ((SignalContainerRequestPBImpl) request).getProto(); + try { + return new SignalContainerResponsePBImpl( + proxy.signalContainer(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java index 6ca2136a56..b9485a8048 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportRequestPBImpl; @@ -102,6 +103,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto; @@ -140,8 +142,11 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; @@ -529,4 +534,18 @@ public UpdateApplicationPriorityResponseProto updateApplicationPriority( throw new ServiceException(e); } } + + @Override + public SignalContainerResponseProto signalContainer(RpcController controller, + YarnServiceProtos.SignalContainerRequestProto proto) throws ServiceException { + SignalContainerRequestPBImpl request = new SignalContainerRequestPBImpl(proto); + try { + SignalContainerResponse response = real.signalContainer(request); + return ((SignalContainerResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java new file mode 100644 index 0000000000..5618a7b0ee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SignalContainerCommandProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProtoOrBuilder; + + +public class SignalContainerRequestPBImpl + extends SignalContainerRequest { + SignalContainerRequestProto proto = + SignalContainerRequestProto.getDefaultInstance(); + SignalContainerRequestProto.Builder builder = null; + boolean viaProto = false; + + private ContainerId containerId; + private SignalContainerCommand command = null; + + private static SignalContainerCommand convertFromProtoFormat( + SignalContainerCommandProto p) { + return SignalContainerCommand.valueOf(p.name()); + } + + private static SignalContainerCommandProto convertToProtoFormat( + SignalContainerCommand p) { + return SignalContainerCommandProto.valueOf(p.name()); + } + + public SignalContainerRequestPBImpl() { + builder = SignalContainerRequestProto.newBuilder(); + } + + public SignalContainerRequestPBImpl(SignalContainerRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public SignalContainerRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.containerId != null) { + builder.setContainerId(convertToProtoFormat(this.containerId)); + } + + if (this.command != null) { + builder.setCommand(convertToProtoFormat(this.command)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SignalContainerRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public ContainerId getContainerId() { + SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerId != null) { + return this.containerId; + } + if (!p.hasContainerId()) { + return null; + } + this.containerId = convertFromProtoFormat(p.getContainerId()); + return this.containerId; + } + + @Override + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) { + builder.clearContainerId(); + } + this.containerId = containerId; + } + + private void initCommand() { + if (this.command != null) { + return; + } + SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + if(p.hasCommand()) { + this.command = convertFromProtoFormat(p.getCommand()); + } + } + + @Override + public SignalContainerCommand getCommand() { + initCommand(); + return command; + } + + @Override + public void setCommand(SignalContainerCommand command) { + maybeInitBuilder(); + if (command == null) { + builder.clearCommand(); + } + this.command = command; + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl)t).getProto(); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java new file mode 100644 index 0000000000..b0aae14468 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; + +public class SignalContainerResponsePBImpl extends SignalContainerResponse { + SignalContainerResponseProto proto = SignalContainerResponseProto.getDefaultInstance(); + SignalContainerResponseProto.Builder builder = null; + boolean viaProto = false; + + public SignalContainerResponsePBImpl() { + builder = SignalContainerResponseProto.newBuilder(); + } + + public SignalContainerResponsePBImpl(SignalContainerResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public SignalContainerResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index c0ccf57268..f8a1320aca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -56,6 +57,8 @@ public interface NodeHeartbeatResponse { void addAllApplicationsToCleanup(List applications); + List getContainersToSignalList(); + void addAllContainersToSignal(List containers); long getNextHeartBeatInterval(); void setNextHeartBeatInterval(long nextHeartBeatInterval); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index dc65141ce5..224e50bcf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; @@ -62,8 +65,8 @@ public class NodeHeartbeatResponsePBImpl extends private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; - private List containersToDecrease = null; + private List containersToSignal = null; public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); @@ -105,6 +108,9 @@ private void mergeLocalToBuilder() { if (this.containersToDecrease != null) { addContainersToDecreaseToProto(); } + if (this.containersToSignal != null) { + addContainersToSignalToProto(); + } } private void addSystemCredentialsToProto() { @@ -571,5 +577,75 @@ public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) { maybeInitBuilder(); this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM); } + + @Override + public List getContainersToSignalList() { + initContainersToSignal(); + return this.containersToSignal; + } + + private void initContainersToSignal() { + if (this.containersToSignal != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainersToSignalList(); + this.containersToSignal = new ArrayList(); + + for (SignalContainerRequestProto c : list) { + this.containersToSignal.add(convertFromProtoFormat(c)); + } + } + + @Override + public void addAllContainersToSignal( + final List containersToSignal) { + if (containersToSignal == null) + return; + initContainersToSignal(); + this.containersToSignal.addAll(containersToSignal); + } + + private void addContainersToSignalToProto() { + maybeInitBuilder(); + builder.clearContainersToSignal(); + if (containersToSignal == null) + return; + + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containersToSignal.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public SignalContainerRequestProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainersToSignal(iterable); + } + + private SignalContainerRequestPBImpl convertFromProtoFormat( + SignalContainerRequestProto p) { + return new SignalContainerRequestPBImpl(p); + } + + private SignalContainerRequestProto convertToProtoFormat( + SignalContainerRequest t) { + return ((SignalContainerRequestPBImpl)t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 2db8919d2d..a54bbdb1de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -24,6 +24,7 @@ package hadoop.yarn; import "yarn_protos.proto"; import "yarn_server_common_protos.proto"; +import "yarn_service_protos.proto"; message NodeLabelsProto { repeated NodeLabelProto nodeLabels = 1; @@ -83,6 +84,7 @@ message NodeHeartbeatResponseProto { repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; repeated ContainerProto containers_to_decrease = 12; + repeated SignalContainerRequestProto containers_to_signal = 13; } message SystemCredentialsForAppsProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java new file mode 100644 index 0000000000..b0dc3af67c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java @@ -0,0 +1,37 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager; + +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; + +public class CMgrSignalContainersEvent extends ContainerManagerEvent { + + private List containerToSignal; + + public CMgrSignalContainersEvent(List containerToSignal) { + super(ContainerManagerEventType.SIGNAL_CONTAINERS); + this.containerToSignal = containerToSignal; + } + + public List getContainersToSignal() { + return this.containerToSignal; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java index fcb0252217..8861bc7577 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java @@ -21,5 +21,6 @@ public enum ContainerManagerEventType { FINISH_APPS, FINISH_CONTAINERS, - DECREASE_CONTAINERS_RESOURCE + DECREASE_CONTAINERS_RESOURCE, + SIGNAL_CONTAINERS } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index f8ce90f42b..1b186c81b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -45,6 +45,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -800,6 +801,16 @@ public void run() { new CMgrDecreaseContainersResourceEvent(containersToDecrease) ); } + + // SignalContainer request originally comes from end users via + // ClientRMProtocol's SignalContainer. Forward the request to + // ContainerManager which will dispatch the event to ContainerLauncher. + List containersToSignal = response + .getContainersToSignalList(); + if (containersToSignal.size() != 0) { + dispatcher.getEventHandler().handle( + new CMgrSignalContainersEvent(containersToSignal)); + } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 39d2983fbc..f44de59719 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -121,6 +123,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; @@ -1349,6 +1352,23 @@ public void handle(ContainerManagerEvent event) { } } break; + case SIGNAL_CONTAINERS: + CMgrSignalContainersEvent containersSignalEvent = + (CMgrSignalContainersEvent) event; + for (SignalContainerRequest request : containersSignalEvent + .getContainersToSignal()) { + ContainerId containerId = request.getContainerId(); + Container container = this.context.getContainers().get(containerId); + if (container != null) { + LOG.info(containerId + " signal request by ResourceManager."); + this.dispatcher.getEventHandler().handle( + new SignalContainersLauncherEvent(container, + request.getCommand())); + } else { + LOG.info("Container " + containerId + " no longer exists"); + } + } + break; default: throw new YarnRuntimeException( "Got an unknown ContainerManagerEvent type: " + event.getType()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index bf00d74dce..9718098a6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.ipc.RPCUtil; @@ -461,6 +462,100 @@ public void cleanupContainer() throws IOException { } } + /** + * Send a signal to the container. + * + * + * @throws IOException + */ + @SuppressWarnings("unchecked") // dispatcher not typed + public void signalContainer(SignalContainerCommand command) + throws IOException { + ContainerId containerId = + container.getContainerTokenIdentifier().getContainerID(); + String containerIdStr = ConverterUtils.toString(containerId); + String user = container.getUser(); + Signal signal = translateCommandToSignal(command); + if (signal.equals(Signal.NULL)) { + LOG.info("ignore signal command " + command); + return; + } + + LOG.info("Sending signal " + command + " to container " + containerIdStr); + + boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true); + if (!alreadyLaunched) { + LOG.info("Container " + containerIdStr + " not launched." + + " Not sending the signal"); + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Getting pid for container " + containerIdStr + + " to send signal to from pid file " + + (pidFilePath != null ? pidFilePath.toString() : "null")); + } + + try { + // get process id from pid file if available + // else if shell is still active, get it from the shell + String processId = null; + if (pidFilePath != null) { + processId = getContainerPid(pidFilePath); + } + + if (processId != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending signal to pid " + processId + + " as user " + user + + " for container " + containerIdStr); + } + + boolean result = exec.signalContainer( + new ContainerSignalContext.Builder() + .setContainer(container) + .setUser(user) + .setPid(processId) + .setSignal(signal) + .build()); + + String diagnostics = "Sent signal " + command + + " (" + signal + ") to pid " + processId + + " as user " + user + + " for container " + containerIdStr + + ", result=" + (result ? "success" : "failed"); + LOG.info(diagnostics); + + dispatcher.getEventHandler().handle( + new ContainerDiagnosticsUpdateEvent(containerId, diagnostics)); + } + } catch (Exception e) { + String message = + "Exception when sending signal to container " + containerIdStr + + ": " + StringUtils.stringifyException(e); + LOG.warn(message); + } + } + + @VisibleForTesting + public static Signal translateCommandToSignal( + SignalContainerCommand command) { + Signal signal = Signal.NULL; + switch (command) { + case OUTPUT_THREAD_DUMP: + // TODO for windows support. + signal = Shell.WINDOWS ? Signal.NULL: Signal.QUIT; + break; + case GRACEFUL_SHUTDOWN: + signal = Signal.TERM; + break; + case FORCEFUL_SHUTDOWN: + signal = Signal.KILL; + break; + } + return signal; + } + /** * Loop through for a time-bounded interval waiting to * read the process id from a file generated by a running process. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index 6950aa9381..3a2649eb63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -142,7 +142,23 @@ public void handle(ContainersLauncherEvent event) { + ". Ignoring."); } break; + case SIGNAL_CONTAINER: + SignalContainersLauncherEvent signalEvent = + (SignalContainersLauncherEvent) event; + ContainerLaunch runningContainer = running.get(containerId); + if (runningContainer == null) { + // Container not launched. So nothing needs to be done. + LOG.info("Container " + containerId + " not running, nothing to signal."); + return; + } + + try { + runningContainer.signalContainer(signalEvent.getCommand()); + } catch (IOException e) { + LOG.warn("Got exception while signaling container " + containerId + + " with command " + signalEvent.getCommand()); + } + break; } } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java index 385b5b2d9d..a88564db85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java @@ -22,4 +22,5 @@ public enum ContainersLauncherEventType { LAUNCH_CONTAINER, RECOVER_CONTAINER, CLEANUP_CONTAINER, // The process(grp) itself. + SIGNAL_CONTAINER, } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java new file mode 100644 index 0000000000..de544f0c30 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java @@ -0,0 +1,38 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; + +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +// This event can be triggered by one of the following flows +// WebUI -> Container +// CLI -> RM -> NM +public class SignalContainersLauncherEvent extends ContainersLauncherEvent{ + + private final SignalContainerCommand command; + public SignalContainersLauncherEvent(Container container, + SignalContainerCommand command) { + super(container, ContainersLauncherEventType.SIGNAL_CONTAINER); + this.command = command; + } + public SignalContainerCommand getCommand() { + return command; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index 75bcdaef9c..3e008854d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -222,6 +222,42 @@ public void testChangeContainerResource() throws Exception { super.testChangeContainerResource(); } + @Override + public void testOutputThreadDumpSignal() throws IOException, + InterruptedException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testOutputThreadDumpSignal"); + super.testOutputThreadDumpSignal(); + } + + @Override + public void testGracefulShutdownSignal() throws IOException, + InterruptedException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testGracefulShutdownSignal"); + super.testGracefulShutdownSignal(); + } + + @Override + public void testForcefulShutdownSignal() throws IOException, + InterruptedException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testForcefulShutdownSignal"); + super.testForcefulShutdownSignal(); + } + private boolean shouldRunTest() { return System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 70a8f5576d..7b084cfdc4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -61,6 +61,7 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -166,9 +168,11 @@ public static MasterKey createMasterKey() { private class MyResourceTracker implements ResourceTracker { private final Context context; + private boolean signalContainer; - public MyResourceTracker(Context context) { + public MyResourceTracker(Context context, boolean signalContainer) { this.context = context; + this.signalContainer = signalContainer; } @Override @@ -222,17 +226,19 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeStatus.setResponseId(heartBeatID++); Map> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); + List containersToSignal = null; ApplicationId appId1 = ApplicationId.newInstance(0, 1); ApplicationId appId2 = ApplicationId.newInstance(0, 2); + ContainerId firstContainerID = null; if (heartBeatID == 1) { Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); // Give a container to the NM. ApplicationAttemptId appAttemptID = ApplicationAttemptId.newInstance(appId1, 0); - ContainerId firstContainerID = + firstContainerID = ContainerId.newContainerId(appAttemptID, heartBeatID); ContainerLaunchContext launchContext = recordFactory .newRecordInstance(ContainerLaunchContext.class); @@ -259,6 +265,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) this.context.getContainers(); Assert.assertEquals(1, activeContainers.size()); + if (this.signalContainer) { + containersToSignal = new ArrayList(); + SignalContainerRequest signalReq = recordFactory + .newRecordInstance(SignalContainerRequest.class); + signalReq.setContainerId(firstContainerID); + signalReq.setCommand(SignalContainerCommand.OUTPUT_THREAD_DUMP); + containersToSignal.add(signalReq); + } + // Give another container to the NM. ApplicationAttemptId appAttemptID = ApplicationAttemptId.newInstance(appId2, 0); @@ -295,6 +310,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null, 1000L); + if (containersToSignal != null) { + nhResponse.addAllContainersToSignal(containersToSignal); + } return nhResponse; } @@ -306,15 +324,40 @@ public UnRegisterNodeManagerResponse unRegisterNodeManager( } } + private class MyContainerManager extends ContainerManagerImpl { + public boolean signaled = false; + + public MyContainerManager(Context context, ContainerExecutor exec, + DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, + NodeManagerMetrics metrics, + LocalDirsHandlerService dirsHandler) { + super(context, exec, deletionContext, nodeStatusUpdater, + metrics, dirsHandler); + } + + @Override + public void handle(ContainerManagerEvent event) { + if (event.getType() == ContainerManagerEventType.SIGNAL_CONTAINERS) { + signaled = true; + } + } + } + private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl { public ResourceTracker resourceTracker; private Context context; public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + this(context, dispatcher, healthChecker, metrics, false); + } + + public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + boolean signalContainer) { super(context, dispatcher, healthChecker, metrics); this.context = context; - resourceTracker = new MyResourceTracker(this.context); + resourceTracker = new MyResourceTracker(this.context, signalContainer); } @Override @@ -1547,6 +1590,66 @@ public void cleanUpApplicationsOnNMShutDown() { nm.stop(); } + + //Verify that signalContainer request can be dispatched from + //NodeStatusUpdaterImpl to ContainerManagerImpl. + @Test + public void testSignalContainerToContainerManager() throws Exception { + nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new MyNodeStatusUpdater( + context, dispatcher, healthChecker, metrics, true); + } + + @Override + protected ContainerManagerImpl createContainerManager(Context context, + ContainerExecutor exec, DeletionService del, + NodeStatusUpdater nodeStatusUpdater, + ApplicationACLsManager aclsManager, + LocalDirsHandlerService diskhandler) { + return new MyContainerManager(context, exec, del, nodeStatusUpdater, + metrics, diskhandler); + } + }; + + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + + System.out.println(" ----- thread already started.." + + nm.getServiceState()); + + int waitCount = 0; + while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) { + LOG.info("Waiting for NM to start.."); + if (nmStartError != null) { + LOG.error("Error during startup. ", nmStartError); + Assert.fail(nmStartError.getCause().getMessage()); + } + Thread.sleep(1000); + } + if (nm.getServiceState() != STATE.STARTED) { + // NM could have failed. + Assert.fail("NodeManager failed to start"); + } + + waitCount = 0; + while (heartBeatID <= 3 && waitCount++ != 20) { + Thread.sleep(500); + } + Assert.assertFalse(heartBeatID <= 3); + Assert.assertEquals("Number of registered NMs is wrong!!", 1, + this.registeredNodes.size()); + + MyContainerManager containerManager = + (MyContainerManager)nm.getContainerManager(); + Assert.assertTrue(containerManager.signaled); + + nm.stop(); + } + @Test public void testConcurrentAccessToSystemCredentials(){ final Map testCredentials = new HashMap<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index f482784fe9..0069aaa37f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; @@ -464,4 +466,10 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( IOException { return null; } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws IOException { +return null; +} } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 39383428b9..532944bf5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -73,6 +73,8 @@ import org.junit.After; import org.junit.Before; +import static org.mockito.Mockito.spy; + public abstract class BaseContainerManagerTest { protected static RecordFactory recordFactory = RecordFactoryProvider @@ -148,7 +150,7 @@ public long getRMIdentifier() { protected ContainerExecutor createContainerExecutor() { DefaultContainerExecutor exec = new DefaultContainerExecutor(); exec.setConf(conf); - return exec; + return spy(exec); } @Before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 3fb4112447..3f5fc825c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -75,22 +77,30 @@ import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; public class TestContainerManager extends BaseContainerManagerTest { @@ -262,7 +272,7 @@ public void testContainerSetup() throws Exception { Assert.assertEquals(null, reader.readLine()); } - @Test + //@Test public void testContainerLaunchAndStop() throws IOException, InterruptedException, YarnException { containerManager.start(); @@ -1173,4 +1183,103 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier, .retrievePassword(containerTokenIdentifier), containerTokenIdentifier); } + + @Test + public void testOutputThreadDumpSignal() throws IOException, + InterruptedException, YarnException { + testContainerLaunchAndSignal(SignalContainerCommand.OUTPUT_THREAD_DUMP); + } + + @Test + public void testGracefulShutdownSignal() throws IOException, + InterruptedException, YarnException { + testContainerLaunchAndSignal(SignalContainerCommand.GRACEFUL_SHUTDOWN); + } + + @Test + public void testForcefulShutdownSignal() throws IOException, + InterruptedException, YarnException { + testContainerLaunchAndSignal(SignalContainerCommand.FORCEFUL_SHUTDOWN); + } + + // Verify signal container request can be delivered from + // NodeStatusUpdaterImpl to ContainerExecutor. + private void testContainerLaunchAndSignal(SignalContainerCommand command) + throws IOException, InterruptedException, YarnException { + + Signal signal = ContainerLaunch.translateCommandToSignal(command); + containerManager.start(); + + File scriptFile = new File(tmpDir, "scriptFile.sh"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + File processStartFile = + new File(tmpDir, "start_file.txt").getAbsoluteFile(); + fileWriter.write("\numask 0"); // So that start file is readable by the test + fileWriter.write("\necho Hello World! > " + processStartFile); + fileWriter.write("\necho $$ >> " + processStartFile); + fileWriter.write("\nexec sleep 1000s"); + fileWriter.close(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + // ////// Construct the Container-id + ContainerId cId = createContainerId(0); + + URL resource_alpha = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + List commands = new ArrayList(); + commands.add("/bin/bash"); + commands.add(scriptFile.getAbsolutePath()); + containerLaunchContext.setCommands(commands); + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), + user, context.getContainerTokenSecretManager())); + List list = new ArrayList(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + int timeoutSecs = 0; + while (!processStartFile.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + Assert.assertTrue("ProcessStartFile doesn't exist!", + processStartFile.exists()); + + // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent + SignalContainerRequest signalReq = + SignalContainerRequest.newInstance(cId, command); + List reqs = new ArrayList(); + reqs.add(signalReq); + containerManager.handle(new CMgrSignalContainersEvent(reqs)); + + final ArgumentCaptor signalContextCaptor = + ArgumentCaptor.forClass(ContainerSignalContext.class); + if (signal.equals(Signal.NULL)) { + verify(exec, never()).signalContainer(signalContextCaptor.capture()); + } else { + verify(exec, timeout(10000).atLeastOnce()).signalContainer(signalContextCaptor.capture()); + ContainerSignalContext signalContext = signalContextCaptor.getAllValues().get(0); + Assert.assertEquals(cId, signalContext.getContainer().getContainerId()); + Assert.assertEquals(signal, signalContext.getSignal()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index dad86f54c6..a69cc689ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -93,10 +93,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; @@ -139,6 +141,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -1392,4 +1395,66 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( return response; } + /** + * Signal a container. + * After the request passes some sanity check, it will be delivered + * to RMNodeImpl so that the next NM heartbeat will pick up the signal request + */ + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, IOException { + ContainerId containerId = request.getContainerId(); + + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + throw RPCUtil.getRemoteException(ie); + } + + ApplicationId applicationId = containerId.getApplicationAttemptId(). + getApplicationId(); + RMApp application = this.rmContext.getRMApps().get(applicationId); + if (application == null) { + RMAuditLogger.logFailure(callerUGI.getUserName(), + AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService", + "Trying to signal an absent container", applicationId, containerId); + throw RPCUtil + .getRemoteException("Trying to signal an absent container " + + containerId); + } + + if (!checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.MODIFY_APP, application)) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.SIGNAL_CONTAINER, "User doesn't have permissions to " + + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", + AuditConstants.UNAUTHORIZED_USER, applicationId); + throw RPCUtil.getRemoteException(new AccessControlException("User " + + callerUGI.getShortUserName() + " cannot perform operation " + + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); + } + + RMContainer container = scheduler.getRMContainer(containerId); + if (container != null) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeSignalContainerEvent(container.getContainer().getNodeId(), + request)); + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.SIGNAL_CONTAINER, "ClientRMService", applicationId, + containerId); + } else { + RMAuditLogger.logFailure(callerUGI.getUserName(), + AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService", + "Trying to signal an absent container", applicationId, containerId); + throw RPCUtil + .getRemoteException("Trying to signal an absent container " + + containerId); + } + + return recordFactory + .newRecordInstance(SignalContainerResponse.class); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index cd9a61de20..92745b7f46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -58,6 +58,7 @@ public static class AuditConstants { "Update Application Priority Request"; public static final String CHANGE_CONTAINER_RESOURCE = "AM Changed Container Resource"; + public static final String SIGNAL_CONTAINER = "Signal Container Request"; // Some commonly used descriptions public static final String UNAUTHORIZED_USER = "Unauthorized user"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index abe854485a..b28fef3c92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -44,6 +44,9 @@ public enum RMNodeEventType { CLEANUP_CONTAINER, DECREASE_CONTAINER, + // Source: ClientRMService + SIGNAL_CONTAINER, + // Source: RMAppAttempt FINISHED_CONTAINERS_PULLED_BY_AM, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 33e471417d..e0d27d65b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -122,6 +123,10 @@ public class RMNodeImpl implements RMNode, EventHandler { private final Set containersToClean = new TreeSet( new ContainerIdComparator()); + /* set of containers that need to be signaled */ + private final List containersToSignal = + new ArrayList(); + /* * set of containers to notify NM to remove them from its context. Currently, * this includes containers that were notified to AM about their completion @@ -194,6 +199,8 @@ RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.DECREASE_CONTAINER, new DecreaseContainersTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN, RMNodeEventType.SHUTDOWN, new DeactivateNodeTransition(NodeState.SHUTDOWN)) @@ -288,6 +295,8 @@ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, new AddContainersToBeRemovedFromNMTransition()) + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN, RMNodeEventType.SHUTDOWN, new DeactivateNodeTransition(NodeState.SHUTDOWN)) @@ -491,8 +500,10 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response response.addAllApplicationsToCleanup(this.finishedApplications); response.addContainersToBeRemovedFromNM( new ArrayList(this.containersToBeRemovedFromNM)); + response.addAllContainersToSignal(this.containersToSignal); this.containersToClean.clear(); this.finishedApplications.clear(); + this.containersToSignal.clear(); this.containersToBeRemovedFromNM.clear(); } finally { this.writeLock.unlock(); @@ -1090,6 +1101,16 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { } } + public static class SignalContainerTransition implements + SingleArcTransition { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + rmNode.containersToSignal.add((( + RMNodeSignalContainerEvent) event).getSignalRequest()); + } + } + @Override public List pullContainerUpdates() { List latestContainerInfoList = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java new file mode 100644 index 0000000000..098c07a390 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; + +public class RMNodeSignalContainerEvent extends RMNodeEvent { + + private SignalContainerRequest signalRequest; + + public RMNodeSignalContainerEvent(NodeId nodeId, + SignalContainerRequest signalRequest) { + super(nodeId, RMNodeEventType.SIGNAL_CONTAINER); + this.signalRequest = signalRequest; + } + + public SignalContainerRequest getSignalRequest() { + return this.signalRequest; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index a066ba4cd7..674529ef7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; @@ -814,4 +816,12 @@ public void clearQueueMetrics(RMApp app) { public RMActiveServices getRMActiveService() { return activeServices; } + + public void signalContainer(ContainerId containerId, SignalContainerCommand command) + throws Exception { + ApplicationClientProtocol client = getClientRMService(); + SignalContainerRequest req = + SignalContainerRequest.newInstance(containerId, command); + client.signalContainer(req); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java new file mode 100644 index 0000000000..16cb866777 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Test; + +public class TestSignalContainer { + + private static final Log LOG = LogFactory + .getLog(TestSignalContainer.class); + + @Test + public void testSignalRequestDeliveryToNM() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + MockRM rm = new MockRM(); + rm.start(); + + MockNM nm1 = rm.registerNode("h1:1234", 5000); + + RMApp app = rm.submitApp(2000); + + //kick the scheduling + nm1.nodeHeartbeat(true); + + RMAppAttempt attempt = app.getCurrentAppAttempt(); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + + //request for containers + final int request = 2; + am.allocate("h1" , 1000, request, new ArrayList()); + + //kick the scheduler + nm1.nodeHeartbeat(true); + List conts = null; + int contReceived = 0; + int waitCount = 0; + while (contReceived < request && waitCount++ < 200) { + LOG.info("Got " + contReceived + " containers. Waiting to get " + + request); + Thread.sleep(100); + conts = am.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + contReceived += conts.size(); + } + Assert.assertEquals(request, contReceived); + + for(Container container : conts) { + rm.signalContainer(container.getId(), + SignalContainerCommand.OUTPUT_THREAD_DUMP); + } + + NodeHeartbeatResponse resp; + List contsToSignal; + int signaledConts = 0; + + waitCount = 0; + while ( signaledConts < request && waitCount++ < 200) { + LOG.info("Waiting to get signalcontainer events.. signaledConts: " + + signaledConts); + resp = nm1.nodeHeartbeat(true); + contsToSignal = resp.getContainersToSignalList(); + signaledConts += contsToSignal.size(); + Thread.sleep(100); + } + + // Verify NM receives the expected number of signal container requests. + Assert.assertEquals(request, signaledConts); + + am.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + + rm.stop(); + } +} \ No newline at end of file