YARN-10536. Client in distributedShell swallows interrupt exceptions (#2554)

(cherry picked from commit 7a88f45366)
This commit is contained in:
Ahmed Hussein 2020-12-17 18:13:28 -05:00 committed by Masatake Iwasaki
parent 35740a25d9
commit a40eabd717
2 changed files with 92 additions and 27 deletions

View File

@ -144,6 +144,9 @@ public class Client {
private static final int DEFAULT_CONTAINER_MEMORY = 10; private static final int DEFAULT_CONTAINER_MEMORY = 10;
private static final int DEFAULT_CONTAINER_VCORES = 1; private static final int DEFAULT_CONTAINER_VCORES = 1;
// check the application once per second.
private static final int APP_MONITOR_INTERVAL = 1000;
// Configuration // Configuration
private Configuration conf; private Configuration conf;
private YarnClient yarnClient; private YarnClient yarnClient;
@ -209,7 +212,7 @@ public class Client {
private String rollingFilesPattern = ""; private String rollingFilesPattern = "";
// Start time for client // Start time for client
private final long clientStartTime = System.currentTimeMillis(); private long clientStartTime = System.currentTimeMillis();
// Timeout threshold for client. Kill app after time interval expires. // Timeout threshold for client. Kill app after time interval expires.
private long clientTimeout = 600000; private long clientTimeout = 600000;
@ -670,6 +673,8 @@ public boolean run() throws IOException, YarnException {
LOG.info("Running Client"); LOG.info("Running Client");
yarnClient.start(); yarnClient.start();
// set the client start time.
clientStartTime = System.currentTimeMillis();
YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
LOG.info("Got Cluster metric info from ASM" LOG.info("Got Cluster metric info from ASM"
@ -983,7 +988,6 @@ public boolean run() throws IOException, YarnException {
if (keepContainers) { if (keepContainers) {
vargs.add("--keep_containers_across_application_attempts"); vargs.add("--keep_containers_across_application_attempts");
} }
for (Map.Entry<String, String> entry : shellEnv.entrySet()) { for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
} }
@ -1110,13 +1114,17 @@ void specifyLogAggregationContext(ApplicationSubmissionContext appContext) {
private boolean monitorApplication(ApplicationId appId) private boolean monitorApplication(ApplicationId appId)
throws YarnException, IOException { throws YarnException, IOException {
boolean res = false;
boolean needForceKill = false;
while (true) { while (true) {
// Check app status every 1 second. // Check app status every 1 second.
try { try {
Thread.sleep(1000); Thread.sleep(APP_MONITOR_INTERVAL);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.debug("Thread sleep in monitoring loop interrupted"); LOG.warn("Thread sleep in monitoring loop interrupted");
// if the application is to be killed when client times out;
// then set needForceKill to true
break;
} }
// Get application report for the appId we are interested in // Get application report for the appId we are interested in
@ -1139,22 +1147,20 @@ private boolean monitorApplication(ApplicationId appId)
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
if (YarnApplicationState.FINISHED == state) { if (YarnApplicationState.FINISHED == state) {
if (FinalApplicationStatus.SUCCEEDED == dsStatus) { if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
LOG.info("Application has completed successfully. Breaking monitoring loop"); LOG.info("Application has completed successfully. "
return true; + "Breaking monitoring loop");
} res = true;
else { } else {
LOG.info("Application did finished unsuccessfully. " LOG.info("Application did finished unsuccessfully. "
+ " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + "YarnState={}, DSFinalStatus={}. Breaking monitoring loop",
+ ". Breaking monitoring loop"); state, dsStatus);
return false;
} }
} break;
else if (YarnApplicationState.KILLED == state } else if (YarnApplicationState.KILLED == state
|| YarnApplicationState.FAILED == state) { || YarnApplicationState.FAILED == state) {
LOG.info("Application did not finish." LOG.info("Application did not finish. YarnState={}, DSFinalStatus={}. "
+ " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + "Breaking monitoring loop", state, dsStatus);
+ ". Breaking monitoring loop"); break;
return false;
} }
// The value equal or less than 0 means no timeout // The value equal or less than 0 means no timeout
@ -1162,11 +1168,16 @@ else if (YarnApplicationState.KILLED == state
&& System.currentTimeMillis() > (clientStartTime + clientTimeout)) { && System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
LOG.info("Reached client specified timeout for application. " + LOG.info("Reached client specified timeout for application. " +
"Killing application"); "Killing application");
forceKillApplication(appId); needForceKill = true;
return false; break;
} }
} }
if (needForceKill) {
forceKillApplication(appId);
}
return res;
} }
/** /**

View File

@ -107,6 +107,7 @@
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -139,6 +140,13 @@ public class TestDistributedShell {
@Rule @Rule
public TemporaryFolder tmpFolder = new TemporaryFolder(); public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule
public TestName name = new TestName();
/**
 * Builds the application name for the currently running test.
 *
 * @return the running test method's name, as reported by the
 *         {@code name} rule, with the first occurrence of "test" removed
 */
private String generateAppName() {
  final String methodName = name.getMethodName();
  return methodName.replaceFirst("test", "");
}
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion()); setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion());
@ -737,6 +745,8 @@ protected String getSleepCommand(int sec) {
@Test @Test
public void testDSRestartWithPreviousRunningContainers() throws Exception { public void testDSRestartWithPreviousRunningContainers() throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -772,6 +782,8 @@ public void testDSRestartWithPreviousRunningContainers() throws Exception {
@Test @Test
public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { public void testDSAttemptFailuresValidityIntervalSucess() throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -810,6 +822,8 @@ public void testDSAttemptFailuresValidityIntervalSucess() throws Exception {
@Test @Test
public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -857,6 +871,8 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception {
fileWriter.write("log4j.rootLogger=debug,stdout"); fileWriter.write("log4j.rootLogger=debug,stdout");
fileWriter.close(); fileWriter.close();
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -906,6 +922,8 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception {
public void testSpecifyingLogAggregationContext() throws Exception { public void testSpecifyingLogAggregationContext() throws Exception {
String regex = ".*(foo|bar)\\d"; String regex = ".*(foo|bar)\\d";
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--shell_command", "--shell_command",
@ -928,6 +946,8 @@ public void testSpecifyingLogAggregationContext() throws Exception {
public void testDSShellWithCommands() throws Exception { public void testDSShellWithCommands() throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -960,6 +980,8 @@ public void testDSShellWithCommands() throws Exception {
@Test @Test
public void testDSShellWithMultipleArgs() throws Exception { public void testDSShellWithMultipleArgs() throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1010,6 +1032,8 @@ public void testDSShellWithShellScript() throws Exception {
fileWriter.close(); fileWriter.close();
System.out.println(customShellScript.getAbsolutePath()); System.out.println(customShellScript.getAbsolutePath());
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1055,6 +1079,8 @@ public void testDSShellWithInvalidArgs() throws Exception {
LOG.info("Initializing DS Client with no jar file"); LOG.info("Initializing DS Client with no jar file");
try { try {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--num_containers", "--num_containers",
"2", "2",
"--shell_command", "--shell_command",
@ -1263,6 +1289,8 @@ protected void waitForNMsToRegister() throws Exception {
@Test @Test
public void testContainerLaunchFailureHandling() throws Exception { public void testContainerLaunchFailureHandling() throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1291,6 +1319,8 @@ public void testContainerLaunchFailureHandling() throws Exception {
@Test @Test
public void testDebugFlag() throws Exception { public void testDebugFlag() throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1388,14 +1418,18 @@ private int verifyContainerLog(int containerNum,
@Test @Test
public void testDistributedShellResourceProfiles() throws Exception { public void testDistributedShellResourceProfiles() throws Exception {
String appName = generateAppName();
String[][] args = { String[][] args = {
{"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", {"--appname", appName + "-0", "--jar", APPMASTER_JAR,
"--num_containers", "1", "--shell_command",
Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile", Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile",
"maximum" }, "maximum" },
{"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", {"--appname", appName + "-1", "--jar", APPMASTER_JAR,
"--num_containers", "1", "--shell_command",
Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
"default" }, "default" },
{"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", {"--appname", appName + "-2", "--jar", APPMASTER_JAR,
"--num_containers", "1", "--shell_command",
Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
"default", "--container_resource_profile", "maximum" } "default", "--container_resource_profile", "maximum" }
}; };
@ -1419,6 +1453,8 @@ public void testDSShellWithOpportunisticContainers() throws Exception {
Client client = new Client(new Configuration(yarnCluster.getConfig())); Client client = new Client(new Configuration(yarnCluster.getConfig()));
try { try {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1449,6 +1485,8 @@ public void testDSShellWithEnforceExecutionType() throws Exception {
Client client = new Client(new Configuration(yarnCluster.getConfig())); Client client = new Client(new Configuration(yarnCluster.getConfig()));
try { try {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1569,6 +1607,8 @@ public void doTestDistributedShellWithResources(boolean largeContainers)
} }
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1650,6 +1690,8 @@ public void run() {
public void testDistributedShellAMResourcesWithIllegalArguments() public void testDistributedShellAMResourcesWithIllegalArguments()
throws Exception { throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1667,6 +1709,8 @@ public void testDistributedShellAMResourcesWithIllegalArguments()
public void testDistributedShellAMResourcesWithMissingArgumentValue() public void testDistributedShellAMResourcesWithMissingArgumentValue()
throws Exception { throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1683,6 +1727,8 @@ public void testDistributedShellAMResourcesWithMissingArgumentValue()
public void testDistributedShellAMResourcesWithUnknownResource() public void testDistributedShellAMResourcesWithUnknownResource()
throws Exception { throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1701,6 +1747,8 @@ public void testDistributedShellAMResourcesWithUnknownResource()
public void testDistributedShellNonExistentQueue() public void testDistributedShellNonExistentQueue()
throws Exception { throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1719,6 +1767,8 @@ public void testDistributedShellNonExistentQueue()
public void testDistributedShellWithSingleFileLocalization() public void testDistributedShellWithSingleFileLocalization()
throws Exception { throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1740,6 +1790,8 @@ public void testDistributedShellWithSingleFileLocalization()
public void testDistributedShellWithMultiFileLocalization() public void testDistributedShellWithMultiFileLocalization()
throws Exception { throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1761,6 +1813,8 @@ public void testDistributedShellWithMultiFileLocalization()
public void testDistributedShellWithNonExistentFileLocalization() public void testDistributedShellWithNonExistentFileLocalization()
throws Exception { throws Exception {
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
@ -1784,14 +1838,14 @@ public void testDistributedShellCleanup()
throws Exception { throws Exception {
String appName = "DistributedShellCleanup"; String appName = "DistributedShellCleanup";
String[] args = { String[] args = {
"--appname",
generateAppName(),
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
"1", "1",
"--shell_command", "--shell_command",
Shell.WINDOWS ? "dir" : "ls", Shell.WINDOWS ? "dir" : "ls"
"--appname",
appName
}; };
Configuration config = new Configuration(yarnCluster.getConfig()); Configuration config = new Configuration(yarnCluster.getConfig());
Client client = new Client(config); Client client = new Client(config);