YARN-10334. Close clients in TestDistributedShell (#2571)

(cherry picked from commit 513f1995ad)
This commit is contained in:
Ahmed Hussein 2020-12-28 14:09:10 -05:00 committed by Masatake Iwasaki
parent a40eabd717
commit 98690134d5
2 changed files with 431 additions and 396 deletions

View File

@ -32,6 +32,7 @@
import java.util.Vector;
import java.util.Arrays;
import java.util.Base64;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
@ -253,6 +254,10 @@ public class Client {
// Command line options
private Options opts;
private final AtomicBoolean stopSignalReceived;
private final AtomicBoolean isRunning;
private final Object objectLock = new Object();
private static final String shellCommandPath = "shellCommands";
private static final String shellArgsPath = "shellArgs";
private static final String appMasterJarPath = "AppMaster.jar";
@ -413,6 +418,8 @@ public Client(Configuration conf) throws Exception {
opts.addOption("application_tags", true, "Application tags.");
opts.addOption("localize_files", true, "List of files, separated by comma"
+ " to be localized for the command");
stopSignalReceived = new AtomicBoolean(false);
isRunning = new AtomicBoolean(false);
}
/**
@ -670,8 +677,8 @@ public boolean init(String[] args) throws ParseException {
* @throws YarnException
*/
public boolean run() throws IOException, YarnException {
LOG.info("Running Client");
isRunning.set(true);
yarnClient.start();
// set the client start time.
clientStartTime = System.currentTimeMillis();
@ -1116,15 +1123,22 @@ private boolean monitorApplication(ApplicationId appId)
boolean res = false;
boolean needForceKill = false;
while (true) {
while (isRunning.get()) {
// Check app status every 1 second.
try {
Thread.sleep(APP_MONITOR_INTERVAL);
synchronized (objectLock) {
objectLock.wait(APP_MONITOR_INTERVAL);
}
needForceKill = stopSignalReceived.get();
} catch (InterruptedException e) {
LOG.warn("Thread sleep in monitoring loop interrupted");
// if the application is to be killed when client times out;
// then set needForceKill to true
break;
} finally {
if (needForceKill) {
break;
}
}
// Get application report for the appId we are interested in
@ -1177,6 +1191,8 @@ private boolean monitorApplication(ApplicationId appId)
forceKillApplication(appId);
}
isRunning.set(false);
return res;
}
@ -1388,4 +1404,31 @@ static Map<String, Long> parseResourcesString(String resourcesStr) {
}
return resources;
}
@VisibleForTesting
protected void sendStopSignal() {
LOG.info("Sending stop Signal to Client");
stopSignalReceived.set(true);
synchronized (objectLock) {
objectLock.notifyAll();
}
int waitCount = 0;
LOG.info("Waiting for Client to exit loop");
while (!isRunning.get()) {
try {
Thread.sleep(50);
} catch (InterruptedException ie) {
// do nothing
} finally {
waitCount++;
if (isRunning.get() || waitCount > 2000) {
break;
}
}
}
LOG.info("Stopping yarnClient within the Client");
yarnClient.stop();
yarnClient.waitForServiceToStop(clientTimeout);
LOG.info("done stopping Client");
}
}