From ff583d3fa3325029bc691ec22d817aee37e5e85d Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 1 Jun 2018 14:07:23 -0700 Subject: [PATCH] YARN-8349. Remove YARN registry entries when a service is killed by the RM. (Billie Rinaldi via wangda) Change-Id: Ia58db3637789a8921482f564aa9bdf99c45cc36c --- hadoop-project/pom.xml | 7 + .../hadoop-yarn-services-api/pom.xml | 16 +++ .../yarn/service/client/ApiServiceClient.java | 11 ++ .../yarn/service/TestCleanupAfterKill.java | 94 ++++++++++++ .../src/test/resources/yarn-site.xml | 19 +++ .../yarn/service/client/ServiceClient.java | 24 ++++ .../hadoop/yarn/service/ServiceTestUtils.java | 135 ++++++++++++++++++ .../yarn/service/TestYarnNativeServices.java | 129 ----------------- .../yarn/client/api/AppAdminClient.java | 12 ++ .../client/binding/RegistryUtils.java | 10 ++ .../resourcemanager/rmapp/RMAppImpl.java | 30 ++++ 11 files changed, 358 insertions(+), 129 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestCleanupAfterKill.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/resources/yarn-site.xml rename hadoop-yarn-project/hadoop-yarn/{hadoop-yarn-client => hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java (96%) diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 59a9bd2b89..12897a7f3e 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -450,6 +450,13 @@ ${hadoop.version} + + org.apache.hadoop + hadoop-yarn-services-core + ${hadoop.version} + test-jar + + org.apache.hadoop hadoop-mapreduce-client-jobclient diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/pom.xml index 45168a9fbc..ab76218783 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/pom.xml @@ -139,6 +139,22 @@ junit test + + org.apache.hadoop + hadoop-yarn-services-core + test-jar + test + + + org.apache.hadoop + hadoop-minicluster + test + + + org.apache.curator + curator-test + test + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index a8e2f511f8..18d45fae78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -588,6 +588,17 @@ public int actionUpgradeComponents(String appName, List components) return result; } + @Override + public int actionCleanUp(String appName, String userName) throws + IOException, YarnException { + ServiceClient sc = new ServiceClient(); + sc.init(getConfig()); + sc.start(); + int result = sc.actionCleanUp(appName, userName); + sc.close(); + return result; + } + private static final JsonSerDeser CONTAINER_JSON_SERDE = new JsonSerDeser<>(Container[].class, PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestCleanupAfterKill.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestCleanupAfterKill.java new file mode 100644 index 0000000000..51e834a34d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestCleanupAfterKill.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +/** + * Minicluster test that verifies registry cleanup when app lifetime is + * exceeded. + */ +public class TestCleanupAfterKill extends ServiceTestUtils { + private static final Logger LOG = + LoggerFactory.getLogger(TestCleanupAfterKill.class); + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Before + public void setup() throws Exception { + File tmpYarnDir = new File("target", "tmp"); + FileUtils.deleteQuietly(tmpYarnDir); + } + + @After + public void tearDown() throws IOException { + shutdown(); + } + + @Test(timeout = 200000) + public void testRegistryCleanedOnLifetimeExceeded() throws Exception { + setupInternal(NUM_NMS); + ServiceClient client = createClient(getConf()); + Service exampleApp = createExampleApplication(); + exampleApp.setLifetime(30L); + client.actionCreate(exampleApp); + waitForServiceToBeStable(client, exampleApp); + String serviceZKPath = RegistryUtils.servicePath(RegistryUtils + .currentUser(), YarnServiceConstants.APP_TYPE, exampleApp.getName()); + Assert.assertTrue("Registry ZK service path doesn't exist", + getCuratorService().zkPathExists(serviceZKPath)); + + // wait for app to be killed by RM + ApplicationId exampleAppId = ApplicationId.fromString(exampleApp.getId()); + GenericTestUtils.waitFor(() -> { + try { + ApplicationReport ar = client.getYarnClient() + .getApplicationReport(exampleAppId); + return ar.getYarnApplicationState() == YarnApplicationState.KILLED; + } catch (YarnException | IOException e) { + throw new RuntimeException("while waiting", e); + } + }, 2000, 200000); + Assert.assertFalse("Registry ZK service path still exists after killed", + getCuratorService().zkPathExists(serviceZKPath)); + + LOG.info("Destroy the service"); + Assert.assertEquals(0, client.actionDestroy(exampleApp.getName())); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/resources/yarn-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/resources/yarn-site.xml new file mode 100644 index 0000000000..daac23adcd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/resources/yarn-site.xml @@ -0,0 +1,19 @@ + + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index c86f5de594..3f6e8966e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -308,6 +308,16 @@ public int actionUpgradeComponents(String appName, return actionUpgrade(persistedService, containersToUpgrade); } + @Override + public int actionCleanUp(String appName, String userName) throws + IOException, YarnException { + if (cleanUpRegistry(appName, userName)) { + return EXIT_SUCCESS; + } else { + return EXIT_FALSE; + } + } + public int actionUpgrade(Service service, List compInstances) throws IOException, YarnException { ApplicationReport appReport = @@ -639,9 +649,23 @@ public int actionDestroy(String serviceName) throws YarnException, } } + private boolean cleanUpRegistry(String serviceName, String user) throws + SliderException { + String encodedName = RegistryUtils.registryUser(user); + + String registryPath = RegistryUtils.servicePath(encodedName, + YarnServiceConstants.APP_TYPE, serviceName); + return cleanUpRegistryPath(registryPath, serviceName); + } + private boolean cleanUpRegistry(String serviceName) throws SliderException { String registryPath = ServiceRegistryUtils.registryPathForInstance(serviceName); + return cleanUpRegistryPath(registryPath, serviceName); + } + + private boolean cleanUpRegistryPath(String registryPath, String + serviceName) throws SliderException { try { if (getRegistryClient().exists(registryPath)) { getRegistryClient().delete(registryPath, true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 86b4cea649..3d1412dfe7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.service; import com.google.common.base.Throwables; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingCluster; import org.apache.hadoop.conf.Configuration; @@ -29,13 +31,17 @@ import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.registry.client.impl.zk.CuratorService; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.hadoop.yarn.service.exceptions.SliderException; @@ -60,6 +66,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC; @@ -418,4 +425,132 @@ public java.nio.file.Path getServiceBasePath() { return serviceBasePath; } } + + /** + * Wait until all the containers for all components become ready state. + * + * @param client + * @param exampleApp + * @return all ready containers of a service. + * @throws TimeoutException + * @throws InterruptedException + */ + protected Multimap waitForAllCompToBeReady(ServiceClient + client, Service exampleApp) throws TimeoutException, + InterruptedException { + int expectedTotalContainers = countTotalContainers(exampleApp); + + Multimap allContainers = HashMultimap.create(); + + GenericTestUtils.waitFor(() -> { + try { + Service retrievedApp = client.getStatus(exampleApp.getName()); + int totalReadyContainers = 0; + allContainers.clear(); + LOG.info("Num Components " + retrievedApp.getComponents().size()); + for (Component component : retrievedApp.getComponents()) { + LOG.info("looking for " + component.getName()); + LOG.info(component.toString()); + if (component.getContainers() != null) { + if (component.getContainers().size() == exampleApp + .getComponent(component.getName()).getNumberOfContainers()) { + for (Container container : component.getContainers()) { + LOG.info( + "Container state " + container.getState() + ", component " + + component.getName()); + if (container.getState() == ContainerState.READY) { + totalReadyContainers++; + allContainers.put(component.getName(), container.getId()); + LOG.info("Found 1 ready container " + container.getId()); + } + } + } else { + LOG.info(component.getName() + " Expected number of containers " + + exampleApp.getComponent(component.getName()) + .getNumberOfContainers() + ", current = " + component + .getContainers()); + } + } + } + LOG.info("Exit loop, totalReadyContainers= " + totalReadyContainers + + " expected = " + expectedTotalContainers); + return totalReadyContainers == expectedTotalContainers; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + }, 2000, 200000); + return allContainers; + } + + /** + * Wait until service state becomes stable. A service is stable when all + * requested containers of all components are running and in ready state. + * + * @param client + * @param exampleApp + * @throws TimeoutException + * @throws InterruptedException + */ + protected void waitForServiceToBeStable(ServiceClient client, + Service exampleApp) throws TimeoutException, InterruptedException { + waitForServiceToBeStable(client, exampleApp, 200000); + } + + protected void waitForServiceToBeStable(ServiceClient client, + Service exampleApp, int waitForMillis) + throws TimeoutException, InterruptedException { + waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE, + waitForMillis); + } + + /** + * Wait until service is started. It does not have to reach a stable state. + * + * @param client + * @param exampleApp + * @throws TimeoutException + * @throws InterruptedException + */ + protected void waitForServiceToBeStarted(ServiceClient client, + Service exampleApp) throws TimeoutException, InterruptedException { + waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED); + } + + protected void waitForServiceToBeInState(ServiceClient client, + Service exampleApp, ServiceState desiredState) throws TimeoutException, + InterruptedException { + waitForServiceToBeInState(client, exampleApp, desiredState, 200000); + } + + /** + * Wait until service is started. It does not have to reach a stable state. + * + * @param client + * @param exampleApp + * @throws TimeoutException + * @throws InterruptedException + */ + protected void waitForServiceToBeInState(ServiceClient client, + Service exampleApp, ServiceState desiredState, int waitForMillis) throws + TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> { + try { + Service retrievedApp = client.getStatus(exampleApp.getName()); + System.out.println(retrievedApp); + return retrievedApp.getState() == desiredState; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + }, 2000, waitForMillis); + } + + private int countTotalContainers(Service service) { + int totalContainers = 0; + for (Component component : service.getComponents()) { + totalContainers += component.getNumberOfContainers(); + } + return totalContainers; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index ae209b929e..8b13b2495b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.service; -import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; @@ -36,7 +35,6 @@ import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Configuration; import org.apache.hadoop.yarn.service.api.records.Container; -import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.PlacementConstraint; import org.apache.hadoop.yarn.service.api.records.PlacementPolicy; import org.apache.hadoop.yarn.service.api.records.PlacementScope; @@ -806,131 +804,4 @@ private void checkEachCompInstancesInOrder(Component component, String i++; } } - - /** - * Wait until all the containers for all components become ready state. - * - * @param client - * @param exampleApp - * @return all ready containers of a service. - * @throws TimeoutException - * @throws InterruptedException - */ - private Multimap waitForAllCompToBeReady(ServiceClient client, - Service exampleApp) throws TimeoutException, InterruptedException { - int expectedTotalContainers = countTotalContainers(exampleApp); - - Multimap allContainers = HashMultimap.create(); - - GenericTestUtils.waitFor(() -> { - try { - Service retrievedApp = client.getStatus(exampleApp.getName()); - int totalReadyContainers = 0; - allContainers.clear(); - LOG.info("Num Components " + retrievedApp.getComponents().size()); - for (Component component : retrievedApp.getComponents()) { - LOG.info("looking for " + component.getName()); - LOG.info(component.toString()); - if (component.getContainers() != null) { - if (component.getContainers().size() == exampleApp - .getComponent(component.getName()).getNumberOfContainers()) { - for (Container container : component.getContainers()) { - LOG.info( - "Container state " + container.getState() + ", component " - + component.getName()); - if (container.getState() == ContainerState.READY) { - totalReadyContainers++; - allContainers.put(component.getName(), container.getId()); - LOG.info("Found 1 ready container " + container.getId()); - } - } - } else { - LOG.info(component.getName() + " Expected number of containers " - + exampleApp.getComponent(component.getName()) - .getNumberOfContainers() + ", current = " + component - .getContainers()); - } - } - } - LOG.info("Exit loop, totalReadyContainers= " + totalReadyContainers - + " expected = " + expectedTotalContainers); - return totalReadyContainers == expectedTotalContainers; - } catch (Exception e) { - e.printStackTrace(); - return false; - } - }, 2000, 200000); - return allContainers; - } - - /** - * Wait until service state becomes stable. A service is stable when all - * requested containers of all components are running and in ready state. - * - * @param client - * @param exampleApp - * @throws TimeoutException - * @throws InterruptedException - */ - private void waitForServiceToBeStable(ServiceClient client, - Service exampleApp) throws TimeoutException, InterruptedException { - waitForServiceToBeStable(client, exampleApp, 200000); - } - - private void waitForServiceToBeStable(ServiceClient client, - Service exampleApp, int waitForMillis) - throws TimeoutException, InterruptedException { - waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE, - waitForMillis); - } - - /** - * Wait until service is started. It does not have to reach a stable state. - * - * @param client - * @param exampleApp - * @throws TimeoutException - * @throws InterruptedException - */ - private void waitForServiceToBeStarted(ServiceClient client, - Service exampleApp) throws TimeoutException, InterruptedException { - waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED); - } - - private void waitForServiceToBeInState(ServiceClient client, - Service exampleApp, ServiceState desiredState) throws TimeoutException, - InterruptedException { - waitForServiceToBeInState(client, exampleApp, desiredState, 200000); - } - - /** - * Wait until service is started. It does not have to reach a stable state. - * - * @param client - * @param exampleApp - * @throws TimeoutException - * @throws InterruptedException - */ - private void waitForServiceToBeInState(ServiceClient client, - Service exampleApp, ServiceState desiredState, int waitForMillis) throws - TimeoutException, InterruptedException { - GenericTestUtils.waitFor(() -> { - try { - Service retrievedApp = client.getStatus(exampleApp.getName()); - System.out.println(retrievedApp); - return retrievedApp.getState() == desiredState; - } catch (Exception e) { - e.printStackTrace(); - return false; - } - }, 2000, waitForMillis); - } - - private int countTotalContainers(Service service) { - int totalContainers = 0; - for (Component component : service.getComponents()) { - totalContainers += component.getNumberOfContainers(); - } - return totalContainers; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java similarity index 96% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index 91f899c82a..3cd1a78710 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -270,4 +270,16 @@ public abstract int actionUpgradeInstances(String appName, public abstract int actionUpgradeComponents(String appName, List components) throws IOException, YarnException; + /** + * Operation to be performed by the RM after an application has completed. + * + * @param appName the name of the application. + * @param userName the name of the user. + * @return exit code + */ + @Public + @Unstable + public abstract int actionCleanUp(String appName, String userName) throws + IOException, YarnException; + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java index 4ef7b8d404..fcfc5bf570 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java @@ -296,6 +296,16 @@ public static String getCurrentUsernameUnencoded(String env_hadoop_username) { */ public static String currentUser() { String shortUserName = currentUsernameUnencoded(); + return registryUser(shortUserName); + } + + /** + * Convert the given user name formatted for the registry. + * + * @param shortUserName + * @return converted user name + */ + public static String registryUser(String shortUserName) { String encodedName = encodeForRegistry(shortUserName); // DNS name doesn't allow "_", replace it with "-" encodedName = RegistryUtils.convertUsername(encodedName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 6aee813296..73191562c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -1470,6 +1471,33 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } + /** + * Attempt to perform a type-specific cleanup after application has completed. + * + * @param app application to clean up + */ + static void appAdminClientCleanUp(RMAppImpl app) { + try { + AppAdminClient client = AppAdminClient.createAppAdminClient(app + .applicationType, app.conf); + int result = client.actionCleanUp(app.name, app.user); + if (result == 0) { + LOG.info("Type-specific cleanup of application " + app.applicationId + + " of type " + app.applicationType + " succeeded"); + } else { + LOG.warn("Type-specific cleanup of application " + app.applicationId + + " of type " + app.applicationType + " did not succeed with exit" + + " code " + result); + } + } catch (IllegalArgumentException e) { + // no AppAdminClient class has been specified for the application type, + // so this does not need to be logged + } catch (Exception e) { + LOG.warn("Could not run type-specific cleanup on application " + + app.applicationId + " of type " + app.applicationType, e); + } + } + private static class FinalTransition extends RMAppTransition { private final RMAppState finalState; @@ -1504,6 +1532,8 @@ public void transition(RMAppImpl app, RMAppEvent event) { .appFinished(app, finalState, app.finishTime); // set the memory free app.clearUnusedFields(); + + appAdminClientCleanUp(app); }; }