diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index d7114d0481..5da4384b00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -32,6 +32,7 @@ import java.util.Vector; import java.util.Arrays; import java.util.Base64; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; @@ -253,6 +254,10 @@ public class Client { // Command line options private Options opts; + private final AtomicBoolean stopSignalReceived; + private final AtomicBoolean isRunning; + private final Object objectLock = new Object(); + private static final String shellCommandPath = "shellCommands"; private static final String shellArgsPath = "shellArgs"; private static final String appMasterJarPath = "AppMaster.jar"; @@ -413,6 +418,8 @@ public Client(Configuration conf) throws Exception { opts.addOption("application_tags", true, "Application tags."); opts.addOption("localize_files", true, "List of files, separated by comma" + " to be localized for the command"); + stopSignalReceived = new AtomicBoolean(false); + isRunning = new AtomicBoolean(false); } /** @@ -670,8 +677,8 @@ public boolean init(String[] args) throws ParseException { * @throws YarnException */ public boolean run() throws IOException, YarnException { - LOG.info("Running Client"); + isRunning.set(true); yarnClient.start(); // set the client start time. clientStartTime = System.currentTimeMillis(); @@ -1116,15 +1123,22 @@ private boolean monitorApplication(ApplicationId appId) boolean res = false; boolean needForceKill = false; - while (true) { + while (isRunning.get()) { // Check app status every 1 second. try { - Thread.sleep(APP_MONITOR_INTERVAL); + synchronized (objectLock) { + objectLock.wait(APP_MONITOR_INTERVAL); + } + needForceKill = stopSignalReceived.get(); } catch (InterruptedException e) { LOG.warn("Thread sleep in monitoring loop interrupted"); // if the application is to be killed when client times out; // then set needForceKill to true break; + } finally { + if (needForceKill) { + break; + } } // Get application report for the appId we are interested in @@ -1177,6 +1191,8 @@ private boolean monitorApplication(ApplicationId appId) forceKillApplication(appId); } + isRunning.set(false); + return res; } @@ -1388,4 +1404,31 @@ static Map parseResourcesString(String resourcesStr) { } return resources; } + + @VisibleForTesting + protected void sendStopSignal() { + LOG.info("Sending stop Signal to Client"); + stopSignalReceived.set(true); + synchronized (objectLock) { + objectLock.notifyAll(); + } + int waitCount = 0; + LOG.info("Waiting for Client to exit loop"); + while (!isRunning.get()) { + try { + Thread.sleep(50); + } catch (InterruptedException ie) { + // do nothing + } finally { + waitCount++; + if (isRunning.get() || waitCount > 2000) { + break; + } + } + } + LOG.info("Stopping yarnClient within the Client"); + yarnClient.stop(); + yarnClient.waitForServiceToStop(clientTimeout); + LOG.info("done stopping Client"); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 438b12bca4..009ef3beeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -27,6 +27,7 @@ import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -88,6 +90,7 @@ import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.timeline.NameValuePair; import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; import org.apache.hadoop.yarn.server.timeline.TimelineVersion; @@ -128,6 +131,9 @@ public class TestDistributedShell { private static final float DEFAULT_TIMELINE_VERSION = 1.0f; private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector"; private static final int MIN_ALLOCATION_MB = 128; + private static final int TEST_TIME_OUT = 150000; + // set the timeout of the yarnClient to be 95% of the globalTimeout. + private static final int TEST_TIME_WINDOW_EXPIRE = (TEST_TIME_OUT * 90) / 100; protected final static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); @@ -135,17 +141,29 @@ public class TestDistributedShell { @Rule public TimelineVersionWatcher timelineVersionWatcher = new TimelineVersionWatcher(); + @Rule - public Timeout globalTimeout = new Timeout(90000); + public Timeout globalTimeout = new Timeout(TEST_TIME_OUT, + TimeUnit.MILLISECONDS); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public TestName name = new TestName(); - private String generateAppName() { - return name.getMethodName().replaceFirst("test", ""); - } + // set the timeout of the yarnClient to be 95% of the globalTimeout. + private final String yarnClientTimeout = + String.valueOf(TEST_TIME_WINDOW_EXPIRE); + + private final String[] commonArgs = { + "--jar", + APPMASTER_JAR, + "--timeout", + yarnClientTimeout, + "--appname", + "" + }; @Before public void setup() throws Exception { @@ -168,6 +186,7 @@ private void setupInternal(int numNodeManager, float timelineVersion, MIN_ALLOCATION_MB); // reduce the teardown waiting time conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 500); conf.set("yarn.log.dir", "target"); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); // mark if we need to launch the v1 timeline server @@ -201,11 +220,10 @@ private void setupInternal(int numNodeManager, float timelineVersion, conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT); } else if (timelineVersion == 1.5f) { - if (hdfsCluster == null) { - HdfsConfiguration hdfsConfig = new HdfsConfiguration(); - hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig) - .numDataNodes(1).build(); - } + HdfsConfiguration hdfsConfig = new HdfsConfiguration(); + hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig) + .numDataNodes(1).build(); + hdfsCluster.waitActive(); fs = hdfsCluster.getFileSystem(); PluginStoreTestUtils.prepareFileSystemForPluginStore(fs); PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster); @@ -231,39 +249,39 @@ private void setupInternal(int numNodeManager, float timelineVersion, } else { Assert.fail("Wrong timeline version number: " + timelineVersion); } - - if (yarnCluster == null) { - yarnCluster = - new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1, - numNodeManager, 1, 1); - yarnCluster.init(conf); - - yarnCluster.start(); - conf.set( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - MiniYARNCluster.getHostname() + ":" - + yarnCluster.getApplicationHistoryServer().getPort()); + yarnCluster = + new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1, + numNodeManager, 1, 1); + yarnCluster.init(conf); + yarnCluster.start(); - waitForNMsToRegister(); + conf.set( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + MiniYARNCluster.getHostname() + ":" + + yarnCluster.getApplicationHistoryServer().getPort()); - URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml"); - if (url == null) { - throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath"); - } - Configuration yarnClusterConfig = yarnCluster.getConfig(); - yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, - new File(url.getPath()).getParent()); - //write the document to a buffer (not directly to the file, as that - //can cause the file being written to get read -which will then fail. - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - yarnClusterConfig.writeXml(bytesOut); - bytesOut.close(); - //write the bytes to the file in the classpath - OutputStream os = new FileOutputStream(new File(url.getPath())); - os.write(bytesOut.toByteArray()); - os.close(); + waitForNMsToRegister(); + + URL url = Thread.currentThread().getContextClassLoader().getResource( + "yarn-site.xml"); + if (url == null) { + throw new RuntimeException( + "Could not find 'yarn-site.xml' dummy file in classpath"); } + Configuration yarnClusterConfig = yarnCluster.getConfig(); + yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + new File(url.getPath()).getParent()); + //write the document to a buffer (not directly to the file, as that + //can cause the file being written to get read -which will then fail. + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + yarnClusterConfig.writeXml(bytesOut); + bytesOut.close(); + //write the bytes to the file in the classpath + OutputStream os = new FileOutputStream(url.getPath()); + os.write(bytesOut.toByteArray()); + os.close(); + FileContext fsContext = FileContext.getLocalFSFileContext(); fsContext .delete( @@ -278,6 +296,11 @@ private void setupInternal(int numNodeManager, float timelineVersion, @After public void tearDown() throws IOException { + FileContext fsContext = FileContext.getLocalFSFileContext(); + fsContext + .delete( + new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), + true); if (yarnCluster != null) { try { yarnCluster.stop(); @@ -292,11 +315,6 @@ public void tearDown() throws IOException { hdfsCluster = null; } } - FileContext fsContext = FileContext.getLocalFSFileContext(); - fsContext - .delete( - new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), - true); } @Test @@ -345,9 +363,7 @@ public void testDSShellWithoutDomainV2CustomizedFlow() throws Exception { public void testDSShell(boolean haveDomain, boolean defaultFlow) throws Exception { - String[] args = { - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "2", "--shell_command", @@ -359,8 +375,8 @@ public void testDSShell(boolean haveDomain, boolean defaultFlow) "--container_memory", "128", "--container_vcores", - "1" - }; + "1"); + if (haveDomain) { String[] domainArgs = { "--domain", @@ -391,6 +407,7 @@ public void testDSShell(boolean haveDomain, boolean defaultFlow) } LOG.info("Initializing DS Client"); + YarnClient yarnClient; final Client client = new Client(new Configuration(yarnCluster.getConfig())); boolean initSuccess = client.init(args); Assert.assertTrue(initSuccess); @@ -407,7 +424,7 @@ public void run() { }; t.start(); - YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient = YarnClient.createYarnClient(); yarnClient.init(new Configuration(yarnCluster.getConfig())); yarnClient.start(); @@ -415,15 +432,15 @@ public void run() { String errorMessage = ""; ApplicationId appId = null; ApplicationReport appReport = null; - while(!verified) { + while (!verified) { List apps = yarnClient.getApplications(); - if (apps.size() == 0 ) { + if (apps.size() == 0) { Thread.sleep(10); continue; } appReport = apps.get(0); appId = appReport.getApplicationId(); - if(appReport.getHost().equals("N/A")) { + if (appReport.getHost().equals("N/A")) { Thread.sleep(10); continue; } @@ -436,7 +453,7 @@ public void run() { if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED && appReport.getFinalApplicationStatus() != - FinalApplicationStatus.UNDEFINED) { + FinalApplicationStatus.UNDEFINED) { break; } } @@ -463,11 +480,10 @@ public void run() { } } - TimelineDomain domain = null; if (!isTestingTimelineV2) { checkTimelineV1(haveDomain); } else { - checkTimelineV2(haveDomain, appId, defaultFlow, appReport); + checkTimelineV2(appId, defaultFlow, appReport); } } @@ -489,8 +505,8 @@ private void checkTimelineV1(boolean haveDomain) throws Exception { Assert.assertEquals(1, entitiesAttempts.getEntities().size()); Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents() .size()); - Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType() - .toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString()); + Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType(), + ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString()); if (haveDomain) { Assert.assertEquals(domain.getId(), entitiesAttempts.getEntities().get(0).getDomainId()); @@ -512,8 +528,8 @@ private void checkTimelineV1(boolean haveDomain) throws Exception { null, null, null, null, primaryFilter, null, null, null); Assert.assertNotNull(entities); Assert.assertEquals(2, entities.getEntities().size()); - Assert.assertEquals(entities.getEntities().get(0).getEntityType() - .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString()); + Assert.assertEquals(entities.getEntities().get(0).getEntityType(), + ApplicationMaster.DSEntity.DS_CONTAINER.toString()); String entityId = entities.getEntities().get(0).getEntityId(); org.apache.hadoop.yarn.api.records.timeline.TimelineEntity entity = @@ -532,7 +548,7 @@ private void checkTimelineV1(boolean haveDomain) throws Exception { } } - private void checkTimelineV2(boolean haveDomain, ApplicationId appId, + private void checkTimelineV2(ApplicationId appId, boolean defaultFlow, ApplicationReport appReport) throws Exception { LOG.info("Started checkTimelineV2 "); // For PoC check using the file-based timeline writer (YARN-3264) @@ -635,7 +651,13 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId, verifyEntityForTimelineV2(appAttemptEntityFile, AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true); } finally { - FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); + try { + FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); + } catch (FileNotFoundException ex) { + // the recursive delete can throw an exception when one of the file + // does not exist. + LOG.warn("Exception deleting a file/subDirectory: {}", ex.getMessage()); + } } } @@ -673,7 +695,7 @@ private void verifyEntityForTimelineV2(File entityFile, String expectedEvent, long actualCount = 0; for (int i = 0; i < checkTimes; i++) { BufferedReader reader = null; - String strLine = null; + String strLine; actualCount = 0; try { reader = new BufferedReader(new FileReader(entityFile)); @@ -710,7 +732,9 @@ private void verifyEntityForTimelineV2(File entityFile, String expectedEvent, } } } finally { - reader.close(); + if (reader != null) { + reader.close(); + } } if (numOfExpectedEvent == actualCount) { break; @@ -727,14 +751,35 @@ private void verifyEntityForTimelineV2(File entityFile, String expectedEvent, * Utility function to merge two String arrays to form a new String array for * our argumemts. * - * @param args - * @param newArgs + * @param args the first set of the arguments. + * @param newArgs the second set of the arguments. * @return a String array consists of {args, newArgs} */ private String[] mergeArgs(String[] args, String[] newArgs) { - List argsList = new ArrayList(Arrays.asList(args)); - argsList.addAll(Arrays.asList(newArgs)); - return argsList.toArray(new String[argsList.size()]); + int length = args.length + newArgs.length; + String[] result = new String[length]; + System.arraycopy(args, 0, result, 0, args.length); + System.arraycopy(newArgs, 0, result, args.length, newArgs.length); + return result; + } + + private String generateAppName(String postFix) { + return name.getMethodName().replaceFirst("test", "") + .concat(postFix == null? "" : "-" + postFix); + } + + private String[] createArguments(String... args) { + String[] res = mergeArgs(commonArgs, args); + // set the application name so we can track down which command is running. + res[commonArgs.length - 1] = generateAppName(null); + return res; + } + + private String[] createArgsWithPostFix(int index, String... args) { + String[] res = mergeArgs(commonArgs, args); + // set the application name so we can track down which command is running. + res[commonArgs.length - 1] = generateAppName(String.valueOf(index)); + return res; } protected String getSleepCommand(int sec) { @@ -745,11 +790,7 @@ protected String getSleepCommand(int sec) { @Test public void testDSRestartWithPreviousRunningContainers() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "1", "--shell_command", @@ -759,20 +800,20 @@ public void testDSRestartWithPreviousRunningContainers() throws Exception { "--container_memory", "128", "--keep_containers_across_application_attempts" - }; + ); - LOG.info("Initializing DS Client"); - Client client = new Client(TestDSFailedAppMaster.class.getName(), + LOG.info("Initializing DS Client"); + Client client = new Client(TestDSFailedAppMaster.class.getName(), new Configuration(yarnCluster.getConfig())); - client.init(args); - LOG.info("Running DS Client"); - boolean result = client.run(); + client.init(args); - LOG.info("Client run completed. Result=" + result); - // application should succeed - Assert.assertTrue(result); - } + LOG.info("Running DS Client"); + boolean result = client.run(); + LOG.info("Client run completed. Result=" + result); + // application should succeed + Assert.assertTrue(result); + } /* * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds. @@ -782,11 +823,7 @@ public void testDSRestartWithPreviousRunningContainers() throws Exception { */ @Test public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "1", "--shell_command", @@ -797,22 +834,23 @@ public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { "128", "--attempt_failures_validity_interval", "2500" - }; + ); - LOG.info("Initializing DS Client"); - Configuration conf = yarnCluster.getConfig(); - conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - Client client = new Client(TestDSSleepingAppMaster.class.getName(), - new Configuration(conf)); + LOG.info("Initializing DS Client"); + Configuration config = yarnCluster.getConfig(); + config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + Client client = new Client(TestDSSleepingAppMaster.class.getName(), + new Configuration(config)); - client.init(args); - LOG.info("Running DS Client"); - boolean result = client.run(); + client.init(args); - LOG.info("Client run completed. Result=" + result); - // application should succeed - Assert.assertTrue(result); - } + LOG.info("Running DS Client"); + boolean result = client.run(); + + LOG.info("Client run completed. Result=" + result); + // application should succeed + Assert.assertTrue(result); + } /* * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds. @@ -822,11 +860,7 @@ public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { */ @Test public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "1", "--shell_command", @@ -837,22 +871,23 @@ public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { "128", "--attempt_failures_validity_interval", "15000" - }; + ); - LOG.info("Initializing DS Client"); - Configuration conf = yarnCluster.getConfig(); - conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - Client client = new Client(TestDSSleepingAppMaster.class.getName(), - new Configuration(conf)); + LOG.info("Initializing DS Client"); + Configuration config = yarnCluster.getConfig(); + config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + Client client = new Client(TestDSSleepingAppMaster.class.getName(), + new Configuration(config)); - client.init(args); - LOG.info("Running DS Client"); - boolean result = client.run(); + client.init(args); - LOG.info("Client run completed. Result=" + result); - // application should be failed - Assert.assertFalse(result); - } + LOG.info("Running DS Client"); + boolean result = client.run(); + + LOG.info("Client run completed. Result=" + result); + // application should be failed + Assert.assertFalse(result); + } @Test public void testDSShellWithCustomLogPropertyFile() throws Exception { @@ -871,11 +906,7 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception { // set the output to DEBUG level fileWriter.write("log4j.rootLogger=debug,stdout"); fileWriter.close(); - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "3", "--shell_command", @@ -892,7 +923,7 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception { "128", "--container_vcores", "1" - }; + ); //Before run the DS, the default the log level is INFO final Logger LOG_Client = @@ -908,6 +939,7 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception { new Client(new Configuration(yarnCluster.getConfig())); boolean initSuccess = client.init(args); Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); boolean result = client.run(); LOG.info("Client run completed. Result=" + result); @@ -922,16 +954,12 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception { @Test public void testSpecifyingLogAggregationContext() throws Exception { String regex = ".*(foo|bar)\\d"; - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--shell_command", "echo", "--rolling_log_pattern", regex - }; + ); final Client client = new Client(new Configuration(yarnCluster.getConfig())); Assert.assertTrue(client.init(args)); @@ -946,11 +974,7 @@ public void testSpecifyingLogAggregationContext() throws Exception { public void testDSShellWithCommands() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "2", "--shell_command", @@ -963,7 +987,7 @@ public void testDSShellWithCommands() throws Exception { "128", "--container_vcores", "1" - }; + ); LOG.info("Initializing DS Client"); final Client client = @@ -971,20 +995,20 @@ public void testDSShellWithCommands() throws Exception { boolean initSuccess = client.init(args); Assert.assertTrue(initSuccess); LOG.info("Running DS Client"); - boolean result = client.run(); - LOG.info("Client run completed. Result=" + result); - List expectedContent = new ArrayList(); - expectedContent.add("output_expected"); - verifyContainerLog(2, expectedContent, false, ""); + try { + boolean result = client.run(); + LOG.info("Client run completed. Result=" + result); + List expectedContent = new ArrayList<>(); + expectedContent.add("output_expected"); + verifyContainerLog(2, expectedContent, false, ""); + } finally { + client.sendStopSignal(); + } } @Test public void testDSShellWithMultipleArgs() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "4", "--shell_command", @@ -999,7 +1023,7 @@ public void testDSShellWithMultipleArgs() throws Exception { "128", "--container_vcores", "1" - }; + ); LOG.info("Initializing DS Client"); final Client client = @@ -1007,9 +1031,10 @@ public void testDSShellWithMultipleArgs() throws Exception { boolean initSuccess = client.init(args); Assert.assertTrue(initSuccess); LOG.info("Running DS Client"); + boolean result = client.run(); LOG.info("Client run completed. Result=" + result); - List expectedContent = new ArrayList(); + List expectedContent = new ArrayList<>(); expectedContent.add("HADOOP YARN MAPREDUCE HDFS"); verifyContainerLog(4, expectedContent, false, ""); } @@ -1031,12 +1056,8 @@ public void testDSShellWithShellScript() throws Exception { // set the output to DEBUG level fileWriter.write("echo testDSShellWithShellScript"); fileWriter.close(); - System.out.println(customShellScript.getAbsolutePath()); - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + LOG.info(customShellScript.getAbsolutePath()); + String[] args = createArguments( "--num_containers", "1", "--shell_script", @@ -1049,7 +1070,7 @@ public void testDSShellWithShellScript() throws Exception { "128", "--container_vcores", "1" - }; + ); LOG.info("Initializing DS Client"); final Client client = @@ -1059,7 +1080,7 @@ public void testDSShellWithShellScript() throws Exception { LOG.info("Running DS Client"); boolean result = client.run(); LOG.info("Client run completed. Result=" + result); - List expectedContent = new ArrayList(); + List expectedContent = new ArrayList<>(); expectedContent.add("testDSShellWithShellScript"); verifyContainerLog(1, expectedContent, false, ""); } @@ -1067,7 +1088,7 @@ public void testDSShellWithShellScript() throws Exception { @Test public void testDSShellWithInvalidArgs() throws Exception { Client client = new Client(new Configuration(yarnCluster.getConfig())); - + int appNameCounter = 0; LOG.info("Initializing DS Client with no args"); try { client.init(new String[]{}); @@ -1079,9 +1100,7 @@ public void testDSShellWithInvalidArgs() throws Exception { LOG.info("Initializing DS Client with no jar file"); try { - String[] args = { - "--appname", - generateAppName(), + String[] args = createArgsWithPostFix(appNameCounter++, "--num_containers", "2", "--shell_command", @@ -1090,8 +1109,9 @@ public void testDSShellWithInvalidArgs() throws Exception { "512", "--container_memory", "128" - }; - client.init(args); + ); + String[] argsNoJar = Arrays.copyOfRange(args, 2, args.length); + client.init(argsNoJar); Assert.fail("Exception is expected"); } catch (IllegalArgumentException e) { Assert.assertTrue("The throw exception is not expected", @@ -1100,16 +1120,14 @@ public void testDSShellWithInvalidArgs() throws Exception { LOG.info("Initializing DS Client with no shell command"); try { - String[] args = { - "--jar", - APPMASTER_JAR, + String[] args = createArgsWithPostFix(appNameCounter++, "--num_containers", "2", "--master_memory", "512", "--container_memory", "128" - }; + ); client.init(args); Assert.fail("Exception is expected"); } catch (IllegalArgumentException e) { @@ -1119,9 +1137,7 @@ public void testDSShellWithInvalidArgs() throws Exception { LOG.info("Initializing DS Client with invalid no. of containers"); try { - String[] args = { - "--jar", - APPMASTER_JAR, + String[] args = createArgsWithPostFix(appNameCounter++, "--num_containers", "-1", "--shell_command", @@ -1130,7 +1146,7 @@ public void testDSShellWithInvalidArgs() throws Exception { "512", "--container_memory", "128" - }; + ); client.init(args); Assert.fail("Exception is expected"); } catch (IllegalArgumentException e) { @@ -1140,9 +1156,7 @@ public void testDSShellWithInvalidArgs() throws Exception { LOG.info("Initializing DS Client with invalid no. of vcores"); try { - String[] args = { - "--jar", - APPMASTER_JAR, + String[] args = createArgsWithPostFix(appNameCounter++, "--num_containers", "2", "--shell_command", @@ -1155,7 +1169,7 @@ public void testDSShellWithInvalidArgs() throws Exception { "128", "--container_vcores", "1" - }; + ); client.init(args); client.run(); Assert.fail("Exception is expected"); @@ -1166,9 +1180,7 @@ public void testDSShellWithInvalidArgs() throws Exception { LOG.info("Initializing DS Client with --shell_command and --shell_script"); try { - String[] args = { - "--jar", - APPMASTER_JAR, + String[] args = createArgsWithPostFix(appNameCounter++, "--num_containers", "2", "--shell_command", @@ -1183,7 +1195,7 @@ public void testDSShellWithInvalidArgs() throws Exception { "1", "--shell_script", "test.sh" - }; + ); client.init(args); Assert.fail("Exception is expected"); } catch (IllegalArgumentException e) { @@ -1194,9 +1206,7 @@ public void testDSShellWithInvalidArgs() throws Exception { LOG.info("Initializing DS Client without --shell_command and --shell_script"); try { - String[] args = { - "--jar", - APPMASTER_JAR, + String[] args = createArgsWithPostFix(appNameCounter++, "--num_containers", "2", "--master_memory", @@ -1207,7 +1217,7 @@ public void testDSShellWithInvalidArgs() throws Exception { "128", "--container_vcores", "1" - }; + ); client.init(args); Assert.fail("Exception is expected"); } catch (IllegalArgumentException e) { @@ -1218,9 +1228,7 @@ public void testDSShellWithInvalidArgs() throws Exception { LOG.info("Initializing DS Client with invalid container_type argument"); try { - String[] args = { - "--jar", - APPMASTER_JAR, + String[] args = createArgsWithPostFix(appNameCounter++, "--num_containers", "2", "--master_memory", @@ -1235,13 +1243,46 @@ public void testDSShellWithInvalidArgs() throws Exception { "date", "--container_type", "UNSUPPORTED_TYPE" - }; + ); client.init(args); Assert.fail("Exception is expected"); } catch (IllegalArgumentException e) { Assert.assertTrue("The throw exception is not expected", e.getMessage().contains("Invalid container_type: UNSUPPORTED_TYPE")); } + + try { + String[] args = createArgsWithPostFix(appNameCounter++, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_resources", + "memory-mb=invalid" + ); + client.init(args); + Assert.fail("Exception is expected"); + } catch (IllegalArgumentException e) { + // do nothing + LOG.info("IllegalArgumentException exception is expected: {}", + e.getMessage()); + } + + try { + String[] args = createArgsWithPostFix(appNameCounter++, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_resources" + ); + client.init(args); + Assert.fail("Exception is expected"); + } catch (MissingArgumentException e) { + // do nothing + LOG.info("MissingArgumentException exception is expected: {}", + e.getMessage()); + } } @Test @@ -1276,54 +1317,45 @@ protected TimelineWriter createTimelineWriter(Configuration conf, } protected void waitForNMsToRegister() throws Exception { - int sec = 60; - while (sec >= 0) { - if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size() - >= NUM_NMS) { - break; + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + RMContext rmContext = yarnCluster.getResourceManager().getRMContext(); + return (rmContext.getRMNodes().size() >= NUM_NMS); } - Thread.sleep(1000); - sec--; - } + }, 100, 60000); } @Test public void testContainerLaunchFailureHandling() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, - "--num_containers", - "2", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--master_memory", - "512", - "--container_memory", - "128" - }; + String[] args = createArguments( + "--num_containers", + "2", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_memory", + "512", + "--container_memory", + "128" + ); LOG.info("Initializing DS Client"); Client client = new Client(ContainerLaunchFailAppMaster.class.getName(), - new Configuration(yarnCluster.getConfig())); + new Configuration(yarnCluster.getConfig())); boolean initSuccess = client.init(args); Assert.assertTrue(initSuccess); LOG.info("Running DS Client"); - boolean result = client.run(); - - LOG.info("Client run completed. Result=" + result); - Assert.assertFalse(result); - + try { + boolean result = client.run(); + Assert.assertFalse(result); + } finally { + client.sendStopSignal(); + } } @Test public void testDebugFlag() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "2", "--shell_command", @@ -1337,7 +1369,7 @@ public void testDebugFlag() throws Exception { "--container_vcores", "1", "--debug" - }; + ); LOG.info("Initializing DS Client"); Client client = new Client(new Configuration(yarnCluster.getConfig())); @@ -1370,7 +1402,7 @@ private int verifyContainerLog(int containerNum, for (File output : containerFiles[i].listFiles()) { if (output.getName().trim().contains("stdout")) { BufferedReader br = null; - List stdOutContent = new ArrayList(); + List stdOutContent = new ArrayList<>(); try { String sCurrentLine; @@ -1402,13 +1434,13 @@ private int verifyContainerLog(int containerNum, Assert.assertTrue(stdOutContent.containsAll(expectedContent)); } } catch (IOException e) { - e.printStackTrace(); + LOG.error("Exception reading the buffer", e); } finally { try { if (br != null) br.close(); } catch (IOException ex) { - ex.printStackTrace(); + LOG.error("Exception closing the bufferReader", ex); } } } @@ -1419,21 +1451,21 @@ private int verifyContainerLog(int containerNum, @Test public void testDistributedShellResourceProfiles() throws Exception { - String appName = generateAppName(); + int appNameCounter = 0; String[][] args = { - {"--appname", appName + "-0", "--jar", APPMASTER_JAR, + createArgsWithPostFix(appNameCounter++, "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile", - "maximum" }, - {"--appname", appName + "-1", "--jar", APPMASTER_JAR, + "maximum"), + createArgsWithPostFix(appNameCounter++, "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", - "default" }, - {"--appname", appName + "-2", "--jar", APPMASTER_JAR, + "default"), + createArgsWithPostFix(appNameCounter++, "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", - "default", "--container_resource_profile", "maximum" } - }; + "default", "--container_resource_profile", "maximum"), + }; for (int i = 0; i < args.length; ++i) { LOG.info("Initializing DS Client"); @@ -1453,11 +1485,7 @@ public void testDistributedShellResourceProfiles() throws Exception { public void testDSShellWithOpportunisticContainers() throws Exception { Client client = new Client(new Configuration(yarnCluster.getConfig())); try { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "2", "--master_memory", @@ -1472,24 +1500,24 @@ public void testDSShellWithOpportunisticContainers() throws Exception { "date", "--container_type", "OPPORTUNISTIC" - }; + ); client.init(args); - client.run(); + assertTrue(client.run()); } catch (Exception e) { - Assert.fail("Job execution with opportunistic containers failed."); + LOG.error("Job execution with opportunistic containers failed.", e); + Assert.fail("Exception. " + e.getMessage()); + } finally { + client.sendStopSignal(); } } @Test @TimelineVersion(2.0f) public void testDSShellWithEnforceExecutionType() throws Exception { + YarnClient yarnClient = null; Client client = new Client(new Configuration(yarnCluster.getConfig())); try { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "2", "--master_memory", @@ -1505,7 +1533,7 @@ public void testDSShellWithEnforceExecutionType() throws Exception { "--container_type", "OPPORTUNISTIC", "--enforce_execution_type" - }; + ); client.init(args); final AtomicBoolean result = new AtomicBoolean(false); Thread t = new Thread() { @@ -1519,7 +1547,7 @@ public void run() { }; t.start(); - YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient = YarnClient.createYarnClient(); yarnClient.init(new Configuration(yarnCluster.getConfig())); yarnClient.start(); waitForContainersLaunch(yarnClient, 2); @@ -1543,7 +1571,13 @@ public void run() { } } } catch (Exception e) { - Assert.fail("Job execution with enforce execution type failed."); + LOG.error("Job execution with enforce execution type failed.", e); + Assert.fail("Exception. " + e.getMessage()); + } finally { + client.sendStopSignal(); + if (yarnClient != null) { + yarnClient.stop(); + } } } @@ -1592,26 +1626,22 @@ public void doTestDistributedShellWithResources(boolean largeContainers) .getResourceScheduler().getClusterResource(); String masterMemoryString = "1 Gi"; String containerMemoryString = "512 Mi"; - long masterMemory = 1024; - long containerMemory = 512; + long[] memVars = {1024, 512}; + Assume.assumeTrue("The cluster doesn't have enough memory for this test", - clusterResource.getMemorySize() >= masterMemory + containerMemory); + clusterResource.getMemorySize() >= memVars[0] + memVars[1]); Assume.assumeTrue("The cluster doesn't have enough cores for this test", clusterResource.getVirtualCores() >= 2); if (largeContainers) { - masterMemory = clusterResource.getMemorySize() * 2 / 3; - masterMemory = masterMemory - masterMemory % MIN_ALLOCATION_MB; - masterMemoryString = masterMemory + "Mi"; - containerMemory = clusterResource.getMemorySize() / 3; - containerMemory = containerMemory - containerMemory % MIN_ALLOCATION_MB; - containerMemoryString = String.valueOf(containerMemory); + memVars[0] = clusterResource.getMemorySize() * 2 / 3; + memVars[0] = memVars[0] - memVars[0] % MIN_ALLOCATION_MB; + masterMemoryString = memVars[0] + "Mi"; + memVars[1] = clusterResource.getMemorySize() / 3; + memVars[1] = memVars[1] - memVars[1] % MIN_ALLOCATION_MB; + containerMemoryString = String.valueOf(memVars[1]); } - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "2", "--shell_command", @@ -1619,8 +1649,8 @@ public void doTestDistributedShellWithResources(boolean largeContainers) "--master_resources", "memory=" + masterMemoryString + ",vcores=1", "--container_resources", - "memory=" + containerMemoryString + ",vcores=1", - }; + "memory=" + containerMemoryString + ",vcores=1" + ); LOG.info("Initializing DS Client"); Client client = new Client(new Configuration(yarnCluster.getConfig())); @@ -1642,103 +1672,81 @@ public void run() { yarnClient.init(new Configuration(yarnCluster.getConfig())); yarnClient.start(); - while (true) { - List apps = yarnClient.getApplications(); - if (apps.isEmpty()) { - Thread.sleep(10); - continue; - } - ApplicationReport appReport = apps.get(0); - ApplicationId appId = appReport.getApplicationId(); - List appAttempts = - yarnClient.getApplicationAttempts(appId); - if (appAttempts.isEmpty()) { - Thread.sleep(10); - continue; - } - ApplicationAttemptReport appAttemptReport = appAttempts.get(0); - ContainerId amContainerId = appAttemptReport.getAMContainerId(); - - if (amContainerId == null) { - Thread.sleep(10); - continue; - } - ContainerReport report = yarnClient.getContainerReport(amContainerId); - Resource masterResource = report.getAllocatedResource(); - Assert.assertEquals(masterMemory, masterResource.getMemorySize()); - Assert.assertEquals(1, masterResource.getVirtualCores()); - - List containers = - yarnClient.getContainers(appAttemptReport.getApplicationAttemptId()); - if (containers.size() < 2) { - Thread.sleep(10); - continue; - } - for (ContainerReport container : containers) { - if (!container.getContainerId().equals(amContainerId)) { - Resource containerResource = container.getAllocatedResource(); - Assert.assertEquals(containerMemory, - containerResource.getMemorySize()); - Assert.assertEquals(1, containerResource.getVirtualCores()); + final AtomicBoolean testFailed = new AtomicBoolean(false); + try { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + if (testFailed.get()) { + return true; + } + List containers; + try { + List apps = yarnClient.getApplications(); + if (apps.isEmpty()) { + return false; + } + ApplicationReport appReport = apps.get(0); + ApplicationId appId = appReport.getApplicationId(); + List appAttempts = + yarnClient.getApplicationAttempts(appId); + if (appAttempts.isEmpty()) { + return false; + } + ApplicationAttemptReport appAttemptReport = appAttempts.get(0); + ContainerId amContainerId = appAttemptReport.getAMContainerId(); + if (amContainerId == null) { + return false; + } + ContainerReport report = yarnClient.getContainerReport( + amContainerId); + Resource masterResource = report.getAllocatedResource(); + Assert.assertEquals(memVars[0], + masterResource.getMemorySize()); + Assert.assertEquals(1, masterResource.getVirtualCores()); + containers = yarnClient.getContainers( + appAttemptReport.getApplicationAttemptId()); + if (containers.size() < 2) { + return false; + } + for (ContainerReport container : containers) { + if (!container.getContainerId().equals(amContainerId)) { + Resource containerResource = container.getAllocatedResource(); + Assert.assertEquals(memVars[1], + containerResource.getMemorySize()); + Assert.assertEquals(1, containerResource.getVirtualCores()); + } + } + return true; + } catch (Exception ex) { + LOG.error("Error waiting for expected results", ex); + testFailed.set(true); + } + return false; } + }, 10, TEST_TIME_WINDOW_EXPIRE); + assertFalse(testFailed.get()); + } finally { + LOG.info("Signaling Client to Stop"); + client.sendStopSignal(); + if (yarnClient != null) { + LOG.info("Stopping yarnClient service"); + yarnClient.stop(); } - - return; } } - @Test(expected=IllegalArgumentException.class) - public void testDistributedShellAMResourcesWithIllegalArguments() - throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, - "--num_containers", - "1", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--master_resources", - "memory-mb=invalid" - }; - Client client = new Client(new Configuration(yarnCluster.getConfig())); - client.init(args); - } - - @Test(expected=MissingArgumentException.class) - public void testDistributedShellAMResourcesWithMissingArgumentValue() - throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, - "--num_containers", - "1", - "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--master_resources" - }; - Client client = new Client(new Configuration(yarnCluster.getConfig())); - client.init(args); - } - @Test(expected=ResourceNotFoundException.class) public void testDistributedShellAMResourcesWithUnknownResource() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls", "--master_resources", "unknown-resource=5" - }; + ); Client client = new Client(new Configuration(yarnCluster.getConfig())); client.init(args); client.run(); @@ -1747,18 +1755,14 @@ public void testDistributedShellAMResourcesWithUnknownResource() @Test(expected=IllegalArgumentException.class) public void testDistributedShellNonExistentQueue() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls", "--queue", "non-existent-queue" - }; + ); Client client = new Client(new Configuration(yarnCluster.getConfig())); client.init(args); client.run(); @@ -1767,11 +1771,7 @@ public void testDistributedShellNonExistentQueue() @Test public void testDistributedShellWithSingleFileLocalization() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "1", "--shell_command", @@ -1780,7 +1780,7 @@ public void testDistributedShellWithSingleFileLocalization() "./src/test/resources/a.txt", "--shell_args", "a.txt" - }; + ); Client client = new Client(new Configuration(yarnCluster.getConfig())); client.init(args); @@ -1790,11 +1790,7 @@ public void testDistributedShellWithSingleFileLocalization() @Test public void testDistributedShellWithMultiFileLocalization() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "1", "--shell_command", @@ -1803,7 +1799,7 @@ public void testDistributedShellWithMultiFileLocalization() "./src/test/resources/a.txt,./src/test/resources/b.txt", "--shell_args", "a.txt b.txt" - }; + ); Client client = new Client(new Configuration(yarnCluster.getConfig())); client.init(args); @@ -1813,11 +1809,7 @@ public void testDistributedShellWithMultiFileLocalization() @Test(expected=UncheckedIOException.class) public void testDistributedShellWithNonExistentFileLocalization() throws Exception { - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "1", "--shell_command", @@ -1826,11 +1818,11 @@ public void testDistributedShellWithNonExistentFileLocalization() "/non/existing/path/file.txt", "--shell_args", "file.txt" - }; + ); Client client = new Client(new Configuration(yarnCluster.getConfig())); client.init(args); - client.run(); + assertTrue(client.run()); } @@ -1838,34 +1830,34 @@ public void testDistributedShellWithNonExistentFileLocalization() public void testDistributedShellCleanup() throws Exception { String appName = "DistributedShellCleanup"; - String[] args = { - "--appname", - generateAppName(), - "--jar", - APPMASTER_JAR, + String[] args = createArguments( "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls" - }; + ); Configuration config = new Configuration(yarnCluster.getConfig()); Client client = new Client(config); - client.init(args); - client.run(); - ApplicationId appId = client.getAppId(); - String relativePath = - ApplicationMaster.getRelativePath(appName, appId.toString(), ""); - FileSystem fs1 = FileSystem.get(config); - Path path = new Path(fs1.getHomeDirectory(), relativePath); + try { + client.init(args); + client.run(); + ApplicationId appId = client.getAppId(); + String relativePath = + ApplicationMaster.getRelativePath(appName, appId.toString(), ""); + FileSystem fs1 = FileSystem.get(config); + Path path = new Path(fs1.getHomeDirectory(), relativePath); - GenericTestUtils.waitFor(() -> { - try { - return !fs1.exists(path); - } catch (IOException e) { - return false; - } - }, 10, 60000); + GenericTestUtils.waitFor(() -> { + try { + return !fs1.exists(path); + } catch (IOException e) { + return false; + } + }, 10, 60000); - assertFalse("Distributed Shell Cleanup failed", fs1.exists(path)); + assertFalse("Distributed Shell Cleanup failed", fs1.exists(path)); + } finally { + client.sendStopSignal(); + } } }