MAPREDUCE-5044. Have AM trigger jstack on task attempts that timeout before killing them. (Eric Payne and Gera Shegalov via mingma)

This commit is contained in:
Ming Ma 2016-06-06 14:30:51 -07:00
parent 35f255b03b
commit 4a1cedc010
34 changed files with 361 additions and 42 deletions

View File

@ -20,6 +20,10 @@
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -255,6 +259,30 @@ public void run() {
} else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
if (event.getDumpContainerThreads()) {
try {
// Construct full thread dump header
System.out.println(new java.util.Date());
RuntimeMXBean rtBean = ManagementFactory.getRuntimeMXBean();
System.out.println("Full thread dump " + rtBean.getVmName()
+ " (" + rtBean.getVmVersion()
+ " " + rtBean.getSystemProperties().get("java.vm.info")
+ "):\n");
// Dump threads' states and stacks
ThreadMXBean tmxBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] tInfos = tmxBean.dumpAllThreads(
tmxBean.isObjectMonitorUsageSupported(),
tmxBean.isSynchronizerUsageSupported());
for (ThreadInfo ti : tInfos) {
System.out.println(ti.toString());
}
} catch (Throwable t) {
// Failure to dump stack shouldn't cause method failure.
System.out.println("Could not create full thread dump: "
+ t.getMessage());
}
}
// cancel (and interrupt) the current running task associated with the
// event
TaskAttemptId taId = event.getTaskAttemptID();

View File

@ -2115,7 +2115,7 @@ public void transition(TaskAttemptImpl taskAttempt,
taskAttempt.attemptId,
taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP, false));
taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
taskAttempt.attemptId, false));
@ -2179,7 +2179,8 @@ private static void sendContainerCleanup(TaskAttemptImpl taskAttempt,
taskAttempt.container.getId(), StringInterner
.weakIntern(taskAttempt.container.getNodeId().toString()),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP,
event.getType() == TaskAttemptEventType.TA_TIMED_OUT));
}
/**

View File

@ -30,17 +30,29 @@ public class ContainerLauncherEvent
private ContainerId containerID;
private String containerMgrAddress;
private Token containerToken;
private boolean dumpContainerThreads;
/**
 * Creates a launcher event that does not request a thread dump of the
 * container; delegates to the six-argument constructor with
 * {@code dumpContainerThreads} set to {@code false}.
 *
 * @param taskAttemptID the task attempt this event refers to
 * @param containerID the container this event refers to
 * @param containerMgrAddress address of the container manager
 * @param containerToken token for the container
 * @param type the launcher event type
 */
public ContainerLauncherEvent(TaskAttemptId taskAttemptID,
ContainerId containerID,
String containerMgrAddress,
Token containerToken,
ContainerLauncher.EventType type) {
this(taskAttemptID, containerID, containerMgrAddress, containerToken, type,
false);
}
/**
 * Creates a launcher event, optionally flagged to have the container's
 * threads dumped when the event is processed.
 *
 * @param taskAttemptID the task attempt this event refers to
 * @param containerID the container this event refers to
 * @param containerMgrAddress address of the container manager
 * @param containerToken token for the container
 * @param type the launcher event type
 * @param dumpContainerThreads value later returned by
 *          {@code getDumpContainerThreads()}
 */
public ContainerLauncherEvent(TaskAttemptId taskAttemptID,
    ContainerId containerID,
    String containerMgrAddress,
    Token containerToken,
    ContainerLauncher.EventType type,
    boolean dumpContainerThreads) {
  super(type);
  // Plain field captures; their relative order is irrelevant.
  this.dumpContainerThreads = dumpContainerThreads;
  this.containerToken = containerToken;
  this.containerMgrAddress = containerMgrAddress;
  this.containerID = containerID;
  this.taskAttemptID = taskAttemptID;
}
public TaskAttemptId getTaskAttemptID() {
@ -59,6 +71,10 @@ public Token getContainerToken() {
return containerToken;
}
/**
 * @return the {@code dumpContainerThreads} flag this event was constructed
 *         with
 */
public boolean getDumpContainerThreads() {
return dumpContainerThreads;
}
@Override
public String toString() {
return super.toString() + " for container " + containerID + " taskAttempt "
@ -77,6 +93,8 @@ public int hashCode() {
+ ((containerToken == null) ? 0 : containerToken.hashCode());
result = prime * result
+ ((taskAttemptID == null) ? 0 : taskAttemptID.hashCode());
result = prime * result
+ (dumpContainerThreads ? 1 : 0);
return result;
}
@ -109,7 +127,8 @@ public boolean equals(Object obj) {
return false;
} else if (!taskAttemptID.equals(other.taskAttemptID))
return false;
return true;
return dumpContainerThreads == other.dumpContainerThreads;
}
}

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -53,6 +54,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -192,8 +194,12 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
}
}
/**
 * Kills this container without requesting a thread dump first; equivalent to
 * {@code kill(false)}.
 */
public void kill() {
kill(false);
}
@SuppressWarnings("unchecked")
public synchronized void kill() {
public synchronized void kill(boolean dumpThreads) {
if(this.state == ContainerState.PREP) {
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
@ -204,6 +210,13 @@ public synchronized void kill() {
try {
proxy = getCMProxy(this.containerMgrAddress, this.containerID);
if (dumpThreads) {
final SignalContainerRequest request = SignalContainerRequest
.newInstance(containerID,
SignalContainerCommand.OUTPUT_THREAD_DUMP);
proxy.getContainerManagementProtocol().signalToContainer(request);
}
// kill the remote container if already launched
List<ContainerId> ids = new ArrayList<ContainerId>();
ids.add(this.containerID);
@ -381,7 +394,7 @@ public void run() {
break;
case CONTAINER_REMOTE_CLEANUP:
c.kill();
c.kill(event.getDumpContainerThreads());
break;
case CONTAINER_COMPLETED:

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -58,6 +57,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -73,6 +74,7 @@
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
@ -460,5 +462,11 @@ public IncreaseContainersResourceResponse increaseContainersResource(
"Dummy function cause"));
throw new IOException(e);
}
/**
 * Stub implementation: ignores the request and returns {@code null}.
 */
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
return null;
}
}
}

View File

@ -50,6 +50,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@ -465,6 +467,12 @@ public IncreaseContainersResourceResponse increaseContainersResource(
@Override
public void close() throws IOException {
}
/**
 * Stub implementation: ignores the request and returns {@code null}.
 */
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
return null;
}
}
@SuppressWarnings("serial")

View File

@ -496,8 +496,9 @@ public Priority updateApplicationPriority(ApplicationId applicationId,
}
@Override
public void signalContainer(ContainerId containerId, SignalContainerCommand command)
public void signalToContainer(ContainerId containerId,
SignalContainerCommand command)
throws YarnException, IOException {
client.signalContainer(containerId, command);
client.signalToContainer(containerId, command);
}
}

View File

@ -481,7 +481,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
}
@Override
public SignalContainerResponse signalContainer(
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws IOException {
return null;
}

View File

@ -24,6 +24,7 @@
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
@ -980,6 +981,124 @@ public void testDistributedCache() throws Exception {
_testDistributedCache(remoteJobJarPath.toUri().toString());
}
/**
 * Runs a sleep job whose single map task is guaranteed to exceed the task
 * timeout, then inspects the NodeManager container logs to verify that a
 * thread dump was written to stdout before the attempt was killed.
 *
 * <p>A container is classified as the AppMaster when its syslog mentions
 * {@link MRAppMaster}; every other container is counted as a map task.
 * Map-task containers must contain a thread dump. The AppMaster container
 * must contain one only when this test runs as {@code TestUberAM}.
 *
 * @throws IOException if a container log cannot be read
 * @throws InterruptedException if interrupted while polling the RM
 * @throws ClassNotFoundException propagated from job submission
 */
@Test(timeout = 120000)
public void testThreadDumpOnTaskTimeout() throws IOException,
    InterruptedException, ClassNotFoundException {
  if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
        + " not found. Not running test.");
    return;
  }

  final SleepJob sleepJob = new SleepJob();
  final JobConf sleepConf = new JobConf(mrCluster.getConfig());
  sleepConf.setLong(MRJobConfig.TASK_TIMEOUT, 3 * 1000L);
  sleepConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
  sleepJob.setConf(sleepConf);
  if (this instanceof TestUberAM) {
    sleepConf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS,
        30 * 1000);
  }

  // The map sleeps for 10 minutes, far longer than the 3 second task
  // timeout set above, so the attempt is killed with a thread dump.
  final Job job = sleepJob.createJob(1, 0, 10 * 60 * 1000L, 1, 0L, 0);
  job.setJarByClass(SleepJob.class);
  job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
  job.waitForCompletion(true);

  final JobId jobId = TypeConverter.toYarn(job.getJobID());
  final ApplicationId appID = jobId.getAppId();

  // Give the RM up to 60 seconds to move the application to a terminal
  // state; on expiry only warn and carry on with the log checks.
  int pollElapsed = 0;
  while (true) {
    Thread.sleep(1000);
    pollElapsed += 1000;
    if (TERMINAL_RM_APP_STATES.contains(mrCluster.getResourceManager()
        .getRMContext().getRMApps().get(appID).getState())) {
      break;
    }
    if (pollElapsed >= 60000) {
      LOG.warn("application did not reach terminal state within 60 seconds");
      break;
    }
  }

  // Job finished, verify logs
  //
  final String appIdStr = appID.toString();
  final String appIdSuffix = appIdStr.substring("application_".length(),
      appIdStr.length());
  final String containerGlob = "container_" + appIdSuffix + "_*_*";
  final String syslogGlob = appIdStr
      + Path.SEPARATOR + containerGlob
      + Path.SEPARATOR + TaskLog.LogName.SYSLOG;

  int numAppMasters = 0;
  int numMapTasks = 0;

  for (int i = 0; i < NUM_NODE_MGRS; i++) {
    final Configuration nmConf = mrCluster.getNodeManager(i).getConfig();
    for (String logDir :
        nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)) {
      final Path absSyslogGlob =
          new Path(logDir + Path.SEPARATOR + syslogGlob);
      LOG.info("Checking for glob: " + absSyslogGlob);
      for (FileStatus syslog : localFs.globStatus(absSyslogGlob)) {
        // Determine the container type
        final boolean foundAppMaster = localLogContains(syslog.getPath(),
            MRAppMaster.class.getName());

        // Check for thread dump in stdout
        final Path stdoutPath = new Path(syslog.getPath().getParent(),
            TaskLog.LogName.STDOUT.toString());
        final boolean foundThreadDump =
            localLogContains(stdoutPath, "Full thread dump");

        if (foundAppMaster) {
          numAppMasters++;
          if (this instanceof TestUberAM) {
            Assert.assertTrue("No thread dump", foundThreadDump);
          } else {
            Assert.assertFalse("Unexpected thread dump", foundThreadDump);
          }
        } else {
          numMapTasks++;
          Assert.assertTrue("No thread dump", foundThreadDump);
        }
      }
    }
  }

  // Make sure we checked non-empty set
  //
  Assert.assertEquals("No AppMaster log found!", 1, numAppMasters);
  // Compare the counts by value: assertSame on autoboxed ints checks object
  // identity and only happens to pass for values in the Integer cache.
  if (sleepConf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false)) {
    Assert.assertEquals("MapTask log with uber found!", 0, numMapTasks);
  } else {
    Assert.assertEquals("No MapTask log found!", 1, numMapTasks);
  }
}

/**
 * Scans a log file on the local file system line by line and reports
 * whether any line contains the given marker. Stops at the first match.
 *
 * @param logFile path of the log file, opened through {@code localFs}
 * @param marker substring to look for
 * @return {@code true} if some line contains {@code marker}
 * @throws IOException if the file cannot be opened or read
 */
private boolean localLogContains(Path logFile, String marker)
    throws IOException {
  final BufferedReader reader = new BufferedReader(
      new InputStreamReader(localFs.open(logFile)));
  try {
    for (String line; (line = reader.readLine()) != null; ) {
      if (line.contains(marker)) {
        return true;
      }
    }
    return false;
  } finally {
    reader.close();
  }
}
private Path createTempFile(String filename, String contents)
throws IOException {
Path path = new Path(TEST_ROOT_DIR, filename);

View File

@ -563,7 +563,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
*/
@Public
@Unstable
public SignalContainerResponse signalContainer(
SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException,
IOException;
}

View File

@ -27,6 +27,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -194,4 +196,7 @@ GetContainerStatusesResponse getContainerStatuses(
IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException,
IOException;
/**
 * Submits a signal request for a container. The request is built from a
 * container ID and a {@code SignalContainerCommand} (for example
 * {@code OUTPUT_THREAD_DUMP}).
 *
 * @param request the signal request
 * @return the response to the signal request
 * @throws YarnException declared by the protocol; conditions are
 *           implementation-specific
 * @throws IOException declared by the protocol; conditions are
 *           implementation-specific
 */
SignalContainerResponse signalToContainer(SignalContainerRequest request)
throws YarnException, IOException;
}

View File

@ -28,7 +28,7 @@
*
* <p>Currently it's empty.</p>
*
* @see ApplicationClientProtocol#signalContainer(SignalContainerRequest)
* @see ApplicationClientProtocol#signalToContainer(SignalContainerRequest)
*/
@Public
@Evolving

View File

@ -59,5 +59,5 @@ service ApplicationClientProtocolService {
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto);
rpc signalContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
}

View File

@ -35,4 +35,5 @@ service ContainerManagementProtocolService {
rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto);
rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto);
rpc increaseContainersResource(IncreaseContainersResourceRequestProto) returns (IncreaseContainersResourceResponseProto);
rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
}

View File

@ -777,6 +777,6 @@ public abstract Priority updateApplicationPriority(
* @throws YarnException
* @throws IOException
*/
public abstract void signalContainer(ContainerId containerId,
public abstract void signalToContainer(ContainerId containerId,
SignalContainerCommand command) throws YarnException, IOException;
}

View File

@ -858,12 +858,12 @@ public Priority updateApplicationPriority(ApplicationId applicationId,
}
@Override
public void signalContainer(ContainerId containerId,
public void signalToContainer(ContainerId containerId,
SignalContainerCommand command)
throws YarnException, IOException {
LOG.info("Signalling container " + containerId + " with command " + command);
SignalContainerRequest request =
SignalContainerRequest.newInstance(containerId, command);
rmClient.signalContainer(request);
rmClient.signalToContainer(request);
}
}

View File

@ -284,7 +284,7 @@ public int run(String[] args) throws Exception {
if (signalArgs.length == 2) {
command = SignalContainerCommand.valueOf(signalArgs[1]);
}
signalContainer(containerId, command);
signalToContainer(containerId, command);
} else {
syserr.println("Invalid Command Usage : ");
printUsage(title, opts);
@ -299,11 +299,11 @@ public int run(String[] args) throws Exception {
* @param command the signal command
* @throws YarnException
*/
private void signalContainer(String containerIdStr,
private void signalToContainer(String containerIdStr,
SignalContainerCommand command) throws YarnException, IOException {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
sysout.println("Signalling container " + containerIdStr);
client.signalContainer(containerId, command);
client.signalToContainer(containerId, command);
}
/**

View File

@ -1689,11 +1689,11 @@ public void testSignalContainer() throws Exception {
applicationId, 1);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
SignalContainerCommand command = SignalContainerCommand.OUTPUT_THREAD_DUMP;
client.signalContainer(containerId, command);
client.signalToContainer(containerId, command);
final ArgumentCaptor<SignalContainerRequest> signalReqCaptor =
ArgumentCaptor.forClass(SignalContainerRequest.class);
verify(((MockYarnClient) client).getRMClient())
.signalContainer(signalReqCaptor.capture());
.signalToContainer(signalReqCaptor.capture());
SignalContainerRequest request = signalReqCaptor.getValue();
Assert.assertEquals(containerId, request.getContainerId());
Assert.assertEquals(command, request.getCommand());

View File

@ -21,6 +21,11 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.ContainerManagementProtocol.ContainerManagementProtocolService;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@Private
@Unstable
@ -29,4 +34,6 @@
protocolVersion = 1)
public interface ContainerManagementProtocolPB extends ContainerManagementProtocolService.BlockingInterface {
SignalContainerResponseProto signalToContainer(RpcController arg0,
SignalContainerRequestProto proto) throws ServiceException;
}

View File

@ -588,13 +588,13 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
}
@Override
public SignalContainerResponse signalContainer(
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
YarnServiceProtos.SignalContainerRequestProto requestProto =
((SignalContainerRequestPBImpl) request).getProto();
try {
return new SignalContainerResponsePBImpl(
proxy.signalContainer(null, requestProto));
proxy.signalToContainer(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;

View File

@ -34,6 +34,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@ -42,6 +44,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
@ -50,6 +54,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
@ -148,4 +153,18 @@ public IncreaseContainersResourceResponse increaseContainersResource(
return null;
}
}
/**
 * Sends a container signal request through the protobuf RPC proxy and wraps
 * the protobuf reply in a {@link SignalContainerResponsePBImpl}.
 *
 * @param request the signal request; cast to
 *          {@link SignalContainerRequestPBImpl} to obtain its proto
 * @return the response built from the proxy's reply
 * @throws YarnException declared for the protocol; see the catch block
 * @throws IOException declared for the protocol; see the catch block
 */
@Override
public SignalContainerResponse signalToContainer(
    SignalContainerRequest request) throws YarnException, IOException {
  final SignalContainerRequestPBImpl pbRequest =
      (SignalContainerRequestPBImpl) request;
  final SignalContainerRequestProto wireRequest = pbRequest.getProto();
  try {
    return new SignalContainerResponsePBImpl(
        proxy.signalToContainer(null, wireRequest));
  } catch (ServiceException rpcFailure) {
    // NOTE(review): presumably unwrapAndThrowException always throws, so the
    // return below only satisfies the compiler — confirm against RPCUtil.
    RPCUtil.unwrapAndThrowException(rpcFailure);
    return null;
  }
}
}

View File

@ -596,11 +596,12 @@ public UpdateApplicationPriorityResponseProto updateApplicationPriority(
}
@Override
public SignalContainerResponseProto signalContainer(RpcController controller,
public SignalContainerResponseProto signalToContainer(
RpcController controller,
YarnServiceProtos.SignalContainerRequestProto proto) throws ServiceException {
SignalContainerRequestPBImpl request = new SignalContainerRequestPBImpl(proto);
try {
SignalContainerResponse response = real.signalContainer(request);
SignalContainerResponse response = real.signalToContainer(request);
return ((SignalContainerResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);

View File

@ -25,12 +25,15 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
@ -40,6 +43,8 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
@ -116,4 +121,19 @@ public IncreaseContainersResourceResponseProto increaseContainersResource(
throw new ServiceException(e);
}
}
/**
 * Server-side protobuf adapter: wraps the incoming proto in a request
 * record, hands it to the real protocol implementation, and returns the
 * proto carried by the response.
 *
 * @param arg0 RPC controller; not used by this method
 * @param proto the protobuf signal request
 * @return the protobuf form of the implementation's response
 * @throws ServiceException wrapping any {@link YarnException} or
 *           {@link IOException} thrown by the implementation
 */
@Override
public SignalContainerResponseProto signalToContainer(RpcController arg0,
    SignalContainerRequestProto proto) throws ServiceException {
  final SignalContainerRequestPBImpl request =
      new SignalContainerRequestPBImpl(proto);
  try {
    final SignalContainerResponse response = real.signalToContainer(request);
    final SignalContainerResponsePBImpl pbResponse =
        (SignalContainerResponsePBImpl) response;
    return pbResponse.getProto();
  } catch (IOException ioFailure) {
    throw new ServiceException(ioFailure);
  } catch (YarnException yarnFailure) {
    throw new ServiceException(yarnFailure);
  }
}
}

View File

@ -35,6 +35,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -174,5 +176,13 @@ public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
return null;
}
/**
 * Dummy implementation that always fails: throws a {@link YarnException}
 * wrapping a nested "Dummy function" exception chain.
 */
@Override
public SignalContainerResponse signalToContainer(
    SignalContainerRequest request) throws YarnException, IOException {
  throw new YarnException(
      new Exception("Dummy function", new Exception("Dummy function cause")));
}
}
}

View File

@ -29,6 +29,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@ -158,5 +160,11 @@ public IncreaseContainersResourceResponse increaseContainersResource(
}
throw new YarnException("Shouldn't happen!!");
}
/**
 * Stub implementation: ignores the request and returns {@code null}.
 */
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
return null;
}
}
}

View File

@ -38,6 +38,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -227,6 +229,14 @@ public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
return null;
}
/**
 * Always fails: throws a {@link YarnException} wrapping an exception built
 * from {@code EXCEPTION_MSG} with an {@code EXCEPTION_CAUSE} cause.
 */
@Override
public SignalContainerResponse signalToContainer(
    SignalContainerRequest request) throws YarnException {
  throw new YarnException(
      new Exception(EXCEPTION_MSG, new Exception(EXCEPTION_CAUSE)));
}
}
public static ContainerTokenIdentifier newContainerTokenIdentifier(

View File

@ -60,11 +60,13 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@ -147,6 +149,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerManagerImpl extends CompositeService implements
@ -1380,16 +1383,7 @@ public void handle(ContainerManagerEvent event) {
(CMgrSignalContainersEvent) event;
for (SignalContainerRequest request : containersSignalEvent
.getContainersToSignal()) {
ContainerId containerId = request.getContainerId();
Container container = this.context.getContainers().get(containerId);
if (container != null) {
LOG.info(containerId + " signal request by ResourceManager.");
this.dispatcher.getEventHandler().handle(
new SignalContainersLauncherEvent(container,
request.getCommand()));
} else {
LOG.info("Container " + containerId + " no longer exists");
}
internalSignalToContainer(request, "ResourceManager");
}
break;
default:
@ -1440,4 +1434,28 @@ protected boolean isServiceStopped() {
public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) {
LOG.trace("Implementation does not support queuing of Containers !!");
}
/**
 * Handles a container signal request arriving through the container
 * management protocol. The request is processed by
 * {@code internalSignalToContainer}, tagged as sent by the
 * "Application Master", and an empty response is returned.
 *
 * NOTE(review): no caller authorization is visible in this method — confirm
 * it is enforced elsewhere before the request gets here.
 *
 * @param request the signal request
 * @return a new, empty {@link SignalContainerResponsePBImpl}
 */
@SuppressWarnings("unchecked")
@Override
public SignalContainerResponse signalToContainer(
    SignalContainerRequest request) throws YarnException, IOException {
  final String requester = "Application Master";
  internalSignalToContainer(request, requester);
  final SignalContainerResponse emptyResponse =
      new SignalContainerResponsePBImpl();
  return emptyResponse;
}
/**
 * Looks up the container named in the request and, if it is still present
 * in the context, dispatches a {@code SignalContainersLauncherEvent}
 * carrying the requested command. A request for an unknown container is
 * only logged.
 *
 * @param request the signal request (container ID and command)
 * @param sentBy label of the requester, used in the log message only
 */
@SuppressWarnings("unchecked")
private void internalSignalToContainer(SignalContainerRequest request,
    String sentBy) {
  final ContainerId targetId = request.getContainerId();
  final Container target = this.context.getContainers().get(targetId);
  if (target == null) {
    LOG.info("Container " + targetId + " no longer exists");
    return;
  }
  LOG.info(targetId + " signal request " + request.getCommand()
      + " by " + sentBy);
  this.dispatcher.getEventHandler().handle(
      new SignalContainersLauncherEvent(target, request.getCommand()));
}
}

View File

@ -486,7 +486,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
}
@Override
public SignalContainerResponse signalContainer(
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws IOException {
// Stub: the request is ignored and no response object is created.
return null;
}

View File

@ -1635,7 +1635,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
*/
@SuppressWarnings("unchecked")
@Override
public SignalContainerResponse signalContainer(
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
ContainerId containerId = request.getContainerId();

View File

@ -1005,11 +1005,11 @@ public RMActiveServices getRMActiveService() {
return activeServices;
}
// Convenience wrapper: builds a SignalContainerRequest for the given
// container id and command, then submits it through the
// ApplicationClientProtocol returned by getClientRMService().
public void signalContainer(ContainerId containerId, SignalContainerCommand command)
throws Exception {
public void signalToContainer(ContainerId containerId,
SignalContainerCommand command) throws Exception {
ApplicationClientProtocol client = getClientRMService();
SignalContainerRequest req =
SignalContainerRequest.newInstance(containerId, command);
// The response returned by the client call is discarded.
client.signalContainer(req);
client.signalToContainer(req);
}
}

View File

@ -28,13 +28,14 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -317,4 +318,10 @@ public IncreaseContainersResourceResponse increaseContainersResource(
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
return nodeStatus;
}
/**
 * Container signalling is not implemented by this class; every call is
 * rejected.
 *
 * @param request the signal request (never inspected)
 * @return never returns normally
 * @throws YarnException always, with the message "Not supported yet!"
 */
@Override
public synchronized SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
throw new YarnException("Not supported yet!");
}
}

View File

@ -46,6 +46,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@ -138,6 +140,12 @@ public Credentials getContainerCredentials() throws IOException {
credentials.readTokenStorageStream(buf);
return credentials;
}
/**
 * No-op stub: the request is ignored.
 *
 * @param request the signal request (never inspected)
 * @return always {@code null}
 */
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
return null;
}
}
public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {

View File

@ -37,6 +37,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -148,6 +150,12 @@ public IncreaseContainersResourceResponse increaseContainersResource(
throws YarnException {
return null;
}
/**
 * No-op stub: the request is ignored.
 *
 * @param request the signal request (never inspected)
 * @return always {@code null}
 */
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
return null;
}
}
@Test

View File

@ -83,7 +83,7 @@ public void testSignalRequestDeliveryToNM() throws Exception {
Assert.assertEquals(request, contReceived);
for(Container container : conts) {
rm.signalContainer(container.getId(),
rm.signalToContainer(container.getId(),
SignalContainerCommand.OUTPUT_THREAD_DUMP);
}