From 00b5a27864e0e5af7f55503a1d85dfa347f9ec0c Mon Sep 17 00:00:00 2001 From: HUAN-PING SU Date: Thu, 8 Aug 2019 12:29:24 +0800 Subject: [PATCH] SUBMARINE-72. Kill and destroy the job through the submarine client (#1090) Contributed by kevin su. --- .../hadoop/yarn/submarine/client/cli/Cli.java | 10 +- .../submarine/client/cli/CliConstants.java | 1 + .../yarn/submarine/client/cli/Command.java | 2 +- .../yarn/submarine/client/cli/KillJobCli.java | 113 ++++++++++++++++++ .../client/cli/param/KillJobParameters.java | 19 +++ .../client/cli/param/ParametersHolder.java | 27 +++-- .../client/cli/TestKillJobCliParsing.java | 62 ++++++++++ 7 files changed, 217 insertions(+), 17 deletions(-) create mode 100644 hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/KillJobCli.java create mode 100644 hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/KillJobParameters.java create mode 100644 hadoop-submarine/hadoop-submarine-core/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestKillJobCliParsing.java diff --git a/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java index 9c5e0aa0cf..593fb89c64 100644 --- a/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java +++ b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java @@ -14,16 +14,16 @@ package org.apache.hadoop.yarn.submarine.client.cli; +import java.util.Arrays; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.submarine.client.cli.runjob.RunJobCli; import org.apache.hadoop.yarn.submarine.common.ClientContext; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; - public class Cli { private static final Logger LOG = LoggerFactory.getLogger(Cli.class); @@ -35,7 +35,7 @@ public class Cli { helpMsg.append(" job \n"); helpMsg.append(" run : run a job, please see 'job run --help' for usage \n"); helpMsg.append(" show : get status of job, please see 'job show --help' for usage \n"); - + helpMsg.append(" kill : kill a job, please see 'job kill --help' for usage \n"); System.out.println(helpMsg.toString()); } @@ -92,6 +92,8 @@ public class Cli { new RunJobCli(clientContext).run(moduleArgs); } else if (subCmd.equals(CliConstants.SHOW)) { new ShowJobCli(clientContext).run(moduleArgs); + } else if (subCmd.equals(CliConstants.KILL)) { + new KillJobCli(clientContext).run(moduleArgs); } else { printHelp(); throw new IllegalArgumentException("Unknown option for job"); diff --git a/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java index cd9c8358b4..1f879342a8 100644 --- a/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java +++ b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.submarine.client.cli; * NOTE: use lowercase + "_" for the option name */ public class CliConstants { + public static final String KILL = "kill"; public static final String RUN = "run"; public static final String SERVE = "serve"; public static final String LIST = "list"; diff --git a/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Command.java b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Command.java index f534002acc..8f3051f69e 100644 --- a/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Command.java +++ b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Command.java @@ -20,5 +20,5 @@ package org.apache.hadoop.yarn.submarine.client.cli; * Represents a Submarine command. */ public enum Command { - RUN_JOB, SHOW_JOB + RUN_JOB, SHOW_JOB, KILL_JOB } diff --git a/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/KillJobCli.java b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/KillJobCli.java new file mode 100644 index 0000000000..98eeea401e --- /dev/null +++ b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/KillJobCli.java @@ -0,0 +1,113 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package org.apache.hadoop.yarn.submarine.client.cli; + +import static org.apache.hadoop.yarn.client.api.AppAdminClient.DEFAULT_TYPE; + +import java.io.IOException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.yarn.client.api.AppAdminClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.submarine.client.cli.param.KillJobParameters; +import org.apache.hadoop.yarn.submarine.client.cli.param.ParametersHolder; +import org.apache.hadoop.yarn.submarine.common.ClientContext; +import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +public class KillJobCli extends AbstractCli { + private static final Logger LOG = LoggerFactory.getLogger(ShowJobCli.class); + + private Options options; + private ParametersHolder parametersHolder; + + public KillJobCli(ClientContext cliContext) { + super(cliContext); + options = generateOptions(); + } + + public void printUsages() { + new HelpFormatter().printHelp("job kill", options); + } + + private Options generateOptions() { + Options options = new Options(); + options.addOption(CliConstants.NAME, true, "Name of the job"); + options.addOption("h", "help", false, "Print help"); + return options; + } + + private void parseCommandLineAndGetKillJobParameters(String[] args) + throws IOException, YarnException { + // Do parsing + GnuParser parser = new GnuParser(); + CommandLine cli; + try { + cli = parser.parse(options, args); + parametersHolder = + ParametersHolder.createWithCmdLine(cli, Command.KILL_JOB); + parametersHolder.updateParameters(clientContext); + } catch (ParseException e) { + LOG.error(("Error parsing command-line options: " + e.getMessage())); + printUsages(); + } + } + + @VisibleForTesting + protected boolean killJob() throws IOException, YarnException { + String jobName = getParameters().getName(); + AppAdminClient appAdminClient = AppAdminClient + .createAppAdminClient(DEFAULT_TYPE, clientContext.getYarnConfig()); + + if (appAdminClient.actionStop(jobName) != 0) { + LOG.error("appAdminClient fail to stop application"); + return false; + } + if (appAdminClient.actionDestroy(jobName) != 0) { + LOG.error("appAdminClient fail to destroy application"); + return false; + } + + appAdminClient.stop(); + return true; + } + + @VisibleForTesting + public KillJobParameters getParameters() { + return (KillJobParameters) parametersHolder.getParameters(); + } + + @Override + public int run(String[] args) throws ParseException, IOException, + YarnException, InterruptedException, SubmarineException { + if (CliUtils.argsForHelp(args)) { + printUsages(); + return 0; + } + parseCommandLineAndGetKillJobParameters(args); + if (killJob() == true) { + LOG.info("Kill job successfully !"); + } + return 0; + } + +} diff --git a/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/KillJobParameters.java b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/KillJobParameters.java new file mode 100644 index 0000000000..b761ee23f4 --- /dev/null +++ b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/KillJobParameters.java @@ -0,0 +1,19 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package org.apache.hadoop.yarn.submarine.client.cli.param; + +public class KillJobParameters extends BaseParameters { + +} diff --git a/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/ParametersHolder.java b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/ParametersHolder.java index 705f674059..182dda254f 100644 --- a/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/ParametersHolder.java +++ b/hadoop-submarine/hadoop-submarine-core/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/ParametersHolder.java @@ -16,9 +16,16 @@ package org.apache.hadoop.yarn.submarine.client.cli.param; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import static org.apache.hadoop.yarn.submarine.client.cli.runjob.RunJobCli.YAML_PARSE_FAILED; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.ParseException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -39,15 +46,9 @@ import org.apache.hadoop.yarn.submarine.common.ClientContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.apache.hadoop.yarn.submarine.client.cli.runjob.RunJobCli.YAML_PARSE_FAILED; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * This class acts as a wrapper of {@code CommandLine} values along with @@ -104,6 +105,8 @@ public final class ParametersHolder { } } else if (command == Command.SHOW_JOB) { return new ShowJobParameters(); + } else if (command == Command.KILL_JOB) { + return new KillJobParameters(); } else { throw new UnsupportedOperationException(SUPPORTED_COMMANDS_MESSAGE); } diff --git a/hadoop-submarine/hadoop-submarine-core/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestKillJobCliParsing.java b/hadoop-submarine/hadoop-submarine-core/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestKillJobCliParsing.java new file mode 100644 index 0000000000..db654dbf3d --- /dev/null +++ b/hadoop-submarine/hadoop-submarine-core/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestKillJobCliParsing.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.submarine.client.cli; + +import java.io.IOException; + +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.submarine.client.cli.param.KillJobParameters; +import org.apache.hadoop.yarn.submarine.common.MockClientContext; +import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs; +import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestKillJobCliParsing { + @Before + public void before() { + SubmarineLogs.verboseOff(); + } + + @Test + public void testPrintHelp() { + MockClientContext mockClientContext = new MockClientContext(); + KillJobCli killJobCli = new KillJobCli(mockClientContext); + killJobCli.printUsages(); + } + + @Test + public void testKillJob() + throws InterruptedException, SubmarineException, YarnException, + ParseException, IOException { + MockClientContext mockClientContext = new MockClientContext(); + KillJobCli killJobCli = new KillJobCli(mockClientContext) { + @Override + protected boolean killJob() { + // do nothing + return false; + } + }; + killJobCli.run(new String[] { "--name", "my-job" }); + KillJobParameters parameters = killJobCli.getParameters(); + Assert.assertEquals(parameters.getName(), "my-job"); + } +}