YARN-11547. [Federation] Router Supports Remove individual application records from FederationStateStore. (#6055)
This commit is contained in:
parent
f51162d70b
commit
3de66f5c40
@ -28,6 +28,12 @@
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.DefaultParser;
|
||||
import org.apache.commons.cli.Option;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.cli.MissingArgumentException;
|
||||
import org.apache.commons.lang.time.DurationFormatUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -37,14 +43,16 @@
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
|
||||
import org.apache.hadoop.yarn.server.router.cleaner.SubClusterCleaner;
|
||||
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
|
||||
@ -103,6 +111,9 @@ public class Router extends CompositeService {
|
||||
protected String webAppAddress;
|
||||
private static long clusterTimeStamp = System.currentTimeMillis();
|
||||
private FedAppReportFetcher fetcher = null;
|
||||
private static final String CMD_FORMAT_STATE_STORE = "-format-state-store";
|
||||
private static final String CMD_REMOVE_APPLICATION_FROM_STATE_STORE =
|
||||
"-remove-application-from-state-store";
|
||||
|
||||
/**
|
||||
* Priority of the Router shutdown hook.
|
||||
@ -191,7 +202,7 @@ protected void serviceStop() throws Exception {
|
||||
}
|
||||
|
||||
protected void shutDown() {
|
||||
new Thread(() -> Router.this.stop()).start();
|
||||
new Thread(Router.this::stop).start();
|
||||
}
|
||||
|
||||
protected RouterClientRMService createClientRMProxyService() {
|
||||
@ -292,24 +303,14 @@ public static String getProxyHostAndPort(Configuration conf) {
|
||||
|
||||
public static void main(String[] argv) {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
Thread
|
||||
.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
|
||||
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
|
||||
StringUtils.startupShutdownMessage(Router.class, argv, LOG);
|
||||
Router router = new Router();
|
||||
try {
|
||||
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
|
||||
argv = hParser.getRemainingArgs();
|
||||
if (argv.length > 1) {
|
||||
if (argv[0].equals("-format-state-store")) {
|
||||
// TODO: YARN-11548. [Federation] Router Supports Format FederationStateStore.
|
||||
System.err.println("format-state-store is not yet supported.");
|
||||
} else if (argv[0].equals("-remove-application-from-state-store") && argv.length == 2) {
|
||||
// TODO: YARN-11547. [Federation]
|
||||
// Router Supports Remove individual application records from FederationStateStore.
|
||||
System.err.println("remove-application-from-state-store is not yet supported.");
|
||||
} else {
|
||||
printUsage(System.err);
|
||||
}
|
||||
executeRouterCommand(conf, argv);
|
||||
} else {
|
||||
// Remove the old hook if we are rebooting.
|
||||
if (null != routerShutdownHook) {
|
||||
@ -362,6 +363,73 @@ public FedAppReportFetcher getFetcher() {
|
||||
return fetcher;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static void removeApplication(Configuration conf, String applicationId)
|
||||
throws Exception {
|
||||
FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(conf);
|
||||
ApplicationId removeAppId = ApplicationId.fromString(applicationId);
|
||||
LOG.info("Deleting application {} from state store.", removeAppId);
|
||||
facade.deleteApplicationHomeSubCluster(removeAppId);
|
||||
LOG.info("Application is deleted from state store");
|
||||
}
|
||||
|
||||
private static void handFormatStateStore() {
|
||||
// TODO: YARN-11548. [Federation] Router Supports Format FederationStateStore.
|
||||
System.err.println("format-state-store is not yet supported.");
|
||||
}
|
||||
|
||||
private static void handRemoveApplicationFromStateStore(Configuration conf,
|
||||
String applicationId) {
|
||||
try {
|
||||
removeApplication(conf, applicationId);
|
||||
System.out.println("Application " + applicationId + " is deleted from state store");
|
||||
} catch (Exception e) {
|
||||
System.err.println("Application " + applicationId + " error, exception = " + e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void executeRouterCommand(Configuration conf, String[] args) {
|
||||
// Step1. Define Options.
|
||||
Options opts = new Options();
|
||||
Option formatStateStoreOpt = new Option("format-state-store", false,
|
||||
" Formats the FederationStateStore. " +
|
||||
"This will clear the FederationStateStore and " +
|
||||
"is useful if past applications are no longer needed. " +
|
||||
"This should be run only when the Router is not running.");
|
||||
Option removeApplicationFromStateStoreOpt = new Option("remove-application-from-state-store",
|
||||
false, " Remove the application from FederationStateStore. " +
|
||||
" This should be run only when the Router is not running. ");
|
||||
opts.addOption(formatStateStoreOpt);
|
||||
opts.addOption(removeApplicationFromStateStoreOpt);
|
||||
|
||||
// Step2. Parse Options.
|
||||
try {
|
||||
String cmd = args[0];
|
||||
|
||||
CommandLine cliParser = new DefaultParser().parse(opts, args);
|
||||
|
||||
if (CMD_FORMAT_STATE_STORE.equals(cmd)) {
|
||||
handFormatStateStore();
|
||||
} else if (CMD_REMOVE_APPLICATION_FROM_STATE_STORE.equals(cmd)) {
|
||||
if (cliParser.hasOption(removeApplicationFromStateStoreOpt)) {
|
||||
String applicationId = cliParser.getOptionValue(removeApplicationFromStateStoreOpt);
|
||||
handRemoveApplicationFromStateStore(conf, applicationId);
|
||||
} else {
|
||||
System.err.println("remove-application-from-state-store requires application arg.");
|
||||
}
|
||||
} else {
|
||||
System.out.println("No related commands found.");
|
||||
printUsage(System.err);
|
||||
}
|
||||
} catch (MissingArgumentException ex) {
|
||||
System.out.println("Missing argument for options.");
|
||||
printUsage(System.err);
|
||||
} catch (ParseException e) {
|
||||
System.out.println("Parsing of a command-line error.");
|
||||
printUsage(System.err);
|
||||
}
|
||||
}
|
||||
|
||||
private static void printUsage(PrintStream out) {
|
||||
out.println("Usage: yarn router [-format-state-store] | " +
|
||||
"[-remove-application-from-state-store <appId>]");
|
||||
|
@ -0,0 +1,77 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.router;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestRouterStoreCommands {
|
||||
|
||||
////////////////////////////////
|
||||
// Router Constants
|
||||
////////////////////////////////
|
||||
private Configuration conf;
|
||||
private MemoryFederationStateStore stateStore;
|
||||
private FederationStateStoreFacade facade;
|
||||
|
||||
@Before
|
||||
public void setup() throws YarnException {
|
||||
conf = new YarnConfiguration();
|
||||
stateStore = new MemoryFederationStateStore();
|
||||
stateStore.init(conf);
|
||||
facade = FederationStateStoreFacade.getInstance(conf);
|
||||
facade.reinitialize(stateStore, conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveApplicationFromRouterStateStore() throws Exception {
|
||||
|
||||
// We will design such a unit test.
|
||||
// We will write the applicationId and subCluster into the stateStore,
|
||||
// and then remove the application through Router.removeApplication.
|
||||
// At this time, if we continue to query through the stateStore,
|
||||
// We will get a prompt that application not exists.
|
||||
|
||||
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
|
||||
SubClusterId homeSubCluster = SubClusterId.newInstance("SC-1");
|
||||
ApplicationHomeSubCluster applicationHomeSubCluster =
|
||||
ApplicationHomeSubCluster.newInstance(appId, homeSubCluster);
|
||||
AddApplicationHomeSubClusterRequest request =
|
||||
AddApplicationHomeSubClusterRequest.newInstance(applicationHomeSubCluster);
|
||||
stateStore.addApplicationHomeSubCluster(request);
|
||||
Router.removeApplication(conf, appId.toString());
|
||||
|
||||
GetApplicationHomeSubClusterRequest request1 =
|
||||
GetApplicationHomeSubClusterRequest.newInstance(appId);
|
||||
|
||||
LambdaTestUtils.intercept(YarnException.class, "Application " + appId + " does not exist.",
|
||||
() -> stateStore.getApplicationHomeSubCluster(request1));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user