YARN-1897. CLI and core support for signal container functionality. Contributed by Ming Ma

Xuan 2015-10-02 18:50:47 -07:00
parent fdf02d1f26
commit 8f08532bde
43 changed files with 1403 additions and 20 deletions


@ -63,6 +63,7 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.ClientRMProxy;
@ -473,4 +474,10 @@ public void updateApplicationPriority(ApplicationId applicationId,
Priority priority) throws YarnException, IOException {
client.updateApplicationPriority(applicationId, priority);
}
@Override
public void signalContainer(ContainerId containerId, SignalContainerCommand command)
throws YarnException, IOException {
client.signalContainer(containerId, command);
}
}


@ -112,6 +112,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
@ -453,6 +455,12 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
IOException {
return null;
}
@Override
public SignalContainerResponse signalContainer(
SignalContainerRequest request) throws IOException {
return null;
}
}
class HistoryService extends AMService implements HSClientProtocol {


@ -222,6 +222,9 @@ Release 2.8.0 - UNRELEASED
YARN-1651. CapacityScheduler side changes to support container resize.
(Wangda Tan via jianhe)
YARN-1897. CLI and core support for signal container functionality.
(Ming Ma via xgong)
IMPROVEMENTS


@ -53,6 +53,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -437,4 +439,32 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels(
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request) throws YarnException,
IOException;
/**
* <p>The interface used by clients to request the
* <code>ResourceManager</code> to signal a container. For example,
* the client can send command OUTPUT_THREAD_DUMP to dump threads of the
* container.</p>
*
* <p>The client, via {@link SignalContainerRequest} provides the
* id of the container and the signal command. </p>
*
* <p>In secure mode, the <code>ResourceManager</code> verifies access to the
* application before signaling the container.
* The user needs to have <code>MODIFY_APP</code> permission.</p>
*
* <p>Currently, the <code>ResourceManager</code> returns an empty response
* on success and throws an exception on rejecting the request.</p>
*
* @param request request to signal a container
* @return <code>ResourceManager</code> returns an empty response
* on success and throws an exception on rejecting the request
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
public SignalContainerResponse signalContainer(
SignalContainerRequest request) throws YarnException,
IOException;
}
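As a rough usage sketch (not part of the patch): a caller holding an ApplicationClientProtocol proxy could build the request with SignalContainerRequest.newInstance and invoke the new RPC roughly as below. The proxy variable and the container id string are placeholders, not values from this commit.

// Sketch only; "rmClient" is an assumed, already-created ApplicationClientProtocol proxy.
ContainerId containerId =
    ConverterUtils.toContainerId("container_1234_0005_01_000001"); // example id
SignalContainerRequest request = SignalContainerRequest.newInstance(
    containerId, SignalContainerCommand.OUTPUT_THREAD_DUMP);
rmClient.signalContainer(request); // empty response on success; throws on rejection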


@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>The request sent by the client to the <code>ResourceManager</code>
* or by the <code>ApplicationMaster</code> to the <code>NodeManager</code>
* to signal a container.
* @see SignalContainerCommand </p>
*/
@Public
@Evolving
public abstract class SignalContainerRequest {
@Public
@Unstable
public static SignalContainerRequest newInstance(ContainerId containerId,
SignalContainerCommand signalContainerCommand) {
SignalContainerRequest request =
Records.newRecord(SignalContainerRequest.class);
request.setContainerId(containerId);
request.setCommand(signalContainerCommand);
return request;
}
/**
* Get the <code>ContainerId</code> of the container to signal.
* @return <code>ContainerId</code> of the container to signal.
*/
@Public
@Unstable
public abstract ContainerId getContainerId();
/**
* Set the <code>ContainerId</code> of the container to signal.
*/
@Public
@Unstable
public abstract void setContainerId(ContainerId containerId);
/**
* Get the <code>SignalContainerCommand</code> of the signal request.
* @return <code>SignalContainerCommand</code> of the signal request.
*/
@Public
@Unstable
public abstract SignalContainerCommand getCommand();
/**
* Set the <code>SignalContainerCommand</code> of the signal request.
*/
@Public
@Unstable
public abstract void setCommand(SignalContainerCommand command);
}
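For illustration, the record can also be populated through the record factory and the setters above (this is how the NodeManager test later in this patch builds it). A minimal sketch, with containerId assumed to be an existing ContainerId:

SignalContainerRequest req =
    Records.newRecord(SignalContainerRequest.class);
req.setContainerId(containerId); // assumed to be in scope
req.setCommand(SignalContainerCommand.GRACEFUL_SHUTDOWN);
// Equivalent to: SignalContainerRequest.newInstance(containerId,
//     SignalContainerCommand.GRACEFUL_SHUTDOWN)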


@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
/**
* <p>The response sent by the <code>ResourceManager</code> to the client
* signalling a container.</p>
*
* <p>Currently it's empty.</p>
*
* @see ApplicationClientProtocol#signalContainer(SignalContainerRequest)
*/
@Public
@Evolving
public abstract class SignalContainerResponse {
}


@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
/**
* Enumeration of various signal container commands.
*/
@Public
@Evolving
public enum SignalContainerCommand {
/**
* Used to capture thread dump.
* On Linux, it is equivalent to SIGQUIT.
*/
OUTPUT_THREAD_DUMP,
/** Gracefully shutdown a container.
* On Linux, it is equivalent to SIGTERM.
*/
GRACEFUL_SHUTDOWN,
/** Forcefully shutdown a container.
* On Linux, it is equivalent to SIGKILL.
*/
FORCEFUL_SHUTDOWN,
}
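The CLI change later in this patch picks one of these values from the command line and falls back to OUTPUT_THREAD_DUMP when no command is given. A sketch of that selection, where signalArgs stands for the parsed option values (assumed variable):

SignalContainerCommand command = SignalContainerCommand.OUTPUT_THREAD_DUMP; // default
if (signalArgs.length == 2) {
  // valueOf throws IllegalArgumentException for an unrecognized command name.
  command = SignalContainerCommand.valueOf(signalArgs[1]);
}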


@ -56,4 +56,5 @@ service ApplicationClientProtocolService {
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto);
rpc signalContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
}


@ -417,6 +417,13 @@ message QueueUserACLInfoProto {
repeated QueueACLProto userAcls = 2;
}
enum SignalContainerCommandProto {
OUTPUT_THREAD_DUMP = 1;
GRACEFUL_SHUTDOWN = 2;
FORCEFUL_SHUTDOWN = 3;
}
////////////////////////////////////////////////////////////////////////
////// From reservation_protocol /////////////////////////////////////
////////////////////////////////////////////////////////////////////////


@ -225,6 +225,14 @@ message UpdateApplicationPriorityRequestProto {
message UpdateApplicationPriorityResponseProto {
}
message SignalContainerRequestProto {
required ContainerIdProto container_id = 1;
required SignalContainerCommandProto command = 2;
}
message SignalContainerResponseProto {
}
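For reference, the Java classes protoc generates from these messages follow the standard protobuf builder pattern. A sketch, assuming a ContainerIdProto value named containerIdProto:

SignalContainerRequestProto requestProto =
    SignalContainerRequestProto.newBuilder()
        .setContainerId(containerIdProto)                          // required field
        .setCommand(SignalContainerCommandProto.OUTPUT_THREAD_DUMP) // required field
        .build();
SignalContainerResponseProto responseProto =
    SignalContainerResponseProto.getDefaultInstance(); // the response carries no fields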
//////////////////////////////////////////////////////
/////// client_NM_Protocol ///////////////////////////
//////////////////////////////////////////////////////


@ -38,8 +38,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -56,6 +54,7 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -683,4 +682,18 @@ public abstract List<NodeLabel> getClusterNodeLabels()
@Unstable
public abstract void updateApplicationPriority(ApplicationId applicationId,
Priority priority) throws YarnException, IOException;
/**
* <p>
* Signal a container identified by the given ID.
* </p>
*
* @param containerId
* {@link ContainerId} of the container that needs to be signaled
* @param command the signal container command
* @throws YarnException
* @throws IOException
*/
public abstract void signalContainer(ContainerId containerId,
SignalContainerCommand command) throws YarnException, IOException;
}
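A minimal client-side sketch mirroring the test added later in this patch; the configuration and container id are placeholders:

YarnClient client = YarnClient.createYarnClient();
client.init(new YarnConfiguration());
client.start();
try {
  ContainerId containerId =
      ConverterUtils.toContainerId("container_1234_0005_01_000001"); // example id
  // Ask the RM to signal the container; OUTPUT_THREAD_DUMP maps to SIGQUIT on Linux.
  client.signalContainer(containerId, SignalContainerCommand.OUTPUT_THREAD_DUMP);
} finally {
  client.stop();
}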


@ -76,9 +76,9 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -94,6 +94,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -826,4 +827,14 @@ public void updateApplicationPriority(ApplicationId applicationId,
UpdateApplicationPriorityRequest.newInstance(applicationId, priority);
rmClient.updateApplicationPriority(request);
}
@Override
public void signalContainer(ContainerId containerId,
SignalContainerCommand command)
throws YarnException, IOException {
LOG.info("Signalling container " + containerId + " with command " + command);
SignalContainerRequest request =
SignalContainerRequest.newInstance(containerId, command);
rmClient.signalContainer(request);
}
}


@ -42,8 +42,10 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
@ -148,6 +150,12 @@ public int run(String[] args) throws Exception {
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
opts.getOption(STATUS_CMD).setArgName("Container ID");
opts.getOption(LIST_CMD).setArgName("Application Attempt ID");
opts.addOption(SIGNAL_CMD, true,
"Signal the container. The available signal commands are " +
java.util.Arrays.asList(SignalContainerCommand.values()) +
" Default command is OUTPUT_THREAD_DUMP.");
opts.getOption(SIGNAL_CMD).setArgName("container ID [signal command]");
opts.getOption(SIGNAL_CMD).setArgs(3);
}
int exitCode = -1;
@ -254,6 +262,19 @@ public int run(String[] args) throws Exception {
}
updateApplicationPriority(cliParser.getOptionValue(APP_ID),
cliParser.getOptionValue(UPDATE_PRIORITY));
} else if (cliParser.hasOption(SIGNAL_CMD)) {
if (args.length < 3 || args.length > 4) {
printUsage(title, opts);
return exitCode;
}
final String[] signalArgs = cliParser.getOptionValues(SIGNAL_CMD);
final String containerId = signalArgs[0];
SignalContainerCommand command =
SignalContainerCommand.OUTPUT_THREAD_DUMP;
if (signalArgs.length == 2) {
command = SignalContainerCommand.valueOf(signalArgs[1]);
}
signalContainer(containerId, command);
} else {
syserr.println("Invalid Command Usage : ");
printUsage(title, opts);
@ -261,6 +282,20 @@ public int run(String[] args) throws Exception {
return 0;
}
/**
* Signals the container identified by the given container ID.
*
* @param containerIdStr the container id
* @param command the signal command
* @throws YarnException
*/
private void signalContainer(String containerIdStr,
SignalContainerCommand command) throws YarnException, IOException {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
sysout.println("Signalling container " + containerIdStr);
client.signalContainer(containerId, command);
}
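Putting the CLI pieces together, an invocation would look roughly like the following (the container id is just an example); with no command argument the default OUTPUT_THREAD_DUMP is used:

yarn container -signal container_1234_0005_01_000001
yarn container -signal container_1234_0005_01_000001 GRACEFUL_SHUTDOWN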
/**
* It prints the usage of the command
*


@ -35,6 +35,7 @@ public abstract class YarnCLI extends Configured implements Tool {
public static final String KILL_CMD = "kill";
public static final String MOVE_TO_QUEUE_CMD = "movetoqueue";
public static final String HELP_CMD = "help";
public static final String SIGNAL_CMD = "signal";
protected PrintStream sysout;
protected PrintStream syserr;
protected YarnClient client;


@ -77,6 +77,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@ -97,6 +98,7 @@
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AHSClient;
@ -125,6 +127,7 @@
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class TestYarnClient {
@ -1299,4 +1302,26 @@ public void testShouldNotRetryForeverForNonNetworkExceptions() throws Exception
}
}
}
@Test
public void testSignalContainer() throws Exception {
Configuration conf = new Configuration();
@SuppressWarnings("resource")
final YarnClient client = new MockYarnClient();
client.init(conf);
client.start();
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
applicationId, 1);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
SignalContainerCommand command = SignalContainerCommand.OUTPUT_THREAD_DUMP;
client.signalContainer(containerId, command);
final ArgumentCaptor<SignalContainerRequest> signalReqCaptor =
ArgumentCaptor.forClass(SignalContainerRequest.class);
verify(((MockYarnClient) client).getRMClient())
.signalContainer(signalReqCaptor.capture());
SignalContainerRequest request = signalReqCaptor.getValue();
Assert.assertEquals(containerId, request.getContainerId());
Assert.assertEquals(command, request.getCommand());
}
}


@ -32,6 +32,7 @@
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@ -40,6 +41,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.time.DateFormatUtils;
@ -61,6 +63,7 @@
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
@ -82,6 +85,8 @@ public class TestYarnCLI {
private PrintStream sysOut;
ByteArrayOutputStream sysErrStream;
private PrintStream sysErr;
private static final Pattern SPACES_PATTERN =
Pattern.compile("\\s+|\\n+|\\t+");
@Before
public void setup() {
@ -785,7 +790,7 @@ public void testContainersHelpCommand() throws Exception {
Assert.assertTrue(result == 0);
verify(spyCli).printUsage(any(String.class), any(Options.class));
Assert.assertEquals(createContainerCLIHelpMessage(),
sysOutStream.toString());
normalize(sysOutStream.toString()));
sysOutStream.reset();
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
@ -795,7 +800,7 @@ public void testContainersHelpCommand() throws Exception {
new String[] {"container", "-list", appAttemptId.toString(), "args" });
verify(spyCli).printUsage(any(String.class), any(Options.class));
Assert.assertEquals(createContainerCLIHelpMessage(),
sysOutStream.toString());
normalize(sysOutStream.toString()));
sysOutStream.reset();
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 7);
@ -803,7 +808,7 @@ public void testContainersHelpCommand() throws Exception {
new String[] { "container", "-status", containerId.toString(), "args" });
verify(spyCli).printUsage(any(String.class), any(Options.class));
Assert.assertEquals(createContainerCLIHelpMessage(),
sysOutStream.toString());
normalize(sysOutStream.toString()));
}
@Test (timeout = 5000)
@ -1256,8 +1261,8 @@ public void testMissingArguments() throws Exception {
sysOutStream.reset();
result = cli.run(new String[] { "container", "-status" });
Assert.assertEquals(result, -1);
Assert.assertEquals(String.format("Missing argument for options%n%1s",
createContainerCLIHelpMessage()), sysOutStream.toString());
Assert.assertEquals(String.format("Missing argument for options %1s",
createContainerCLIHelpMessage()), normalize(sysOutStream.toString()));
sysOutStream.reset();
NodeCLI nodeCLI = new NodeCLI();
@ -1538,10 +1543,17 @@ private String createContainerCLIHelpMessage() throws IOException {
pw.println("usage: container");
pw.println(" -help Displays help for all commands.");
pw.println(" -list <Application Attempt ID> List containers for application attempt.");
pw.println(" -signal <container ID [signal command]> Signal the container.");
pw.println("The available signal commands are ");
pw.println(java.util.Arrays.asList(SignalContainerCommand.values()));
pw.println(" Default command is OUTPUT_THREAD_DUMP.");
pw.println(" -status <Container ID> Prints the status of the container.");
pw.close();
String appsHelpStr = baos.toString("UTF-8");
return appsHelpStr;
try {
return normalize(baos.toString("UTF-8"));
} catch (UnsupportedEncodingException infeasible) {
return infeasible.toString();
}
}
private String createNodeCLIHelpMessage() throws IOException {
@ -1560,4 +1572,8 @@ private String createNodeCLIHelpMessage() throws IOException {
String nodesHelpStr = baos.toString("UTF-8");
return nodesHelpStr;
}
private static String normalize(String s) {
return SPACES_PATTERN.matcher(s).replaceAll(" "); // single space
}
}


@ -77,6 +77,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
@ -125,6 +127,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -527,4 +531,18 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
return null;
}
}
@Override
public SignalContainerResponse signalContainer(
SignalContainerRequest request) throws YarnException, IOException {
YarnServiceProtos.SignalContainerRequestProto requestProto =
((SignalContainerRequestPBImpl) request).getProto();
try {
return new SignalContainerResponsePBImpl(
proxy.signalContainer(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
}


@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportRequestPBImpl;
@ -102,6 +103,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto;
@ -140,8 +142,11 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
@ -529,4 +534,18 @@ public UpdateApplicationPriorityResponseProto updateApplicationPriority(
throw new ServiceException(e);
}
}
@Override
public SignalContainerResponseProto signalContainer(RpcController controller,
YarnServiceProtos.SignalContainerRequestProto proto) throws ServiceException {
SignalContainerRequestPBImpl request = new SignalContainerRequestPBImpl(proto);
try {
SignalContainerResponse response = real.signalContainer(request);
return ((SignalContainerResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}


@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SignalContainerCommandProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProtoOrBuilder;
public class SignalContainerRequestPBImpl
extends SignalContainerRequest {
SignalContainerRequestProto proto =
SignalContainerRequestProto.getDefaultInstance();
SignalContainerRequestProto.Builder builder = null;
boolean viaProto = false;
private ContainerId containerId;
private SignalContainerCommand command = null;
private static SignalContainerCommand convertFromProtoFormat(
SignalContainerCommandProto p) {
return SignalContainerCommand.valueOf(p.name());
}
private static SignalContainerCommandProto convertToProtoFormat(
SignalContainerCommand p) {
return SignalContainerCommandProto.valueOf(p.name());
}
public SignalContainerRequestPBImpl() {
builder = SignalContainerRequestProto.newBuilder();
}
public SignalContainerRequestPBImpl(SignalContainerRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public SignalContainerRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.containerId != null) {
builder.setContainerId(convertToProtoFormat(this.containerId));
}
if (this.command != null) {
builder.setCommand(convertToProtoFormat(this.command));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = SignalContainerRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public ContainerId getContainerId() {
SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.containerId != null) {
return this.containerId;
}
if (!p.hasContainerId()) {
return null;
}
this.containerId = convertFromProtoFormat(p.getContainerId());
return this.containerId;
}
@Override
public void setContainerId(ContainerId containerId) {
maybeInitBuilder();
if (containerId == null) {
builder.clearContainerId();
}
this.containerId = containerId;
}
private void initCommand() {
if (this.command != null) {
return;
}
SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
if(p.hasCommand()) {
this.command = convertFromProtoFormat(p.getCommand());
}
}
@Override
public SignalContainerCommand getCommand() {
initCommand();
return command;
}
@Override
public void setCommand(SignalContainerCommand command) {
maybeInitBuilder();
if (command == null) {
builder.clearCommand();
}
this.command = command;
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl)t).getProto();
}
}
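The class above follows the usual YARN PBImpl pattern: local fields are kept on the Java side until getProto() merges them into the protobuf builder. A round-trip sketch for illustration, with containerId assumed to be an existing ContainerId:

SignalContainerRequestPBImpl original = new SignalContainerRequestPBImpl();
original.setContainerId(containerId); // assumed to be in scope
original.setCommand(SignalContainerCommand.FORCEFUL_SHUTDOWN);

// getProto() folds the local fields into the proto; a new record can be
// rebuilt from that wire-level message.
SignalContainerRequestProto onWire = original.getProto();
SignalContainerRequest copy = new SignalContainerRequestPBImpl(onWire);
assert copy.getCommand() == SignalContainerCommand.FORCEFUL_SHUTDOWN;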


@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
public class SignalContainerResponsePBImpl extends SignalContainerResponse {
SignalContainerResponseProto proto = SignalContainerResponseProto.getDefaultInstance();
SignalContainerResponseProto.Builder builder = null;
boolean viaProto = false;
public SignalContainerResponsePBImpl() {
builder = SignalContainerResponseProto.newBuilder();
}
public SignalContainerResponsePBImpl(SignalContainerResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public SignalContainerResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
}


@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -56,6 +57,8 @@ public interface NodeHeartbeatResponse {
void addAllApplicationsToCleanup(List<ApplicationId> applications);
List<SignalContainerRequest> getContainersToSignalList();
void addAllContainersToSignal(List<SignalContainerRequest> containers);
long getNextHeartBeatInterval();
void setNextHeartBeatInterval(long nextHeartBeatInterval);


@ -26,6 +26,8 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -37,6 +39,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
@ -62,8 +65,8 @@ public class NodeHeartbeatResponsePBImpl extends
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
private List<Container> containersToDecrease = null;
private List<SignalContainerRequest> containersToSignal = null;
public NodeHeartbeatResponsePBImpl() {
builder = NodeHeartbeatResponseProto.newBuilder();
@ -105,6 +108,9 @@ private void mergeLocalToBuilder() {
if (this.containersToDecrease != null) {
addContainersToDecreaseToProto();
}
if (this.containersToSignal != null) {
addContainersToSignalToProto();
}
}
private void addSystemCredentialsToProto() {
@ -571,5 +577,75 @@ public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
maybeInitBuilder();
this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
}
@Override
public List<SignalContainerRequest> getContainersToSignalList() {
initContainersToSignal();
return this.containersToSignal;
}
private void initContainersToSignal() {
if (this.containersToSignal != null) {
return;
}
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<SignalContainerRequestProto> list = p.getContainersToSignalList();
this.containersToSignal = new ArrayList<SignalContainerRequest>();
for (SignalContainerRequestProto c : list) {
this.containersToSignal.add(convertFromProtoFormat(c));
}
}
@Override
public void addAllContainersToSignal(
final List<SignalContainerRequest> containersToSignal) {
if (containersToSignal == null)
return;
initContainersToSignal();
this.containersToSignal.addAll(containersToSignal);
}
private void addContainersToSignalToProto() {
maybeInitBuilder();
builder.clearContainersToSignal();
if (containersToSignal == null)
return;
Iterable<SignalContainerRequestProto> iterable =
new Iterable<SignalContainerRequestProto>() {
@Override
public Iterator<SignalContainerRequestProto> iterator() {
return new Iterator<SignalContainerRequestProto>() {
Iterator<SignalContainerRequest> iter = containersToSignal.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public SignalContainerRequestProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainersToSignal(iterable);
}
private SignalContainerRequestPBImpl convertFromProtoFormat(
SignalContainerRequestProto p) {
return new SignalContainerRequestPBImpl(p);
}
private SignalContainerRequestProto convertToProtoFormat(
SignalContainerRequest t) {
return ((SignalContainerRequestPBImpl)t).getProto();
}
}


@ -24,6 +24,7 @@ package hadoop.yarn;
import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
import "yarn_service_protos.proto";
message NodeLabelsProto {
repeated NodeLabelProto nodeLabels = 1;
@ -83,6 +84,7 @@ message NodeHeartbeatResponseProto {
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
repeated ContainerProto containers_to_decrease = 12;
repeated SignalContainerRequestProto containers_to_signal = 13;
}
message SystemCredentialsForAppsProto {


@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
public class CMgrSignalContainersEvent extends ContainerManagerEvent {
private List<SignalContainerRequest> containerToSignal;
public CMgrSignalContainersEvent(List<SignalContainerRequest> containerToSignal) {
super(ContainerManagerEventType.SIGNAL_CONTAINERS);
this.containerToSignal = containerToSignal;
}
public List<SignalContainerRequest> getContainersToSignal() {
return this.containerToSignal;
}
}


@ -21,5 +21,6 @@
public enum ContainerManagerEventType {
FINISH_APPS,
FINISH_CONTAINERS,
DECREASE_CONTAINERS_RESOURCE
DECREASE_CONTAINERS_RESOURCE,
SIGNAL_CONTAINERS
}


@ -45,6 +45,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -800,6 +801,16 @@ public void run() {
new CMgrDecreaseContainersResourceEvent(containersToDecrease)
);
}
// SignalContainer request originally comes from end users via
// ClientRMProtocol's SignalContainer. Forward the request to
// ContainerManager which will dispatch the event to ContainerLauncher.
List<SignalContainerRequest> containersToSignal = response
.getContainersToSignalList();
if (containersToSignal.size() != 0) {
dispatcher.getEventHandler().handle(
new CMgrSignalContainersEvent(containersToSignal));
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(


@ -60,6 +60,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -97,6 +98,7 @@
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -121,6 +123,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
@ -1349,6 +1352,23 @@ public void handle(ContainerManagerEvent event) {
}
}
break;
case SIGNAL_CONTAINERS:
CMgrSignalContainersEvent containersSignalEvent =
(CMgrSignalContainersEvent) event;
for (SignalContainerRequest request : containersSignalEvent
.getContainersToSignal()) {
ContainerId containerId = request.getContainerId();
Container container = this.context.getContainers().get(containerId);
if (container != null) {
LOG.info(containerId + " signal request by ResourceManager.");
this.dispatcher.getEventHandler().handle(
new SignalContainersLauncherEvent(container,
request.getCommand()));
} else {
LOG.info("Container " + containerId + " no longer exists");
}
}
break;
default:
throw new YarnRuntimeException(
"Got an unknown ContainerManagerEvent type: " + event.getType());


@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@ -461,6 +462,100 @@ public void cleanupContainer() throws IOException {
}
}
/**
* Send a signal to the container.
*
*
* @throws IOException
*/
@SuppressWarnings("unchecked") // dispatcher not typed
public void signalContainer(SignalContainerCommand command)
throws IOException {
ContainerId containerId =
container.getContainerTokenIdentifier().getContainerID();
String containerIdStr = ConverterUtils.toString(containerId);
String user = container.getUser();
Signal signal = translateCommandToSignal(command);
if (signal.equals(Signal.NULL)) {
LOG.info("ignore signal command " + command);
return;
}
LOG.info("Sending signal " + command + " to container " + containerIdStr);
boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
if (!alreadyLaunched) {
LOG.info("Container " + containerIdStr + " not launched."
+ " Not sending the signal");
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Getting pid for container " + containerIdStr
+ " to send signal to from pid file "
+ (pidFilePath != null ? pidFilePath.toString() : "null"));
}
try {
// get process id from pid file if available
// else if shell is still active, get it from the shell
String processId = null;
if (pidFilePath != null) {
processId = getContainerPid(pidFilePath);
}
if (processId != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending signal to pid " + processId
+ " as user " + user
+ " for container " + containerIdStr);
}
boolean result = exec.signalContainer(
new ContainerSignalContext.Builder()
.setContainer(container)
.setUser(user)
.setPid(processId)
.setSignal(signal)
.build());
String diagnostics = "Sent signal " + command
+ " (" + signal + ") to pid " + processId
+ " as user " + user
+ " for container " + containerIdStr
+ ", result=" + (result ? "success" : "failed");
LOG.info(diagnostics);
dispatcher.getEventHandler().handle(
new ContainerDiagnosticsUpdateEvent(containerId, diagnostics));
}
} catch (Exception e) {
String message =
"Exception when sending signal to container " + containerIdStr
+ ": " + StringUtils.stringifyException(e);
LOG.warn(message);
}
}
@VisibleForTesting
public static Signal translateCommandToSignal(
SignalContainerCommand command) {
Signal signal = Signal.NULL;
switch (command) {
case OUTPUT_THREAD_DUMP:
// TODO for windows support.
signal = Shell.WINDOWS ? Signal.NULL: Signal.QUIT;
break;
case GRACEFUL_SHUTDOWN:
signal = Signal.TERM;
break;
case FORCEFUL_SHUTDOWN:
signal = Signal.KILL;
break;
}
return signal;
}
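Since translateCommandToSignal is marked @VisibleForTesting, the mapping above can be exercised directly. A sketch of the expected results on Linux (where Shell.WINDOWS is false), assuming the ContainerExecutor.Signal import:

assert ContainerLaunch.translateCommandToSignal(
    SignalContainerCommand.OUTPUT_THREAD_DUMP) == Signal.QUIT;
assert ContainerLaunch.translateCommandToSignal(
    SignalContainerCommand.GRACEFUL_SHUTDOWN) == Signal.TERM;
assert ContainerLaunch.translateCommandToSignal(
    SignalContainerCommand.FORCEFUL_SHUTDOWN) == Signal.KILL;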
/**
* Loop through for a time-bounded interval waiting to
* read the process id from a file generated by a running process.


@ -142,7 +142,23 @@ public void handle(ContainersLauncherEvent event) {
+ ". Ignoring.");
}
break;
case SIGNAL_CONTAINER:
SignalContainersLauncherEvent signalEvent =
(SignalContainersLauncherEvent) event;
ContainerLaunch runningContainer = running.get(containerId);
if (runningContainer == null) {
// Container not launched. So nothing needs to be done.
LOG.info("Container " + containerId + " not running, nothing to signal.");
return;
}
try {
runningContainer.signalContainer(signalEvent.getCommand());
} catch (IOException e) {
LOG.warn("Got exception while signaling container " + containerId
+ " with command " + signalEvent.getCommand());
}
break;
}
}
}


@ -22,4 +22,5 @@ public enum ContainersLauncherEventType {
LAUNCH_CONTAINER,
RECOVER_CONTAINER,
CLEANUP_CONTAINER, // The process(grp) itself.
SIGNAL_CONTAINER,
}


@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
// This event can be triggered by one of the following flows
// WebUI -> Container
// CLI -> RM -> NM
public class SignalContainersLauncherEvent extends ContainersLauncherEvent{
private final SignalContainerCommand command;
public SignalContainersLauncherEvent(Container container,
SignalContainerCommand command) {
super(container, ContainersLauncherEventType.SIGNAL_CONTAINER);
this.command = command;
}
public SignalContainerCommand getCommand() {
return command;
}
}


@ -222,6 +222,42 @@ public void testChangeContainerResource() throws Exception {
super.testChangeContainerResource();
}
@Override
public void testOutputThreadDumpSignal() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testOutputThreadDumpSignal");
super.testOutputThreadDumpSignal();
}
@Override
public void testGracefulShutdownSignal() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testGracefulShutdownSignal");
super.testGracefulShutdownSignal();
}
@Override
public void testForcefulShutdownSignal() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testForcefulShutdownSignal");
super.testForcefulShutdownSignal();
}
private boolean shouldRunTest() {
return System
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;


@ -61,6 +61,7 @@
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -69,6 +70,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -166,9 +168,11 @@ public static MasterKey createMasterKey() {
private class MyResourceTracker implements ResourceTracker {
private final Context context;
private boolean signalContainer;
public MyResourceTracker(Context context) {
public MyResourceTracker(Context context, boolean signalContainer) {
this.context = context;
this.signalContainer = signalContainer;
}
@Override
@ -222,17 +226,19 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
nodeStatus.setResponseId(heartBeatID++);
Map<ApplicationId, List<ContainerStatus>> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
List<SignalContainerRequest> containersToSignal = null;
ApplicationId appId1 = ApplicationId.newInstance(0, 1);
ApplicationId appId2 = ApplicationId.newInstance(0, 2);
ContainerId firstContainerID = null;
if (heartBeatID == 1) {
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
// Give a container to the NM.
ApplicationAttemptId appAttemptID =
ApplicationAttemptId.newInstance(appId1, 0);
ContainerId firstContainerID =
firstContainerID =
ContainerId.newContainerId(appAttemptID, heartBeatID);
ContainerLaunchContext launchContext = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
@ -259,6 +265,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
this.context.getContainers();
Assert.assertEquals(1, activeContainers.size());
if (this.signalContainer) {
containersToSignal = new ArrayList<SignalContainerRequest>();
SignalContainerRequest signalReq = recordFactory
.newRecordInstance(SignalContainerRequest.class);
signalReq.setContainerId(firstContainerID);
signalReq.setCommand(SignalContainerCommand.OUTPUT_THREAD_DUMP);
containersToSignal.add(signalReq);
}
// Give another container to the NM.
ApplicationAttemptId appAttemptID =
ApplicationAttemptId.newInstance(appId2, 0);
@ -295,6 +310,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null,
1000L);
if (containersToSignal != null) {
nhResponse.addAllContainersToSignal(containersToSignal);
}
return nhResponse;
}
@ -306,15 +324,40 @@ public UnRegisterNodeManagerResponse unRegisterNodeManager(
}
}
private class MyContainerManager extends ContainerManagerImpl {
public boolean signaled = false;
public MyContainerManager(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics,
LocalDirsHandlerService dirsHandler) {
super(context, exec, deletionContext, nodeStatusUpdater,
metrics, dirsHandler);
}
@Override
public void handle(ContainerManagerEvent event) {
if (event.getType() == ContainerManagerEventType.SIGNAL_CONTAINERS) {
signaled = true;
}
}
}
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker;
private Context context;
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
this(context, dispatcher, healthChecker, metrics, false);
}
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
boolean signalContainer) {
super(context, dispatcher, healthChecker, metrics);
this.context = context;
resourceTracker = new MyResourceTracker(this.context);
resourceTracker = new MyResourceTracker(this.context, signalContainer);
}
@Override
@ -1547,6 +1590,66 @@ public void cleanUpApplicationsOnNMShutDown() {
nm.stop();
}
//Verify that signalContainer request can be dispatched from
//NodeStatusUpdaterImpl to ContainerManagerImpl.
@Test
public void testSignalContainerToContainerManager() throws Exception {
nm = new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new MyNodeStatusUpdater(
context, dispatcher, healthChecker, metrics, true);
}
@Override
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater,
ApplicationACLsManager aclsManager,
LocalDirsHandlerService diskhandler) {
return new MyContainerManager(context, exec, del, nodeStatusUpdater,
metrics, diskhandler);
}
};
YarnConfiguration conf = createNMConfig();
nm.init(conf);
nm.start();
System.out.println(" ----- thread already started.."
+ nm.getServiceState());
int waitCount = 0;
while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) {
LOG.info("Waiting for NM to start..");
if (nmStartError != null) {
LOG.error("Error during startup. ", nmStartError);
Assert.fail(nmStartError.getCause().getMessage());
}
Thread.sleep(1000);
}
if (nm.getServiceState() != STATE.STARTED) {
// NM could have failed.
Assert.fail("NodeManager failed to start");
}
waitCount = 0;
while (heartBeatID <= 3 && waitCount++ != 20) {
Thread.sleep(500);
}
Assert.assertFalse(heartBeatID <= 3);
Assert.assertEquals("Number of registered NMs is wrong!!", 1,
this.registeredNodes.size());
MyContainerManager containerManager =
(MyContainerManager)nm.getContainerManager();
Assert.assertTrue(containerManager.signaled);
nm.stop();
}
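The NM-side hookup this test exercises lives in NodeStatusUpdaterImpl (changed elsewhere in this patch): signal requests piggy-backed on the heartbeat response are handed to the ContainerManager as a single SIGNAL_CONTAINERS event. A minimal sketch of that hand-off, written for illustration rather than copied from the patch (forwardSignals is a hypothetical helper):
// Sketch only: drain signal requests from the heartbeat response and
// deliver them to the ContainerManager as one SIGNAL_CONTAINERS event.
private static void forwardSignals(NodeHeartbeatResponse response,
ContainerManagerImpl containerManager) {
List<SignalContainerRequest> containersToSignal =
response.getContainersToSignalList();
if (!containersToSignal.isEmpty()) {
containerManager.handle(new CMgrSignalContainersEvent(containersToSignal));
}
}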
@Test
public void testConcurrentAccessToSystemCredentials(){
final Map<ApplicationId, ByteBuffer> testCredentials = new HashMap<>();

View File

@ -82,6 +82,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
@ -464,4 +466,10 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
IOException {
return null;
}
@Override
public SignalContainerResponse signalContainer(
SignalContainerRequest request) throws IOException {
return null;
}
}

View File

@ -73,6 +73,8 @@
import org.junit.After;
import org.junit.Before;
import static org.mockito.Mockito.spy;
public abstract class BaseContainerManagerTest {
protected static RecordFactory recordFactory = RecordFactoryProvider
@ -148,7 +150,7 @@ public long getRMIdentifier() {
protected ContainerExecutor createContainerExecutor() {
DefaultContainerExecutor exec = new DefaultContainerExecutor();
exec.setConf(conf);
return exec;
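// Return a Mockito spy so tests can verify calls (e.g. signalContainer)
// made on an otherwise real DefaultContainerExecutor.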
return spy(exec);
}
@Before

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -65,6 +66,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -75,22 +77,30 @@
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
public class TestContainerManager extends BaseContainerManagerTest {
@ -262,7 +272,7 @@ public void testContainerSetup() throws Exception {
Assert.assertEquals(null, reader.readLine());
}
@Test
//@Test
public void testContainerLaunchAndStop() throws IOException,
InterruptedException, YarnException {
containerManager.start();
@ -1173,4 +1183,103 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
.retrievePassword(containerTokenIdentifier),
containerTokenIdentifier);
}
@Test
public void testOutputThreadDumpSignal() throws IOException,
InterruptedException, YarnException {
testContainerLaunchAndSignal(SignalContainerCommand.OUTPUT_THREAD_DUMP);
}
@Test
public void testGracefulShutdownSignal() throws IOException,
InterruptedException, YarnException {
testContainerLaunchAndSignal(SignalContainerCommand.GRACEFUL_SHUTDOWN);
}
@Test
public void testForcefulShutdownSignal() throws IOException,
InterruptedException, YarnException {
testContainerLaunchAndSignal(SignalContainerCommand.FORCEFUL_SHUTDOWN);
}
// Verify that a signal container request can be delivered from
// NodeStatusUpdaterImpl to ContainerExecutor.
private void testContainerLaunchAndSignal(SignalContainerCommand command)
throws IOException, InterruptedException, YarnException {
Signal signal = ContainerLaunch.translateCommandToSignal(command);
containerManager.start();
File scriptFile = new File(tmpDir, "scriptFile.sh");
PrintWriter fileWriter = new PrintWriter(scriptFile);
File processStartFile =
new File(tmpDir, "start_file.txt").getAbsoluteFile();
fileWriter.write("\numask 0"); // So that start file is readable by the test
fileWriter.write("\necho Hello World! > " + processStartFile);
fileWriter.write("\necho $$ >> " + processStartFile);
fileWriter.write("\nexec sleep 1000s");
fileWriter.close();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
URL resource_alpha =
ConverterUtils.getYarnUrlFromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
LocalResource rsrc_alpha =
recordFactory.newRecordInstance(LocalResource.class);
rsrc_alpha.setResource(resource_alpha);
rsrc_alpha.setSize(-1);
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
rsrc_alpha.setType(LocalResourceType.FILE);
rsrc_alpha.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
List<String> commands = new ArrayList<String>();
commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
int timeoutSecs = 0;
while (!processStartFile.exists() && timeoutSecs++ < 20) {
Thread.sleep(1000);
LOG.info("Waiting for process start-file to be created");
}
Assert.assertTrue("ProcessStartFile doesn't exist!",
processStartFile.exists());
// Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent
SignalContainerRequest signalReq =
SignalContainerRequest.newInstance(cId, command);
List<SignalContainerRequest> reqs = new ArrayList<SignalContainerRequest>();
reqs.add(signalReq);
containerManager.handle(new CMgrSignalContainersEvent(reqs));
final ArgumentCaptor<ContainerSignalContext> signalContextCaptor =
ArgumentCaptor.forClass(ContainerSignalContext.class);
if (signal.equals(Signal.NULL)) {
verify(exec, never()).signalContainer(signalContextCaptor.capture());
} else {
verify(exec, timeout(10000).atLeastOnce()).signalContainer(signalContextCaptor.capture());
ContainerSignalContext signalContext = signalContextCaptor.getAllValues().get(0);
Assert.assertEquals(cId, signalContext.getContainer().getContainerId());
Assert.assertEquals(signal, signalContext.getSignal());
}
}
}
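The helper above maps the public SignalContainerCommand onto an OS-level ContainerExecutor.Signal via ContainerLaunch.translateCommandToSignal. That mapping is not shown in this hunk; the sketch below is an assumed shape written for illustration (the Signal.NULL branch is what the never() verification covers):
// Sketch only (assumed mapping, not copied from the patch):
static Signal translate(SignalContainerCommand command) {
switch (command) {
case OUTPUT_THREAD_DUMP:
return Signal.QUIT; // SIGQUIT makes a JVM print a thread dump
case GRACEFUL_SHUTDOWN:
return Signal.TERM; // ask the process to exit cleanly
case FORCEFUL_SHUTDOWN:
return Signal.KILL; // stop the process immediately
default:
return Signal.NULL; // nothing is delivered to the container
}
}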

View File

@ -93,10 +93,12 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@ -139,6 +141,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@ -1392,4 +1395,66 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
return response;
}
/**
* Signal a container.
* After the request passes sanity checks, it is delivered to RMNodeImpl
* so that the next NM heartbeat picks up the signal request.
*/
@Override
public SignalContainerResponse signalContainer(
SignalContainerRequest request) throws YarnException, IOException {
ContainerId containerId = request.getContainerId();
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
LOG.info("Error getting UGI ", ie);
throw RPCUtil.getRemoteException(ie);
}
ApplicationId applicationId = containerId.getApplicationAttemptId().
getApplicationId();
RMApp application = this.rmContext.getRMApps().get(applicationId);
if (application == null) {
RMAuditLogger.logFailure(callerUGI.getUserName(),
AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService",
"Trying to signal an absent container", applicationId, containerId);
throw RPCUtil
.getRemoteException("Trying to signal an absent container "
+ containerId);
}
if (!checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.MODIFY_APP, application)) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.SIGNAL_CONTAINER, "User doesn't have permissions to "
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
AuditConstants.UNAUTHORIZED_USER, applicationId);
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
}
RMContainer container = scheduler.getRMContainer(containerId);
if (container != null) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeSignalContainerEvent(container.getContainer().getNodeId(),
request));
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.SIGNAL_CONTAINER, "ClientRMService", applicationId,
containerId);
} else {
RMAuditLogger.logFailure(callerUGI.getUserName(),
AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService",
"Trying to signal an absent container", applicationId, containerId);
throw RPCUtil
.getRemoteException("Trying to signal an absent container "
+ containerId);
}
return recordFactory
.newRecordInstance(SignalContainerResponse.class);
}
}
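The new RPC is driven from the client side with a SignalContainerRequest, exactly as MockRM does further down in this change set. A minimal sketch of a caller, using only the protocol surface added here (requestThreadDump is an illustrative name):
// Sketch only: ask the RM to signal a container; the RM forwards the request
// to the owning NM on that node's next heartbeat.
void requestThreadDump(ApplicationClientProtocol rmClient, ContainerId containerId)
throws YarnException, IOException {
SignalContainerRequest request = SignalContainerRequest.newInstance(
containerId, SignalContainerCommand.OUTPUT_THREAD_DUMP);
rmClient.signalContainer(request);
}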

View File

@ -58,6 +58,7 @@ public static class AuditConstants {
"Update Application Priority Request";
public static final String CHANGE_CONTAINER_RESOURCE =
"AM Changed Container Resource";
public static final String SIGNAL_CONTAINER = "Signal Container Request";
// Some commonly used descriptions
public static final String UNAUTHORIZED_USER = "Unauthorized user";

View File

@ -44,6 +44,9 @@ public enum RMNodeEventType {
CLEANUP_CONTAINER,
DECREASE_CONTAINER,
// Source: ClientRMService
SIGNAL_CONTAINER,
// Source: RMAppAttempt
FINISHED_CONTAINERS_PULLED_BY_AM,

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -122,6 +123,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
new ContainerIdComparator());
/* set of containers that need to be signaled */
private final List<SignalContainerRequest> containersToSignal =
new ArrayList<SignalContainerRequest>();
/*
* set of containers to notify NM to remove them from its context. Currently,
* this includes containers that were notified to AM about their completion
@ -194,6 +199,8 @@ RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.DECREASE_CONTAINER,
new DecreaseContainersTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
@ -288,6 +295,8 @@ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
@ -491,8 +500,10 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response
response.addAllApplicationsToCleanup(this.finishedApplications);
response.addContainersToBeRemovedFromNM(
new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
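// Piggy-back any pending signal requests on this heartbeat response; once
// handed to the NM they are cleared below so they are delivered only once.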
response.addAllContainersToSignal(this.containersToSignal);
this.containersToClean.clear();
this.finishedApplications.clear();
this.containersToSignal.clear();
this.containersToBeRemovedFromNM.clear();
} finally {
this.writeLock.unlock();
@ -1090,6 +1101,16 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
}
public static class SignalContainerTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.containersToSignal.add(((
RMNodeSignalContainerEvent) event).getSignalRequest());
}
}
@Override
public List<UpdatedContainerInfo> pullContainerUpdates() {
List<UpdatedContainerInfo> latestContainerInfoList =

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
public class RMNodeSignalContainerEvent extends RMNodeEvent {
private SignalContainerRequest signalRequest;
public RMNodeSignalContainerEvent(NodeId nodeId,
SignalContainerRequest signalRequest) {
super(nodeId, RMNodeEventType.SIGNAL_CONTAINER);
this.signalRequest = signalRequest;
}
public SignalContainerRequest getSignalRequest() {
return this.signalRequest;
}
}

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -52,6 +53,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
@ -814,4 +816,12 @@ public void clearQueueMetrics(RMApp app) {
public RMActiveServices getRMActiveService() {
return activeServices;
}
public void signalContainer(ContainerId containerId, SignalContainerCommand command)
throws Exception {
ApplicationClientProtocol client = getClientRMService();
SignalContainerRequest req =
SignalContainerRequest.newInstance(containerId, command);
client.signalContainer(req);
}
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Test;
public class TestSignalContainer {
private static final Log LOG = LogFactory
.getLog(TestSignalContainer.class);
@Test
public void testSignalRequestDeliveryToNM() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM();
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 5000);
RMApp app = rm.submitApp(2000);
//kick the scheduling
nm1.nodeHeartbeat(true);
RMAppAttempt attempt = app.getCurrentAppAttempt();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
//request for containers
final int request = 2;
am.allocate("h1" , 1000, request, new ArrayList<ContainerId>());
//kick the scheduler
nm1.nodeHeartbeat(true);
List<Container> conts = null;
int contReceived = 0;
int waitCount = 0;
while (contReceived < request && waitCount++ < 200) {
LOG.info("Got " + contReceived + " containers. Waiting to get "
+ request);
Thread.sleep(100);
conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
contReceived += conts.size();
}
Assert.assertEquals(request, contReceived);
for(Container container : conts) {
rm.signalContainer(container.getId(),
SignalContainerCommand.OUTPUT_THREAD_DUMP);
}
NodeHeartbeatResponse resp;
List<SignalContainerRequest> contsToSignal;
int signaledConts = 0;
waitCount = 0;
while (signaledConts < request && waitCount++ < 200) {
LOG.info("Waiting to get signal container events.. signaledConts: "
+ signaledConts);
resp = nm1.nodeHeartbeat(true);
contsToSignal = resp.getContainersToSignalList();
signaledConts += contsToSignal.size();
Thread.sleep(100);
}
// Verify NM receives the expected number of signal container requests.
Assert.assertEquals(request, signaledConts);
am.unregisterAppAttempt();
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);
rm.stop();
}
}