YARN-11534. Fixed exception handling when container signalling is interrupted (#5864)
This commit is contained in:
parent
130bd033d5
commit
4d4b099309
@ -48,6 +48,7 @@
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -787,9 +788,18 @@ public boolean signalContainer(ContainerSignalContext ctx)
|
||||
LOG.warn("Error in signalling container {} with {}; exit = {}",
|
||||
pid, signal, retCode, e);
|
||||
logOutput(e.getOutput());
|
||||
throw new IOException("Problem signalling container " + pid + " with "
|
||||
+ signal + "; output: " + e.getOutput() + " and exitCode: "
|
||||
+ retCode, e);
|
||||
|
||||
// In ContainerExecutionException -1 is the default value for the exit code.
|
||||
// If it remained unset, we can treat the signalling as interrupted.
|
||||
if (retCode == ContainerExecutionException.getDefaultExitCode()) {
|
||||
throw new InterruptedIOException("Signalling container " + pid + " with "
|
||||
+ signal + " is interrupted; output: " + e.getOutput() + " and exitCode: "
|
||||
+ retCode);
|
||||
} else {
|
||||
throw new IOException("Problem signalling container " + pid + " with "
|
||||
+ signal + "; output: " + e.getOutput() + " and exitCode: "
|
||||
+ retCode, e);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ public Integer call() {
|
||||
|
||||
dispatcher.getEventHandler().handle(new ContainerEvent(containerId,
|
||||
ContainerEventType.RECOVER_PAUSED_CONTAINER));
|
||||
boolean notInterrupted = true;
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
File pidFile = locatePidFile(appIdStr, containerIdStr);
|
||||
if (pidFile != null) {
|
||||
@ -87,11 +87,11 @@ public Integer call() {
|
||||
|
||||
} catch (InterruptedException | InterruptedIOException e) {
|
||||
LOG.warn("Interrupted while waiting for exit code from " + containerId);
|
||||
notInterrupted = false;
|
||||
interrupted = true;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to kill the paused container " + containerIdStr, e);
|
||||
} finally {
|
||||
if (notInterrupted) {
|
||||
if (!interrupted) {
|
||||
this.completed.set(true);
|
||||
exec.deactivateContainer(containerId);
|
||||
try {
|
||||
|
@ -74,7 +74,7 @@ public Integer call() {
|
||||
dispatcher.getEventHandler().handle(new ContainerEvent(containerId,
|
||||
ContainerEventType.CONTAINER_LAUNCHED));
|
||||
|
||||
boolean notInterrupted = true;
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
File pidFile = locatePidFile(appIdStr, containerIdStr);
|
||||
if (pidFile != null) {
|
||||
@ -92,11 +92,11 @@ public Integer call() {
|
||||
}
|
||||
} catch (InterruptedException | InterruptedIOException e) {
|
||||
LOG.warn("Interrupted while waiting for exit code from " + containerId);
|
||||
notInterrupted = false;
|
||||
interrupted = true;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to recover container " + containerIdStr, e);
|
||||
} finally {
|
||||
if (notInterrupted) {
|
||||
if (!interrupted) {
|
||||
this.completed.set(true);
|
||||
exec.deactivateContainer(containerId);
|
||||
try {
|
||||
|
@ -88,4 +88,8 @@ public String getErrorOutput() {
|
||||
return errorOutput;
|
||||
}
|
||||
|
||||
public static int getDefaultExitCode() {
|
||||
return EXIT_CODE_UNSET;
|
||||
}
|
||||
|
||||
}
|
@ -25,7 +25,10 @@
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
@ -33,6 +36,8 @@
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntime;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
|
||||
import org.slf4j.Logger;
|
||||
@ -41,6 +46,7 @@
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
@ -725,6 +731,44 @@ public void testGetLocalResources() throws Exception {
|
||||
verify(lce, times(1)).getLocalResources(container);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSignalContainerFailureWhenExitCodeIsPresentInTheException()
|
||||
throws ContainerExecutionException {
|
||||
LinuxContainerRuntime containerRuntime = mock(LinuxContainerRuntime.class);
|
||||
LinuxContainerExecutor containerExecutor = spy(new LinuxContainerExecutor(
|
||||
containerRuntime));
|
||||
ContainerSignalContext signalContext = new ContainerSignalContext.Builder().build();
|
||||
ContainerExecutionException testException =
|
||||
new ContainerExecutionException("exceptionWithExitCode", 123);
|
||||
|
||||
doNothing().when(containerExecutor).verifyUsernamePattern(any());
|
||||
doThrow(testException)
|
||||
.when(containerRuntime)
|
||||
.signalContainer(any(ContainerRuntimeContext.class));
|
||||
|
||||
assertThrows(IOException.class,
|
||||
() -> containerExecutor.signalContainer(signalContext));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSignalContainerFailureWhenExitCodeIsNotPresentInTheException()
|
||||
throws ContainerExecutionException {
|
||||
LinuxContainerRuntime containerRuntime = mock(LinuxContainerRuntime.class);
|
||||
LinuxContainerExecutor containerExecutor = spy(new LinuxContainerExecutor(
|
||||
containerRuntime));
|
||||
ContainerSignalContext signalContext = new ContainerSignalContext.Builder().build();
|
||||
ContainerExecutionException testException =
|
||||
new ContainerExecutionException("exceptionWithoutExitCode");
|
||||
|
||||
doNothing().when(containerExecutor).verifyUsernamePattern(any());
|
||||
doThrow(testException)
|
||||
.when(containerRuntime)
|
||||
.signalContainer(any(ContainerRuntimeContext.class));
|
||||
|
||||
assertThrows(InterruptedIOException.class,
|
||||
() -> containerExecutor.signalContainer(signalContext));
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
private static class TestResourceHandler implements LCEResourcesHandler {
|
||||
static Set<ContainerId> postExecContainers = new HashSet<ContainerId>();
|
||||
|
Loading…
Reference in New Issue
Block a user