YARN-11709. NodeManager should be marked unhealthy on localizer config issues (#7043)

This commit is contained in:
Benjamin Teke 2024-09-17 14:24:11 +02:00 committed by GitHub
parent 182feb11a0
commit d1311e52f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 89 additions and 10 deletions

View File

@ -171,9 +171,10 @@ public Path localizeClasspathJar(Path jarPath, Path target, String owner)
* for starting a localizer. * for starting a localizer.
* @throws IOException for most application init failures * @throws IOException for most application init failures
* @throws InterruptedException if application init thread is halted by NM * @throws InterruptedException if application init thread is halted by NM
* @throws ConfigurationException if a configuration error was found
*/ */
public abstract void startLocalizer(LocalizerStartContext ctx) public abstract void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException; throws IOException, InterruptedException, ConfigurationException;
/** /**
* Prepare the container prior to the launch environment being written. * Prepare the container prior to the launch environment being written.

View File

@ -389,7 +389,7 @@ public void stop() {
@Override @Override
public void startLocalizer(LocalizerStartContext ctx) public void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException { throws IOException, InterruptedException, ConfigurationException {
Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens(); Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
InetSocketAddress nmAddr = ctx.getNmAddr(); InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser(); String user = ctx.getUser();
@ -440,9 +440,9 @@ public void startLocalizer(LocalizerStartContext ctx)
localizerArgs = replaceWithContainerLogDir(localizerArgs, containerLogDir); localizerArgs = replaceWithContainerLogDir(localizerArgs, containerLogDir);
initializeContainerOp.appendArgs(localizerArgs); initializeContainerOp.appendArgs(localizerArgs);
Configuration conf = super.getConf();
try { try {
Configuration conf = super.getConf();
PrivilegedOperationExecutor privilegedOperationExecutor = PrivilegedOperationExecutor privilegedOperationExecutor =
getPrivilegedOperationExecutor(); getPrivilegedOperationExecutor();
@ -452,7 +452,26 @@ public void startLocalizer(LocalizerStartContext ctx)
} catch (PrivilegedOperationException e) { } catch (PrivilegedOperationException e) {
int exitCode = e.getExitCode(); int exitCode = e.getExitCode();
LOG.warn("Exit code from container {} startLocalizer is : {}", LOG.warn("Exit code from container {} startLocalizer is : {}",
locId, exitCode, e); locId, exitCode, e);
if (exitCode ==
ExitCode.INVALID_CONTAINER_EXEC_PERMISSIONS.getExitCode() ||
exitCode == ExitCode.INVALID_CONFIG_FILE.getExitCode()) {
throw new ConfigurationException("Application " + appId + " initialization failed" +
" (exitCode=" + exitCode + ") with an unrecoverable config error. " +
"Output: " + e.getOutput(), e);
}
// Check if the failure was due to a missing container-executor binary
Throwable cause = e.getCause() != null ? e.getCause() : e;
if (cause instanceof IOException) {
IOException io = (IOException) cause;
if (io.getMessage().contains("No such file or directory")) {
throw new ConfigurationException("Application " + appId + " initialization failed" +
"(exitCode=" + exitCode + "). Container executor not found at "
+ getContainerExecutorExecutablePath(conf), e);
}
}
throw new IOException("Application " + appId + " initialization failed" + throw new IOException("Application " + appId + " initialization failed" +
" (exitCode=" + exitCode + ") with output: " + e.getOutput(), e); " (exitCode=" + exitCode + ") with output: " + e.getOutput(), e);

View File

@ -20,6 +20,7 @@
import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -1255,7 +1256,7 @@ public void run() {
try { try {
// Get nmPrivateDir // Get nmPrivateDir
nmPrivateCTokensPath = dirsHandler.getLocalPathForWrite( nmPrivateCTokensPath = dirsHandler.getLocalPathForWrite(
NM_PRIVATE_DIR + Path.SEPARATOR + tokenFileName); NM_PRIVATE_DIR + Path.SEPARATOR + tokenFileName);
// 0) init queue, etc. // 0) init queue, etc.
// 1) write credentials to private dir // 1) write credentials to private dir
@ -1275,10 +1276,13 @@ public void run() {
throw new IOException("All disks failed. " throw new IOException("All disks failed. "
+ dirsHandler.getDisksHealthReport(false)); + dirsHandler.getDisksHealthReport(false));
} }
// TODO handle ExitCodeException separately? // TODO handle ExitCodeException separately?
} catch (FSError fe) { } catch (ConfigurationException e) {
exception = fe; exception = e;
} catch (Exception e) { LOG.error("Failed to launch localizer for {}, due to configuration error. " +
"Marking the node unhealthy.", localizerId, e);
nmContext.getNodeStatusUpdater().reportException(e);
} catch (Exception | FSError e) {
exception = e; exception = e;
} finally { } finally {
if (exception != null) { if (exception != null) {

View File

@ -336,7 +336,7 @@ public void testStartLocalizer() throws IOException {
assertThat(result.get(23)).isEqualTo("8040"); assertThat(result.get(23)).isEqualTo("8040");
assertThat(result.get(24)).isEqualTo("nmPrivateCTokensPath"); assertThat(result.get(24)).isEqualTo("nmPrivateCTokensPath");
} catch (InterruptedException e) { } catch (ConfigurationException | InterruptedException e) {
LOG.error("Error:"+e.getMessage(),e); LOG.error("Error:"+e.getMessage(),e);
Assert.fail(); Assert.fail();
} }
@ -643,6 +643,61 @@ protected PrivilegedOperationExecutor getPrivilegedOperationExecutor() {
e.getMessage().contains("exitCode")); e.getMessage().contains("exitCode"));
} }
final int[] exitCodesToThrow = {
LinuxContainerExecutor.ExitCode.INVALID_CONTAINER_EXEC_PERMISSIONS.getExitCode(),
LinuxContainerExecutor.ExitCode.INVALID_CONFIG_FILE.getExitCode(),
};
for (int i = 0; i < exitCodesToThrow.length; i++) {
int exitCode = exitCodesToThrow[i];
doThrow(new PrivilegedOperationException("invalid config", exitCode, null, null))
.when(spyPrivilegedExecutor).executePrivilegedOperation(
any(), any(PrivilegedOperation.class),
any(), any(), anyBoolean(), anyBoolean());
try {
lce.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateCTokensPath)
.setNmAddr(address)
.setUser(appSubmitter)
.setAppId(appId.toString())
.setLocId("12345")
.setDirsHandler(dirService)
.build());
Assert.fail("startLocalizer should have thrown a ConfigurationException");
} catch (ConfigurationException e) {
assertTrue("Unexpected exception " + e,
e.getMessage().contains("exitCode=" + exitCode));
}
}
doThrow(new PrivilegedOperationException("IO error",
new IOException("No such file or directory")))
.when(spyPrivilegedExecutor).executePrivilegedOperation(
any(), any(PrivilegedOperation.class),
any(), any(), anyBoolean(), anyBoolean());
try {
lce.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateCTokensPath)
.setNmAddr(address)
.setUser(appSubmitter)
.setAppId(appId.toString())
.setLocId("12345")
.setDirsHandler(dirService)
.build());
Assert.fail("startLocalizer should have thrown a ConfigurationException");
} catch (ConfigurationException e) {
assertTrue("Unexpected exception " + e,
e.getMessage().contains("Container executor not found"));
}
doThrow(new PrivilegedOperationException("interrupted"))
.when(spyPrivilegedExecutor).executePrivilegedOperation(
any(), any(PrivilegedOperation.class),
any(), any(), anyBoolean(), anyBoolean());
lce.activateContainer(cid, new Path(workDir, "pid.txt")); lce.activateContainer(cid, new Path(workDir, "pid.txt"));
lce.launchContainer(new ContainerStartContext.Builder() lce.launchContainer(new ContainerStartContext.Builder()
.setContainer(container) .setContainer(container)