diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
index 91c308615e..29266d5e77 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
@@ -63,6 +63,7 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.ClientRMProxy;
@@ -473,4 +474,10 @@ public void updateApplicationPriority(ApplicationId applicationId,
Priority priority) throws YarnException, IOException {
client.updateApplicationPriority(applicationId, priority);
}
+
+ @Override
+ public void signalContainer(ContainerId containerId, SignalContainerCommand command)
+ throws YarnException, IOException {
+ client.signalContainer(containerId, command);
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
index 1bf14080c6..8febec6fcb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
@@ -112,6 +112,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
@@ -453,6 +455,12 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
IOException {
return null;
}
+
+ @Override
+ public SignalContainerResponse signalContainer(
+ SignalContainerRequest request) throws IOException {
+ return null;
+ }
}
class HistoryService extends AMService implements HSClientProtocol {
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9e9522c4ee..bd38c2d2d5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -222,6 +222,9 @@ Release 2.8.0 - UNRELEASED
YARN-1651. CapacityScheduler side changes to support container resize.
(Wangda Tan via jianhe)
+
+ YARN-1897. CLI and core support for signal container functionality.
+ (Ming Ma via xgong)
IMPROVEMENTS
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
index 08fc289d74..bcd3ef6d64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
@@ -53,6 +53,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -437,4 +439,32 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels(
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request) throws YarnException,
IOException;
+
+ /**
+ *
The interface used by clients to request the
+ * ResourceManager
to signal a container. For example,
+ * the client can send command OUTPUT_THREAD_DUMP to dump threads of the
+ * container.
+ *
+ * The client, via {@link SignalContainerRequest} provides the
+ * id of the container and the signal command.
+ *
+ * In secure mode,the ResourceManager
verifies access to the
+ * application before signaling the container.
+ * The user needs to have MODIFY_APP
permission.
+ *
+ * Currently, the ResourceManager
returns an empty response
+ * on success and throws an exception on rejecting the request.
+ *
+ * @param request request to signal a container
+ * @return ResourceManager
returns an empty response
+ * on success and throws an exception on rejecting the request
+ * @throws YarnException
+ * @throws IOException
+ */
+ @Public
+ @Unstable
+ public SignalContainerResponse signalContainer(
+ SignalContainerRequest request) throws YarnException,
+ IOException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java
new file mode 100644
index 0000000000..2a3861a773
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The request sent by the client to the ResourceManager
+ * or by the ApplicationMaster
to the NodeManager
+ * to signal a container.
+ * @see SignalContainerCommand
+ */
+@Public
+@Evolving
+public abstract class SignalContainerRequest {
+
+ @Public
+ @Unstable
+ public static SignalContainerRequest newInstance(ContainerId containerId,
+ SignalContainerCommand signalContainerCommand) {
+ SignalContainerRequest request =
+ Records.newRecord(SignalContainerRequest.class);
+ request.setContainerId(containerId);
+ request.setCommand(signalContainerCommand);
+ return request;
+ }
+
+ /**
+ * Get the ContainerId
of the container to signal.
+ * @return ContainerId
of the container to signal.
+ */
+ @Public
+ @Unstable
+ public abstract ContainerId getContainerId();
+
+ /**
+ * Set the ContainerId
of the container to signal.
+ */
+ @Public
+ @Unstable
+ public abstract void setContainerId(ContainerId containerId);
+
+ /**
+ * Get the SignalContainerCommand
of the signal request.
+ * @return SignalContainerCommand
of the signal request.
+ */
+ @Public
+ @Unstable
+ public abstract SignalContainerCommand getCommand();
+
+ /**
+ * Set the SignalContainerCommand
of the signal request.
+ */
+ @Public
+ @Unstable
+ public abstract void setCommand(SignalContainerCommand command);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java
new file mode 100644
index 0000000000..0d773b91ae
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+
+/**
+ * The response sent by the ResourceManager
to the client
+ * signalling a container.
+ *
+ * Currently it's empty.
+ *
+ * @see ApplicationClientProtocol#signalContainer(SignalContainerRequest)
+ */
+@Public
+@Evolving
+public abstract class SignalContainerResponse {
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java
new file mode 100644
index 0000000000..7be6dbdbf6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Enumeration of various signal container commands.
+ */
+@Public
+@Evolving
+public enum SignalContainerCommand {
+ /**
+ * Used to capture thread dump.
+ * On Linux, it is equivalent to SIGQUIT.
+ */
+ OUTPUT_THREAD_DUMP,
+
+ /** Gracefully shutdown a container.
+ * On Linux, it is equivalent to SIGTERM.
+ */
+ GRACEFUL_SHUTDOWN,
+
+ /** Forcefully shutdown a container.
+ * On Linux, it is equivalent to SIGKILL.
+ */
+ FORCEFUL_SHUTDOWN,
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto
index 117c9301bc..a2ab9c2e65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto
@@ -56,4 +56,5 @@ service ApplicationClientProtocolService {
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto);
+ rpc signalContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 057aeee1ca..687ee89647 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -417,6 +417,13 @@ message QueueUserACLInfoProto {
repeated QueueACLProto userAcls = 2;
}
+enum SignalContainerCommandProto {
+ OUTPUT_THREAD_DUMP = 1;
+ GRACEFUL_SHUTDOWN = 2;
+ FORCEFUL_SHUTDOWN = 3;
+}
+
+
////////////////////////////////////////////////////////////////////////
////// From reservation_protocol /////////////////////////////////////
////////////////////////////////////////////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index ff5a12787a..15e99f91ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -225,6 +225,14 @@ message UpdateApplicationPriorityRequestProto {
message UpdateApplicationPriorityResponseProto {
}
+message SignalContainerRequestProto {
+ required ContainerIdProto container_id = 1;
+ required SignalContainerCommandProto command = 2;
+}
+
+message SignalContainerResponseProto {
+}
+
//////////////////////////////////////////////////////
/////// client_NM_Protocol ///////////////////////////
//////////////////////////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
index ff90da1a05..7f6a9fcb2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
@@ -38,8 +38,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -56,6 +54,7 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -683,4 +682,18 @@ public abstract List getClusterNodeLabels()
@Unstable
public abstract void updateApplicationPriority(ApplicationId applicationId,
Priority priority) throws YarnException, IOException;
+
+ /**
+ *
+ * Signal a container identified by given ID.
+ *
+ *
+ * @param containerId
+ * {@link ContainerId} of the container that needs to be signaled
+ * @param command the signal container command
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract void signalContainer(ContainerId containerId,
+ SignalContainerCommand command) throws YarnException, IOException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index bc97a12da5..2bc6143a4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -76,9 +76,9 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -94,6 +94,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -826,4 +827,14 @@ public void updateApplicationPriority(ApplicationId applicationId,
UpdateApplicationPriorityRequest.newInstance(applicationId, priority);
rmClient.updateApplicationPriority(request);
}
+
+ @Override
+ public void signalContainer(ContainerId containerId,
+ SignalContainerCommand command)
+ throws YarnException, IOException {
+ LOG.info("Signalling container " + containerId + " with command " + command);
+ SignalContainerRequest request =
+ SignalContainerRequest.newInstance(containerId, command);
+ rmClient.signalContainer(request);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
index 55692f1b12..be89ce28e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
@@ -42,8 +42,10 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
@@ -148,6 +150,12 @@ public int run(String[] args) throws Exception {
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
opts.getOption(STATUS_CMD).setArgName("Container ID");
opts.getOption(LIST_CMD).setArgName("Application Attempt ID");
+ opts.addOption(SIGNAL_CMD, true,
+ "Signal the container. The available signal commands are " +
+ java.util.Arrays.asList(SignalContainerCommand.values()) +
+ " Default command is OUTPUT_THREAD_DUMP.");
+ opts.getOption(SIGNAL_CMD).setArgName("container ID [signal command]");
+ opts.getOption(SIGNAL_CMD).setArgs(3);
}
int exitCode = -1;
@@ -254,6 +262,19 @@ public int run(String[] args) throws Exception {
}
updateApplicationPriority(cliParser.getOptionValue(APP_ID),
cliParser.getOptionValue(UPDATE_PRIORITY));
+ } else if (cliParser.hasOption(SIGNAL_CMD)) {
+ if (args.length < 3 || args.length > 4) {
+ printUsage(title, opts);
+ return exitCode;
+ }
+ final String[] signalArgs = cliParser.getOptionValues(SIGNAL_CMD);
+ final String containerId = signalArgs[0];
+ SignalContainerCommand command =
+ SignalContainerCommand.OUTPUT_THREAD_DUMP;
+ if (signalArgs.length == 2) {
+ command = SignalContainerCommand.valueOf(signalArgs[1]);
+ }
+ signalContainer(containerId, command);
} else {
syserr.println("Invalid Command Usage : ");
printUsage(title, opts);
@@ -261,6 +282,20 @@ public int run(String[] args) throws Exception {
return 0;
}
+ /**
+ * Signals the containerId
+ *
+ * @param containerIdStr the container id
+ * @param command the signal command
+ * @throws YarnException
+ */
+ private void signalContainer(String containerIdStr,
+ SignalContainerCommand command) throws YarnException, IOException {
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ sysout.println("Signalling container " + containerIdStr);
+ client.signalContainer(containerId, command);
+ }
+
/**
* It prints the usage of the command
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
index 26349fa81a..a0c0148753 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
@@ -35,6 +35,7 @@ public abstract class YarnCLI extends Configured implements Tool {
public static final String KILL_CMD = "kill";
public static final String MOVE_TO_QUEUE_CMD = "movetoqueue";
public static final String HELP_CMD = "help";
+ public static final String SIGNAL_CMD = "signal";
protected PrintStream sysout;
protected PrintStream syserr;
protected YarnClient client;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index e584cf9eff..5c2f23f693 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -77,6 +77,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -97,6 +98,7 @@
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AHSClient;
@@ -125,6 +127,7 @@
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
public class TestYarnClient {
@@ -1299,4 +1302,26 @@ public void testShouldNotRetryForeverForNonNetworkExceptions() throws Exception
}
}
}
+
+ @Test
+ public void testSignalContainer() throws Exception {
+ Configuration conf = new Configuration();
+ @SuppressWarnings("resource")
+ final YarnClient client = new MockYarnClient();
+ client.init(conf);
+ client.start();
+ ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+ applicationId, 1);
+ ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
+ SignalContainerCommand command = SignalContainerCommand.OUTPUT_THREAD_DUMP;
+ client.signalContainer(containerId, command);
+ final ArgumentCaptor signalReqCaptor =
+ ArgumentCaptor.forClass(SignalContainerRequest.class);
+ verify(((MockYarnClient) client).getRMClient())
+ .signalContainer(signalReqCaptor.capture());
+ SignalContainerRequest request = signalReqCaptor.getValue();
+ Assert.assertEquals(containerId, request.getContainerId());
+ Assert.assertEquals(command, request.getCommand());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index de50467fe5..b72fd2d62d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -32,6 +32,7 @@
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -40,6 +41,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.regex.Pattern;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.time.DateFormatUtils;
@@ -61,6 +63,7 @@
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -82,6 +85,8 @@ public class TestYarnCLI {
private PrintStream sysOut;
ByteArrayOutputStream sysErrStream;
private PrintStream sysErr;
+ private static final Pattern SPACES_PATTERN =
+ Pattern.compile("\\s+|\\n+|\\t+");
@Before
public void setup() {
@@ -785,7 +790,7 @@ public void testContainersHelpCommand() throws Exception {
Assert.assertTrue(result == 0);
verify(spyCli).printUsage(any(String.class), any(Options.class));
Assert.assertEquals(createContainerCLIHelpMessage(),
- sysOutStream.toString());
+ normalize(sysOutStream.toString()));
sysOutStream.reset();
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
@@ -795,7 +800,7 @@ public void testContainersHelpCommand() throws Exception {
new String[] {"container", "-list", appAttemptId.toString(), "args" });
verify(spyCli).printUsage(any(String.class), any(Options.class));
Assert.assertEquals(createContainerCLIHelpMessage(),
- sysOutStream.toString());
+ normalize(sysOutStream.toString()));
sysOutStream.reset();
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 7);
@@ -803,7 +808,7 @@ public void testContainersHelpCommand() throws Exception {
new String[] { "container", "-status", containerId.toString(), "args" });
verify(spyCli).printUsage(any(String.class), any(Options.class));
Assert.assertEquals(createContainerCLIHelpMessage(),
- sysOutStream.toString());
+ normalize(sysOutStream.toString()));
}
@Test (timeout = 5000)
@@ -1256,8 +1261,8 @@ public void testMissingArguments() throws Exception {
sysOutStream.reset();
result = cli.run(new String[] { "container", "-status" });
Assert.assertEquals(result, -1);
- Assert.assertEquals(String.format("Missing argument for options%n%1s",
- createContainerCLIHelpMessage()), sysOutStream.toString());
+ Assert.assertEquals(String.format("Missing argument for options %1s",
+ createContainerCLIHelpMessage()), normalize(sysOutStream.toString()));
sysOutStream.reset();
NodeCLI nodeCLI = new NodeCLI();
@@ -1538,10 +1543,17 @@ private String createContainerCLIHelpMessage() throws IOException {
pw.println("usage: container");
pw.println(" -help Displays help for all commands.");
pw.println(" -list List containers for application attempt.");
+ pw.println(" -signal Signal the container.");
+ pw.println("The available signal commands are ");
+ pw.println(java.util.Arrays.asList(SignalContainerCommand.values()));
+ pw.println(" Default command is OUTPUT_THREAD_DUMP.");
pw.println(" -status Prints the status of the container.");
pw.close();
- String appsHelpStr = baos.toString("UTF-8");
- return appsHelpStr;
+ try {
+ return normalize(baos.toString("UTF-8"));
+ } catch (UnsupportedEncodingException infeasible) {
+ return infeasible.toString();
+ }
}
private String createNodeCLIHelpMessage() throws IOException {
@@ -1560,4 +1572,8 @@ private String createNodeCLIHelpMessage() throws IOException {
String nodesHelpStr = baos.toString("UTF-8");
return nodesHelpStr;
}
+
+ private static String normalize(String s) {
+ return SPACES_PATTERN.matcher(s).replaceAll(" "); // single space
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java
index 9ccc326a1a..8d7351d465 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java
@@ -77,6 +77,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
@@ -125,6 +127,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -527,4 +531,18 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
return null;
}
}
+
+ @Override
+ public SignalContainerResponse signalContainer(
+ SignalContainerRequest request) throws YarnException, IOException {
+ YarnServiceProtos.SignalContainerRequestProto requestProto =
+ ((SignalContainerRequestPBImpl) request).getProto();
+ try {
+ return new SignalContainerResponsePBImpl(
+ proxy.signalContainer(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java
index 6ca2136a56..b9485a8048 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportRequestPBImpl;
@@ -102,6 +103,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto;
@@ -140,8 +142,11 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
@@ -529,4 +534,18 @@ public UpdateApplicationPriorityResponseProto updateApplicationPriority(
throw new ServiceException(e);
}
}
+
+ @Override
+ public SignalContainerResponseProto signalContainer(RpcController controller,
+ YarnServiceProtos.SignalContainerRequestProto proto) throws ServiceException {
+ SignalContainerRequestPBImpl request = new SignalContainerRequestPBImpl(proto);
+ try {
+ SignalContainerResponse response = real.signalContainer(request);
+ return ((SignalContainerResponsePBImpl)response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java
new file mode 100644
index 0000000000..5618a7b0ee
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.SignalContainerCommandProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProtoOrBuilder;
+
+
+public class SignalContainerRequestPBImpl
+ extends SignalContainerRequest {
+ SignalContainerRequestProto proto =
+ SignalContainerRequestProto.getDefaultInstance();
+ SignalContainerRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private ContainerId containerId;
+ private SignalContainerCommand command = null;
+
+ private static SignalContainerCommand convertFromProtoFormat(
+ SignalContainerCommandProto p) {
+ return SignalContainerCommand.valueOf(p.name());
+ }
+
+ private static SignalContainerCommandProto convertToProtoFormat(
+ SignalContainerCommand p) {
+ return SignalContainerCommandProto.valueOf(p.name());
+ }
+
+ public SignalContainerRequestPBImpl() {
+ builder = SignalContainerRequestProto.newBuilder();
+ }
+
+ public SignalContainerRequestPBImpl(SignalContainerRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public SignalContainerRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.containerId != null) {
+ builder.setContainerId(convertToProtoFormat(this.containerId));
+ }
+
+ if (this.command != null) {
+ builder.setCommand(convertToProtoFormat(this.command));
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = SignalContainerRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ @Override
+ public ContainerId getContainerId() {
+ SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.containerId != null) {
+ return this.containerId;
+ }
+ if (!p.hasContainerId()) {
+ return null;
+ }
+ this.containerId = convertFromProtoFormat(p.getContainerId());
+ return this.containerId;
+ }
+
+ @Override
+ public void setContainerId(ContainerId containerId) {
+ maybeInitBuilder();
+ if (containerId == null) {
+ builder.clearContainerId();
+ }
+ this.containerId = containerId;
+ }
+
+ private void initCommand() {
+ if (this.command != null) {
+ return;
+ }
+ SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if(p.hasCommand()) {
+ this.command = convertFromProtoFormat(p.getCommand());
+ }
+ }
+
+ @Override
+ public SignalContainerCommand getCommand() {
+ initCommand();
+ return command;
+ }
+
+ @Override
+ public void setCommand(SignalContainerCommand command) {
+ maybeInitBuilder();
+ if (command == null) {
+ builder.clearCommand();
+ }
+ this.command = command;
+ }
+
+ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl)t).getProto();
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java
new file mode 100644
index 0000000000..b0aae14468
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
+
+public class SignalContainerResponsePBImpl extends SignalContainerResponse {
+ SignalContainerResponseProto proto = SignalContainerResponseProto.getDefaultInstance();
+ SignalContainerResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public SignalContainerResponsePBImpl() {
+ builder = SignalContainerResponseProto.newBuilder();
+ }
+
+ public SignalContainerResponsePBImpl(SignalContainerResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public SignalContainerResponseProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index c0ccf57268..f8a1320aca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -56,6 +57,8 @@ public interface NodeHeartbeatResponse {
void addAllApplicationsToCleanup(List applications);
+ List getContainersToSignalList();
+ void addAllContainersToSignal(List containers);
long getNextHeartBeatInterval();
void setNextHeartBeatInterval(long nextHeartBeatInterval);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index dc65141ce5..224e50bcf0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -26,6 +26,8 @@
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -37,6 +39,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
@@ -62,8 +65,8 @@ public class NodeHeartbeatResponsePBImpl extends
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
-
private List containersToDecrease = null;
+ private List containersToSignal = null;
public NodeHeartbeatResponsePBImpl() {
builder = NodeHeartbeatResponseProto.newBuilder();
@@ -105,6 +108,9 @@ private void mergeLocalToBuilder() {
if (this.containersToDecrease != null) {
addContainersToDecreaseToProto();
}
+ if (this.containersToSignal != null) {
+ addContainersToSignalToProto();
+ }
}
private void addSystemCredentialsToProto() {
@@ -571,5 +577,75 @@ public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
maybeInitBuilder();
this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
}
+
+ @Override
+ public List getContainersToSignalList() {
+ initContainersToSignal();
+ return this.containersToSignal;
+ }
+
+ private void initContainersToSignal() {
+ if (this.containersToSignal != null) {
+ return;
+ }
+ NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List list = p.getContainersToSignalList();
+ this.containersToSignal = new ArrayList();
+
+ for (SignalContainerRequestProto c : list) {
+ this.containersToSignal.add(convertFromProtoFormat(c));
+ }
+ }
+
+ @Override
+ public void addAllContainersToSignal(
+ final List containersToSignal) {
+ if (containersToSignal == null)
+ return;
+ initContainersToSignal();
+ this.containersToSignal.addAll(containersToSignal);
+ }
+
+ private void addContainersToSignalToProto() {
+ maybeInitBuilder();
+ builder.clearContainersToSignal();
+ if (containersToSignal == null)
+ return;
+
+ Iterable iterable =
+ new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+ Iterator iter = containersToSignal.iterator();
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public SignalContainerRequestProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllContainersToSignal(iterable);
+ }
+
+ private SignalContainerRequestPBImpl convertFromProtoFormat(
+ SignalContainerRequestProto p) {
+ return new SignalContainerRequestPBImpl(p);
+ }
+
+ private SignalContainerRequestProto convertToProtoFormat(
+ SignalContainerRequest t) {
+ return ((SignalContainerRequestPBImpl)t).getProto();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 2db8919d2d..a54bbdb1de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -24,6 +24,7 @@ package hadoop.yarn;
import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
+import "yarn_service_protos.proto";
message NodeLabelsProto {
repeated NodeLabelProto nodeLabels = 1;
@@ -83,6 +84,7 @@ message NodeHeartbeatResponseProto {
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
repeated ContainerProto containers_to_decrease = 12;
+ repeated SignalContainerRequestProto containers_to_signal = 13;
}
message SystemCredentialsForAppsProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java
new file mode 100644
index 0000000000..b0dc3af67c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+
+public class CMgrSignalContainersEvent extends ContainerManagerEvent {
+
+ private List containerToSignal;
+
+ public CMgrSignalContainersEvent(List containerToSignal) {
+ super(ContainerManagerEventType.SIGNAL_CONTAINERS);
+ this.containerToSignal = containerToSignal;
+ }
+
+ public List getContainersToSignal() {
+ return this.containerToSignal;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
index fcb0252217..8861bc7577 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
@@ -21,5 +21,6 @@
public enum ContainerManagerEventType {
FINISH_APPS,
FINISH_CONTAINERS,
- DECREASE_CONTAINERS_RESOURCE
+ DECREASE_CONTAINERS_RESOURCE,
+ SIGNAL_CONTAINERS
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index f8ce90f42b..1b186c81b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionUtil;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -800,6 +801,16 @@ public void run() {
new CMgrDecreaseContainersResourceEvent(containersToDecrease)
);
}
+
+ // SignalContainer request originally comes from end users via
+ // ClientRMProtocol's SignalContainer. Forward the request to
+ // ContainerManager which will dispatch the event to ContainerLauncher.
+ List containersToSignal = response
+ .getContainersToSignalList();
+ if (containersToSignal.size() != 0) {
+ dispatcher.getEventHandler().handle(
+ new CMgrSignalContainersEvent(containersToSignal));
+ }
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 39d2983fbc..f44de59719 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -60,6 +60,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -97,6 +98,7 @@
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -121,6 +123,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
@@ -1349,6 +1352,23 @@ public void handle(ContainerManagerEvent event) {
}
}
break;
+ case SIGNAL_CONTAINERS:
+ CMgrSignalContainersEvent containersSignalEvent =
+ (CMgrSignalContainersEvent) event;
+ for (SignalContainerRequest request : containersSignalEvent
+ .getContainersToSignal()) {
+ ContainerId containerId = request.getContainerId();
+ Container container = this.context.getContainers().get(containerId);
+ if (container != null) {
+ LOG.info(containerId + " signal request by ResourceManager.");
+ this.dispatcher.getEventHandler().handle(
+ new SignalContainersLauncherEvent(container,
+ request.getCommand()));
+ } else {
+ LOG.info("Container " + containerId + " no longer exists");
+ }
+ }
+ break;
default:
throw new YarnRuntimeException(
"Got an unknown ContainerManagerEvent type: " + event.getType());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index bf00d74dce..9718098a6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@@ -461,6 +462,100 @@ public void cleanupContainer() throws IOException {
}
}
+ /**
+ * Send a signal to the container.
+ *
+ *
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ public void signalContainer(SignalContainerCommand command)
+ throws IOException {
+ ContainerId containerId =
+ container.getContainerTokenIdentifier().getContainerID();
+ String containerIdStr = ConverterUtils.toString(containerId);
+ String user = container.getUser();
+ Signal signal = translateCommandToSignal(command);
+ if (signal.equals(Signal.NULL)) {
+ LOG.info("ignore signal command " + command);
+ return;
+ }
+
+ LOG.info("Sending signal " + command + " to container " + containerIdStr);
+
+ boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
+ if (!alreadyLaunched) {
+ LOG.info("Container " + containerIdStr + " not launched."
+ + " Not sending the signal");
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Getting pid for container " + containerIdStr
+ + " to send signal to from pid file "
+ + (pidFilePath != null ? pidFilePath.toString() : "null"));
+ }
+
+ try {
+ // get process id from pid file if available
+ // else if shell is still active, get it from the shell
+ String processId = null;
+ if (pidFilePath != null) {
+ processId = getContainerPid(pidFilePath);
+ }
+
+ if (processId != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending signal to pid " + processId
+ + " as user " + user
+ + " for container " + containerIdStr);
+ }
+
+ boolean result = exec.signalContainer(
+ new ContainerSignalContext.Builder()
+ .setContainer(container)
+ .setUser(user)
+ .setPid(processId)
+ .setSignal(signal)
+ .build());
+
+ String diagnostics = "Sent signal " + command
+ + " (" + signal + ") to pid " + processId
+ + " as user " + user
+ + " for container " + containerIdStr
+ + ", result=" + (result ? "success" : "failed");
+ LOG.info(diagnostics);
+
+ dispatcher.getEventHandler().handle(
+ new ContainerDiagnosticsUpdateEvent(containerId, diagnostics));
+ }
+ } catch (Exception e) {
+ String message =
+ "Exception when sending signal to container " + containerIdStr
+ + ": " + StringUtils.stringifyException(e);
+ LOG.warn(message);
+ }
+ }
+
+ @VisibleForTesting
+ public static Signal translateCommandToSignal(
+ SignalContainerCommand command) {
+ Signal signal = Signal.NULL;
+ switch (command) {
+ case OUTPUT_THREAD_DUMP:
+ // TODO for windows support.
+ signal = Shell.WINDOWS ? Signal.NULL: Signal.QUIT;
+ break;
+ case GRACEFUL_SHUTDOWN:
+ signal = Signal.TERM;
+ break;
+ case FORCEFUL_SHUTDOWN:
+ signal = Signal.KILL;
+ break;
+ }
+ return signal;
+ }
+
/**
* Loop through for a time-bounded interval waiting to
* read the process id from a file generated by a running process.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 6950aa9381..3a2649eb63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -142,7 +142,23 @@ public void handle(ContainersLauncherEvent event) {
+ ". Ignoring.");
}
break;
+ case SIGNAL_CONTAINER:
+ SignalContainersLauncherEvent signalEvent =
+ (SignalContainersLauncherEvent) event;
+ ContainerLaunch runningContainer = running.get(containerId);
+ if (runningContainer == null) {
+ // Container not launched. So nothing needs to be done.
+ LOG.info("Container " + containerId + " not running, nothing to signal.");
+ return;
+ }
+
+ try {
+ runningContainer.signalContainer(signalEvent.getCommand());
+ } catch (IOException e) {
+ LOG.warn("Got exception while signaling container " + containerId
+ + " with command " + signalEvent.getCommand());
+ }
+ break;
}
}
-
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 385b5b2d9d..a88564db85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -22,4 +22,5 @@ public enum ContainersLauncherEventType {
LAUNCH_CONTAINER,
RECOVER_CONTAINER,
CLEANUP_CONTAINER, // The process(grp) itself.
+ SIGNAL_CONTAINER,
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java
new file mode 100644
index 0000000000..de544f0c30
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+// This event can be triggered by one of the following flows
+// WebUI -> Container
+// CLI -> RM -> NM
+public class SignalContainersLauncherEvent extends ContainersLauncherEvent{
+
+ private final SignalContainerCommand command;
+ public SignalContainersLauncherEvent(Container container,
+ SignalContainerCommand command) {
+ super(container, ContainersLauncherEventType.SIGNAL_CONTAINER);
+ this.command = command;
+ }
+ public SignalContainerCommand getCommand() {
+ return command;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index 75bcdaef9c..3e008854d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -222,6 +222,42 @@ public void testChangeContainerResource() throws Exception {
super.testChangeContainerResource();
}
+ @Override
+ public void testOutputThreadDumpSignal() throws IOException,
+ InterruptedException, YarnException {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ LOG.info("Running testOutputThreadDumpSignal");
+ super.testOutputThreadDumpSignal();
+ }
+
+ @Override
+ public void testGracefulShutdownSignal() throws IOException,
+ InterruptedException, YarnException {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ LOG.info("Running testGracefulShutdownSignal");
+ super.testGracefulShutdownSignal();
+ }
+
+ @Override
+ public void testForcefulShutdownSignal() throws IOException,
+ InterruptedException, YarnException {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ LOG.info("Running testForcefulShutdownSignal");
+ super.testForcefulShutdownSignal();
+ }
+
private boolean shouldRunTest() {
return System
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 70a8f5576d..7b084cfdc4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -61,6 +61,7 @@
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -69,6 +70,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -166,9 +168,11 @@ public static MasterKey createMasterKey() {
private class MyResourceTracker implements ResourceTracker {
private final Context context;
+ private boolean signalContainer;
- public MyResourceTracker(Context context) {
+ public MyResourceTracker(Context context, boolean signalContainer) {
this.context = context;
+ this.signalContainer = signalContainer;
}
@Override
@@ -222,17 +226,19 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
nodeStatus.setResponseId(heartBeatID++);
Map> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
+ List containersToSignal = null;
ApplicationId appId1 = ApplicationId.newInstance(0, 1);
ApplicationId appId2 = ApplicationId.newInstance(0, 2);
+ ContainerId firstContainerID = null;
if (heartBeatID == 1) {
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
// Give a container to the NM.
ApplicationAttemptId appAttemptID =
ApplicationAttemptId.newInstance(appId1, 0);
- ContainerId firstContainerID =
+ firstContainerID =
ContainerId.newContainerId(appAttemptID, heartBeatID);
ContainerLaunchContext launchContext = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
@@ -259,6 +265,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
this.context.getContainers();
Assert.assertEquals(1, activeContainers.size());
+ if (this.signalContainer) {
+ containersToSignal = new ArrayList();
+ SignalContainerRequest signalReq = recordFactory
+ .newRecordInstance(SignalContainerRequest.class);
+ signalReq.setContainerId(firstContainerID);
+ signalReq.setCommand(SignalContainerCommand.OUTPUT_THREAD_DUMP);
+ containersToSignal.add(signalReq);
+ }
+
// Give another container to the NM.
ApplicationAttemptId appAttemptID =
ApplicationAttemptId.newInstance(appId2, 0);
@@ -295,6 +310,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null,
1000L);
+ if (containersToSignal != null) {
+ nhResponse.addAllContainersToSignal(containersToSignal);
+ }
return nhResponse;
}
@@ -306,15 +324,40 @@ public UnRegisterNodeManagerResponse unRegisterNodeManager(
}
}
+ private class MyContainerManager extends ContainerManagerImpl {
+ public boolean signaled = false;
+
+ public MyContainerManager(Context context, ContainerExecutor exec,
+ DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
+ NodeManagerMetrics metrics,
+ LocalDirsHandlerService dirsHandler) {
+ super(context, exec, deletionContext, nodeStatusUpdater,
+ metrics, dirsHandler);
+ }
+
+ @Override
+ public void handle(ContainerManagerEvent event) {
+ if (event.getType() == ContainerManagerEventType.SIGNAL_CONTAINERS) {
+ signaled = true;
+ }
+ }
+ }
+
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker;
private Context context;
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ this(context, dispatcher, healthChecker, metrics, false);
+ }
+
+ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
+ boolean signalContainer) {
super(context, dispatcher, healthChecker, metrics);
this.context = context;
- resourceTracker = new MyResourceTracker(this.context);
+ resourceTracker = new MyResourceTracker(this.context, signalContainer);
}
@Override
@@ -1547,6 +1590,66 @@ public void cleanUpApplicationsOnNMShutDown() {
nm.stop();
}
+
+ //Verify that signalContainer request can be dispatched from
+ //NodeStatusUpdaterImpl to ContainerManagerImpl.
+ @Test
+ public void testSignalContainerToContainerManager() throws Exception {
+ nm = new NodeManager() {
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ return new MyNodeStatusUpdater(
+ context, dispatcher, healthChecker, metrics, true);
+ }
+
+ @Override
+ protected ContainerManagerImpl createContainerManager(Context context,
+ ContainerExecutor exec, DeletionService del,
+ NodeStatusUpdater nodeStatusUpdater,
+ ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService diskhandler) {
+ return new MyContainerManager(context, exec, del, nodeStatusUpdater,
+ metrics, diskhandler);
+ }
+ };
+
+ YarnConfiguration conf = createNMConfig();
+ nm.init(conf);
+ nm.start();
+
+ System.out.println(" ----- thread already started.."
+ + nm.getServiceState());
+
+ int waitCount = 0;
+ while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) {
+ LOG.info("Waiting for NM to start..");
+ if (nmStartError != null) {
+ LOG.error("Error during startup. ", nmStartError);
+ Assert.fail(nmStartError.getCause().getMessage());
+ }
+ Thread.sleep(1000);
+ }
+ if (nm.getServiceState() != STATE.STARTED) {
+ // NM could have failed.
+ Assert.fail("NodeManager failed to start");
+ }
+
+ waitCount = 0;
+ while (heartBeatID <= 3 && waitCount++ != 20) {
+ Thread.sleep(500);
+ }
+ Assert.assertFalse(heartBeatID <= 3);
+ Assert.assertEquals("Number of registered NMs is wrong!!", 1,
+ this.registeredNodes.size());
+
+ MyContainerManager containerManager =
+ (MyContainerManager)nm.getContainerManager();
+ Assert.assertTrue(containerManager.signaled);
+
+ nm.stop();
+ }
+
@Test
public void testConcurrentAccessToSystemCredentials(){
final Map testCredentials = new HashMap<>();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
index f482784fe9..0069aaa37f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
@@ -82,6 +82,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
@@ -464,4 +466,10 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
IOException {
return null;
}
+
+ @Override
+ public SignalContainerResponse signalContainer(
+ SignalContainerRequest request) throws IOException {
+return null;
+}
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 39383428b9..532944bf5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -73,6 +73,8 @@
import org.junit.After;
import org.junit.Before;
+import static org.mockito.Mockito.spy;
+
public abstract class BaseContainerManagerTest {
protected static RecordFactory recordFactory = RecordFactoryProvider
@@ -148,7 +150,7 @@ public long getRMIdentifier() {
protected ContainerExecutor createContainerExecutor() {
DefaultContainerExecutor exec = new DefaultContainerExecutor();
exec.setConf(conf);
- return exec;
+ return spy(exec);
}
@Before
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 3fb4112447..3f5fc825c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -65,6 +66,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -75,22 +77,30 @@
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
public class TestContainerManager extends BaseContainerManagerTest {
@@ -262,7 +272,7 @@ public void testContainerSetup() throws Exception {
Assert.assertEquals(null, reader.readLine());
}
- @Test
+ //@Test
public void testContainerLaunchAndStop() throws IOException,
InterruptedException, YarnException {
containerManager.start();
@@ -1173,4 +1183,103 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
.retrievePassword(containerTokenIdentifier),
containerTokenIdentifier);
}
+
+ @Test
+ public void testOutputThreadDumpSignal() throws IOException,
+ InterruptedException, YarnException {
+ testContainerLaunchAndSignal(SignalContainerCommand.OUTPUT_THREAD_DUMP);
+ }
+
+ @Test
+ public void testGracefulShutdownSignal() throws IOException,
+ InterruptedException, YarnException {
+ testContainerLaunchAndSignal(SignalContainerCommand.GRACEFUL_SHUTDOWN);
+ }
+
+ @Test
+ public void testForcefulShutdownSignal() throws IOException,
+ InterruptedException, YarnException {
+ testContainerLaunchAndSignal(SignalContainerCommand.FORCEFUL_SHUTDOWN);
+ }
+
+ // Verify signal container request can be delivered from
+ // NodeStatusUpdaterImpl to ContainerExecutor.
+ private void testContainerLaunchAndSignal(SignalContainerCommand command)
+ throws IOException, InterruptedException, YarnException {
+
+ Signal signal = ContainerLaunch.translateCommandToSignal(command);
+ containerManager.start();
+
+ File scriptFile = new File(tmpDir, "scriptFile.sh");
+ PrintWriter fileWriter = new PrintWriter(scriptFile);
+ File processStartFile =
+ new File(tmpDir, "start_file.txt").getAbsoluteFile();
+ fileWriter.write("\numask 0"); // So that start file is readable by the test
+ fileWriter.write("\necho Hello World! > " + processStartFile);
+ fileWriter.write("\necho $$ >> " + processStartFile);
+ fileWriter.write("\nexec sleep 1000s");
+ fileWriter.close();
+
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ // ////// Construct the Container-id
+ ContainerId cId = createContainerId(0);
+
+ URL resource_alpha =
+ ConverterUtils.getYarnUrlFromPath(localFS
+ .makeQualified(new Path(scriptFile.getAbsolutePath())));
+ LocalResource rsrc_alpha =
+ recordFactory.newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(scriptFile.lastModified());
+ String destinationFile = "dest_file";
+ Map localResources =
+ new HashMap();
+ localResources.put(destinationFile, rsrc_alpha);
+ containerLaunchContext.setLocalResources(localResources);
+ List commands = new ArrayList();
+ commands.add("/bin/bash");
+ commands.add(scriptFile.getAbsolutePath());
+ containerLaunchContext.setCommands(commands);
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+ user, context.getContainerTokenSecretManager()));
+ List list = new ArrayList();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
+
+ int timeoutSecs = 0;
+ while (!processStartFile.exists() && timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ LOG.info("Waiting for process start-file to be created");
+ }
+ Assert.assertTrue("ProcessStartFile doesn't exist!",
+ processStartFile.exists());
+
+ // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent
+ SignalContainerRequest signalReq =
+ SignalContainerRequest.newInstance(cId, command);
+ List reqs = new ArrayList();
+ reqs.add(signalReq);
+ containerManager.handle(new CMgrSignalContainersEvent(reqs));
+
+ final ArgumentCaptor signalContextCaptor =
+ ArgumentCaptor.forClass(ContainerSignalContext.class);
+ if (signal.equals(Signal.NULL)) {
+ verify(exec, never()).signalContainer(signalContextCaptor.capture());
+ } else {
+ verify(exec, timeout(10000).atLeastOnce()).signalContainer(signalContextCaptor.capture());
+ ContainerSignalContext signalContext = signalContextCaptor.getAllValues().get(0);
+ Assert.assertEquals(cId, signalContext.getContainer().getContainerId());
+ Assert.assertEquals(signal, signalContext.getSignal());
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index dad86f54c6..a69cc689ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -93,10 +93,12 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -139,6 +141,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -1392,4 +1395,66 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
return response;
}
+ /**
+ * Signal a container.
+ * After the request passes some sanity check, it will be delivered
+ * to RMNodeImpl so that the next NM heartbeat will pick up the signal request
+ */
+ @Override
+ public SignalContainerResponse signalContainer(
+ SignalContainerRequest request) throws YarnException, IOException {
+ ContainerId containerId = request.getContainerId();
+
+ UserGroupInformation callerUGI;
+ try {
+ callerUGI = UserGroupInformation.getCurrentUser();
+ } catch (IOException ie) {
+ LOG.info("Error getting UGI ", ie);
+ throw RPCUtil.getRemoteException(ie);
+ }
+
+ ApplicationId applicationId = containerId.getApplicationAttemptId().
+ getApplicationId();
+ RMApp application = this.rmContext.getRMApps().get(applicationId);
+ if (application == null) {
+ RMAuditLogger.logFailure(callerUGI.getUserName(),
+ AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService",
+ "Trying to signal an absent container", applicationId, containerId);
+ throw RPCUtil
+ .getRemoteException("Trying to signal an absent container "
+ + containerId);
+ }
+
+ if (!checkAccess(callerUGI, application.getUser(),
+ ApplicationAccessType.MODIFY_APP, application)) {
+ RMAuditLogger.logFailure(callerUGI.getShortUserName(),
+ AuditConstants.SIGNAL_CONTAINER, "User doesn't have permissions to "
+ + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
+ AuditConstants.UNAUTHORIZED_USER, applicationId);
+ throw RPCUtil.getRemoteException(new AccessControlException("User "
+ + callerUGI.getShortUserName() + " cannot perform operation "
+ + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
+ }
+
+ RMContainer container = scheduler.getRMContainer(containerId);
+ if (container != null) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeSignalContainerEvent(container.getContainer().getNodeId(),
+ request));
+ RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
+ AuditConstants.SIGNAL_CONTAINER, "ClientRMService", applicationId,
+ containerId);
+ } else {
+ RMAuditLogger.logFailure(callerUGI.getUserName(),
+ AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService",
+ "Trying to signal an absent container", applicationId, containerId);
+ throw RPCUtil
+ .getRemoteException("Trying to signal an absent container "
+ + containerId);
+ }
+
+ return recordFactory
+ .newRecordInstance(SignalContainerResponse.class);
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
index cd9a61de20..92745b7f46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
@@ -58,6 +58,7 @@ public static class AuditConstants {
"Update Application Priority Request";
public static final String CHANGE_CONTAINER_RESOURCE =
"AM Changed Container Resource";
+ public static final String SIGNAL_CONTAINER = "Signal Container Request";
// Some commonly used descriptions
public static final String UNAUTHORIZED_USER = "Unauthorized user";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
index abe854485a..b28fef3c92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -44,6 +44,9 @@ public enum RMNodeEventType {
CLEANUP_CONTAINER,
DECREASE_CONTAINER,
+ // Source: ClientRMService
+ SIGNAL_CONTAINER,
+
// Source: RMAppAttempt
FINISHED_CONTAINERS_PULLED_BY_AM,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 33e471417d..e0d27d65b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -122,6 +123,10 @@ public class RMNodeImpl implements RMNode, EventHandler {
private final Set containersToClean = new TreeSet(
new ContainerIdComparator());
+ /* set of containers that need to be signaled */
+ private final List containersToSignal =
+ new ArrayList();
+
/*
* set of containers to notify NM to remove them from its context. Currently,
* this includes containers that were notified to AM about their completion
@@ -194,6 +199,8 @@ RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.DECREASE_CONTAINER,
new DecreaseContainersTransition())
+ .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+ RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
@@ -288,6 +295,8 @@ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
+ .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+ RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
@@ -491,8 +500,10 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response
response.addAllApplicationsToCleanup(this.finishedApplications);
response.addContainersToBeRemovedFromNM(
new ArrayList(this.containersToBeRemovedFromNM));
+ response.addAllContainersToSignal(this.containersToSignal);
this.containersToClean.clear();
this.finishedApplications.clear();
+ this.containersToSignal.clear();
this.containersToBeRemovedFromNM.clear();
} finally {
this.writeLock.unlock();
@@ -1090,6 +1101,16 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
}
+ public static class SignalContainerTransition implements
+ SingleArcTransition {
+
+ @Override
+ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+ rmNode.containersToSignal.add(((
+ RMNodeSignalContainerEvent) event).getSignalRequest());
+ }
+ }
+
@Override
public List pullContainerUpdates() {
List latestContainerInfoList =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java
new file mode 100644
index 0000000000..098c07a390
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+
+public class RMNodeSignalContainerEvent extends RMNodeEvent {
+
+ private SignalContainerRequest signalRequest;
+
+ public RMNodeSignalContainerEvent(NodeId nodeId,
+ SignalContainerRequest signalRequest) {
+ super(nodeId, RMNodeEventType.SIGNAL_CONTAINER);
+ this.signalRequest = signalRequest;
+ }
+
+ public SignalContainerRequest getSignalRequest() {
+ return this.signalRequest;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index a066ba4cd7..674529ef7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -52,6 +53,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -814,4 +816,12 @@ public void clearQueueMetrics(RMApp app) {
public RMActiveServices getRMActiveService() {
return activeServices;
}
+
+ public void signalContainer(ContainerId containerId, SignalContainerCommand command)
+ throws Exception {
+ ApplicationClientProtocol client = getClientRMService();
+ SignalContainerRequest req =
+ SignalContainerRequest.newInstance(containerId, command);
+ client.signalContainer(req);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java
new file mode 100644
index 0000000000..16cb866777
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+public class TestSignalContainer {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestSignalContainer.class);
+
+ @Test
+ public void testSignalRequestDeliveryToNM() throws Exception {
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+ MockRM rm = new MockRM();
+ rm.start();
+
+ MockNM nm1 = rm.registerNode("h1:1234", 5000);
+
+ RMApp app = rm.submitApp(2000);
+
+ //kick the scheduling
+ nm1.nodeHeartbeat(true);
+
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
+
+ //request for containers
+ final int request = 2;
+ am.allocate("h1" , 1000, request, new ArrayList());
+
+ //kick the scheduler
+ nm1.nodeHeartbeat(true);
+ List conts = null;
+ int contReceived = 0;
+ int waitCount = 0;
+ while (contReceived < request && waitCount++ < 200) {
+ LOG.info("Got " + contReceived + " containers. Waiting to get "
+ + request);
+ Thread.sleep(100);
+ conts = am.allocate(new ArrayList(),
+ new ArrayList()).getAllocatedContainers();
+ contReceived += conts.size();
+ }
+ Assert.assertEquals(request, contReceived);
+
+ for(Container container : conts) {
+ rm.signalContainer(container.getId(),
+ SignalContainerCommand.OUTPUT_THREAD_DUMP);
+ }
+
+ NodeHeartbeatResponse resp;
+ List contsToSignal;
+ int signaledConts = 0;
+
+ waitCount = 0;
+ while ( signaledConts < request && waitCount++ < 200) {
+ LOG.info("Waiting to get signalcontainer events.. signaledConts: "
+ + signaledConts);
+ resp = nm1.nodeHeartbeat(true);
+ contsToSignal = resp.getContainersToSignalList();
+ signaledConts += contsToSignal.size();
+ Thread.sleep(100);
+ }
+
+ // Verify NM receives the expected number of signal container requests.
+ Assert.assertEquals(request, signaledConts);
+
+ am.unregisterAppAttempt();
+ nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
+ am.waitForState(RMAppAttemptState.FINISHED);
+
+ rm.stop();
+ }
+}
\ No newline at end of file