From 3de66f5c4046cf9e44c1f6b14779bc4f6442e92f Mon Sep 17 00:00:00 2001
From: slfan1989 <55643692+slfan1989@users.noreply.github.com>
Date: Tue, 26 Sep 2023 04:52:57 +0800
Subject: [PATCH] YARN-11547. [Federation] Router Supports Remove individual application records from FederationStateStore. (#6055)

---
 .../hadoop/yarn/server/router/Router.java | 96 ++++++++++++++++---
 .../router/TestRouterStoreCommands.java | 77 +++++++++++++++
 2 files changed, 159 insertions(+), 14 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterStoreCommands.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index 4761866253..e4defc308d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -28,6 +28,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.MissingArgumentException;
 import org.apache.commons.lang.time.DurationFormatUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
@@ -37,14 +43,16 @@
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
 import org.apache.hadoop.yarn.server.router.cleaner.SubClusterCleaner;
 import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
@@ -103,6 +111,9 @@ public class Router extends CompositeService {
   protected String webAppAddress;
   private static long clusterTimeStamp = System.currentTimeMillis();
   private FedAppReportFetcher fetcher = null;
+  private static final String CMD_FORMAT_STATE_STORE = "-format-state-store";
+  private static final String CMD_REMOVE_APPLICATION_FROM_STATE_STORE =
+      "-remove-application-from-state-store";
 
   /**
    * Priority of the Router shutdown hook.
@@ -191,7 +202,7 @@ protected void serviceStop() throws Exception {
   }
 
   protected void shutDown() {
-    new Thread(() -> Router.this.stop()).start();
+    new Thread(Router.this::stop).start();
   }
 
   protected RouterClientRMService createClientRMProxyService() {
@@ -292,24 +303,14 @@ public static String getProxyHostAndPort(Configuration conf) {
 
   public static void main(String[] argv) {
     Configuration conf = new YarnConfiguration();
-    Thread
-        .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(Router.class, argv, LOG);
     Router router = new Router();
     try {
       GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
       argv = hParser.getRemainingArgs();
       if (argv.length > 1) {
-        if (argv[0].equals("-format-state-store")) {
-          // TODO: YARN-11548. [Federation] Router Supports Format FederationStateStore.
-          System.err.println("format-state-store is not yet supported.");
-        } else if (argv[0].equals("-remove-application-from-state-store") && argv.length == 2) {
-          // TODO: YARN-11547. [Federation]
-          // Router Supports Remove individual application records from FederationStateStore.
-          System.err.println("remove-application-from-state-store is not yet supported.");
-        } else {
-          printUsage(System.err);
-        }
+        executeRouterCommand(conf, argv);
       } else {
         // Remove the old hook if we are rebooting.
if (null != routerShutdownHook) { @@ -362,6 +363,73 @@ public FedAppReportFetcher getFetcher() { return fetcher; } + @VisibleForTesting + public static void removeApplication(Configuration conf, String applicationId) + throws Exception { + FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(conf); + ApplicationId removeAppId = ApplicationId.fromString(applicationId); + LOG.info("Deleting application {} from state store.", removeAppId); + facade.deleteApplicationHomeSubCluster(removeAppId); + LOG.info("Application is deleted from state store"); + } + + private static void handFormatStateStore() { + // TODO: YARN-11548. [Federation] Router Supports Format FederationStateStore. + System.err.println("format-state-store is not yet supported."); + } + + private static void handRemoveApplicationFromStateStore(Configuration conf, + String applicationId) { + try { + removeApplication(conf, applicationId); + System.out.println("Application " + applicationId + " is deleted from state store"); + } catch (Exception e) { + System.err.println("Application " + applicationId + " error, exception = " + e); + } + } + + private static void executeRouterCommand(Configuration conf, String[] args) { + // Step1. Define Options. + Options opts = new Options(); + Option formatStateStoreOpt = new Option("format-state-store", false, + " Formats the FederationStateStore. " + + "This will clear the FederationStateStore and " + + "is useful if past applications are no longer needed. " + + "This should be run only when the Router is not running."); + Option removeApplicationFromStateStoreOpt = new Option("remove-application-from-state-store", + false, " Remove the application from FederationStateStore. " + + " This should be run only when the Router is not running. "); + opts.addOption(formatStateStoreOpt); + opts.addOption(removeApplicationFromStateStoreOpt); + + // Step2. Parse Options. 
+ try { + String cmd = args[0]; + + CommandLine cliParser = new DefaultParser().parse(opts, args); + + if (CMD_FORMAT_STATE_STORE.equals(cmd)) { + handFormatStateStore(); + } else if (CMD_REMOVE_APPLICATION_FROM_STATE_STORE.equals(cmd)) { + if (cliParser.hasOption(removeApplicationFromStateStoreOpt)) { + String applicationId = cliParser.getOptionValue(removeApplicationFromStateStoreOpt); + handRemoveApplicationFromStateStore(conf, applicationId); + } else { + System.err.println("remove-application-from-state-store requires application arg."); + } + } else { + System.out.println("No related commands found."); + printUsage(System.err); + } + } catch (MissingArgumentException ex) { + System.out.println("Missing argument for options."); + printUsage(System.err); + } catch (ParseException e) { + System.out.println("Parsing of a command-line error."); + printUsage(System.err); + } + } + private static void printUsage(PrintStream out) { out.println("Usage: yarn router [-format-state-store] | " + "[-remove-application-from-state-store ]"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterStoreCommands.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterStoreCommands.java new file mode 100644 index 0000000000..04007ca88d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterStoreCommands.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRouterStoreCommands {
+
+  // Fixtures rebuilt before every test: the configuration, an in-memory
+  // state store, and the facade instance that is re-pointed at that store
+  // in setup().
+  private Configuration conf;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreFacade facade;
+
+  @Before
+  public void setup() throws YarnException {
+    this.conf = new YarnConfiguration();
+    this.stateStore = new MemoryFederationStateStore();
+    this.stateStore.init(this.conf);
+    this.facade = FederationStateStoreFacade.getInstance(this.conf);
+    this.facade.reinitialize(this.stateStore, this.conf);
+  }
+
+  @Test
+  public void testRemoveApplicationFromRouterStateStore() throws Exception {
+
+    // Scenario: register an application together with its home sub-cluster
+    // directly in the state store, delete it via Router.removeApplication,
+    // and then look it up again in the same store. The lookup is expected
+    // to fail with a YarnException whose message says the application does
+    // not exist.
+
+    final ApplicationId applicationId = ApplicationId.newInstance(Time.now(), 1);
+    final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
+    final AddApplicationHomeSubClusterRequest addRequest =
+        AddApplicationHomeSubClusterRequest.newInstance(
+            ApplicationHomeSubCluster.newInstance(applicationId, subClusterId));
+    stateStore.addApplicationHomeSubCluster(addRequest);
+
+    Router.removeApplication(conf, applicationId.toString());
+
+    final GetApplicationHomeSubClusterRequest lookupRequest =
+        GetApplicationHomeSubClusterRequest.newInstance(applicationId);
+    final String expectedMessage = "Application " + applicationId + " does not exist.";
+    LambdaTestUtils.intercept(YarnException.class, expectedMessage,
+        () -> stateStore.getApplicationHomeSubCluster(lookupRequest));
+  }
+}