From e557c6bd8de2811a561210f672f47b4d07a9d5c6 Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Tue, 21 Aug 2018 19:49:26 -0400 Subject: [PATCH] YARN-8298. Added express upgrade for YARN service. Contributed by Chandni Singh --- .../yarn/service/client/ApiServiceClient.java | 20 ++ .../hadoop/yarn/service/webapp/ApiServer.java | 12 +- .../hadoop/yarn/service/ClientAMService.java | 2 +- .../hadoop/yarn/service/ServiceEvent.java | 25 ++ .../hadoop/yarn/service/ServiceManager.java | 127 ++++++-- .../hadoop/yarn/service/ServiceScheduler.java | 15 +- .../service/api/records/ServiceState.java | 2 +- .../yarn/service/client/ServiceClient.java | 120 ++++--- .../yarn/service/component/Component.java | 16 +- .../service/component/ComponentEvent.java | 10 + .../component/instance/ComponentInstance.java | 5 + .../yarn/service/utils/ServiceApiUtil.java | 44 +++ .../src/main/proto/ClientAMProtocol.proto | 1 + .../yarn/service/TestServiceManager.java | 295 +++++++++++------- .../yarn/service/TestYarnNativeServices.java | 35 +++ .../{ => utils}/TestServiceApiUtil.java | 102 +++++- .../yarn/client/cli/ApplicationCLI.java | 20 +- .../hadoop/yarn/client/cli/TestYarnCLI.java | 4 + .../yarn/client/api/AppAdminClient.java | 12 + 19 files changed, 669 insertions(+), 198 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/{ => utils}/TestServiceApiUtil.java (87%) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index 9229446810..ca6cc508b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -600,6 +600,26 @@ public String getStatusString(String appIdOrName) throws IOException, return output; } + @Override + public int actionUpgradeExpress(String appName, File path) + throws IOException, YarnException { + int result; + try { + Service service = + loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null); + service.setState(ServiceState.EXPRESS_UPGRADING); + String buffer = jsonSerDeser.toJson(service); + LOG.info("Upgrade in progress. Please wait.."); + ClientResponse response = getApiClient(getServicePath(appName)) + .put(ClientResponse.class, buffer); + result = processResponse(response); + } catch (Exception e) { + LOG.error("Failed to upgrade application: ", e); + result = EXIT_EXCEPTION_THROWN; + } + return result; + } + @Override public int initiateUpgrade(String appName, String fileName, boolean autoFinalize) throws IOException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 4db0ac8f40..cd6f0d79e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -440,7 +440,8 @@ public Response updateService(@Context HttpServletRequest request, if (updateServiceData.getState() != null && ( updateServiceData.getState() == ServiceState.UPGRADING || updateServiceData.getState() == - ServiceState.UPGRADING_AUTO_FINALIZE)) { + ServiceState.UPGRADING_AUTO_FINALIZE) || + updateServiceData.getState() == ServiceState.EXPRESS_UPGRADING) { return upgradeService(updateServiceData, ugi); } @@ -690,7 +691,11 @@ private Response upgradeService(Service service, ServiceClient sc = getServiceClient(); sc.init(YARN_CONFIG); sc.start(); - sc.initiateUpgrade(service); + if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) { + sc.actionUpgradeExpress(service); + } else { + sc.initiateUpgrade(service); + } sc.close(); return null; }); @@ -706,7 +711,8 @@ private Response processComponentsUpgrade(UserGroupInformation ugi, String serviceName, Set compNames) throws YarnException, IOException, InterruptedException { Service service = getServiceFromClient(ugi, serviceName); - if (service.getState() != ServiceState.UPGRADING) { + if (!service.getState().equals(ServiceState.UPGRADING) && + !service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { throw new YarnException( String.format("The upgrade of service %s has not been initiated.", service.getName())); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index 5bf183319f..2ef8f7ee7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -166,7 +166,7 @@ public UpgradeServiceResponseProto upgrade( LOG.info("Upgrading service to version {} by {}", request.getVersion(), UserGroupInformation.getCurrentUser()); context.getServiceManager().processUpgradeRequest(request.getVersion(), - request.getAutoFinalize()); + request.getAutoFinalize(), request.getExpressUpgrade()); return UpgradeServiceResponseProto.newBuilder().build(); } catch (Exception ex) { return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java index 0196be2a98..3a55472c0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.service.api.records.Component; + +import java.util.Queue; /** * Events are handled by {@link ServiceManager} to manage the service @@ -29,6 +32,8 @@ public class ServiceEvent extends AbstractEvent { private final ServiceEventType type; private String version; private boolean autoFinalize; + private boolean expressUpgrade; + private Queue compsToUpgradeInOrder; public ServiceEvent(ServiceEventType serviceEventType) { super(serviceEventType); @@ -56,4 +61,24 @@ public ServiceEvent setAutoFinalize(boolean autoFinalize) { this.autoFinalize = autoFinalize; return this; } + + public boolean isExpressUpgrade() { + return expressUpgrade; + } + + public ServiceEvent setExpressUpgrade(boolean expressUpgrade) { + this.expressUpgrade = expressUpgrade; + return this; + } + + public Queue getCompsToUpgradeInOrder() { + return compsToUpgradeInOrder; + } + + public ServiceEvent setCompsToUpgradeInOrder( + Queue compsToUpgradeInOrder) { + this.compsToUpgradeInOrder = compsToUpgradeInOrder; + return this; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index 05ecb3fc9b..04454b1d29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.Component; @@ -40,8 +41,11 @@ import java.io.IOException; import java.text.MessageFormat; import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; @@ -67,6 +71,8 @@ public class ServiceManager implements EventHandler { private final SliderFileSystem fs; private String upgradeVersion; + private Queue compsToUpgradeInOrder; private static final StateMachineFactory STATE_MACHINE_FACTORY = @@ -141,14 +147,20 @@ private static class StartUpgradeTransition implements @Override public State transition(ServiceManager serviceManager, ServiceEvent event) { + serviceManager.upgradeVersion = event.getVersion(); try { - if (!event.isAutoFinalize()) { - serviceManager.serviceSpec.setState(ServiceState.UPGRADING); + if (event.isExpressUpgrade()) { + serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING); + serviceManager.compsToUpgradeInOrder = event + .getCompsToUpgradeInOrder(); + serviceManager.upgradeNextCompIfAny(); + } else if (event.isAutoFinalize()) { + serviceManager.serviceSpec.setState(ServiceState + .UPGRADING_AUTO_FINALIZE); } else { serviceManager.serviceSpec.setState( - ServiceState.UPGRADING_AUTO_FINALIZE); + ServiceState.UPGRADING); } - serviceManager.upgradeVersion = event.getVersion(); return State.UPGRADING; } catch (Throwable e) { LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(), @@ -169,8 +181,19 @@ public State transition(ServiceManager serviceManager, if (currState.equals(ServiceState.STABLE)) { return State.STABLE; } + if (currState.equals(ServiceState.EXPRESS_UPGRADING)) { + org.apache.hadoop.yarn.service.api.records.Component component = + serviceManager.compsToUpgradeInOrder.peek(); + if (!component.getState().equals(ComponentState.NEEDS_UPGRADE) && + !component.getState().equals(ComponentState.UPGRADING)) { + serviceManager.compsToUpgradeInOrder.remove(); + } + serviceManager.upgradeNextCompIfAny(); + } if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || - event.getType().equals(ServiceEventType.START)) { + event.getType().equals(ServiceEventType.START) || + (currState.equals(ServiceState.EXPRESS_UPGRADING) && + serviceManager.compsToUpgradeInOrder.isEmpty())) { ServiceState targetState = checkIfStable(serviceManager.serviceSpec); if (targetState.equals(ServiceState.STABLE)) { if (serviceManager.finalizeUpgrade()) { @@ -184,6 +207,19 @@ public State transition(ServiceManager serviceManager, } } + private void upgradeNextCompIfAny() { + if (!compsToUpgradeInOrder.isEmpty()) { + org.apache.hadoop.yarn.service.api.records.Component component = + compsToUpgradeInOrder.peek(); + + ComponentEvent needUpgradeEvent = new ComponentEvent( + component.getName(), ComponentEventType.UPGRADE).setTargetSpec( + component).setUpgradeVersion(upgradeVersion).setExpressUpgrade(true); + context.scheduler.getDispatcher().getEventHandler().handle( + needUpgradeEvent); + } + } + /** * @return whether finalization of upgrade was successful. */ @@ -250,23 +286,18 @@ public void checkAndUpdateServiceState() { } void processUpgradeRequest(String upgradeVersion, - boolean autoFinalize) throws IOException { + boolean autoFinalize, boolean expressUpgrade) throws IOException { Service targetSpec = ServiceApiUtil.loadServiceUpgrade( context.fs, context.service.getName(), upgradeVersion); List - compsThatNeedUpgrade = componentsFinder. + compsNeedUpgradeList = componentsFinder. findTargetComponentSpecs(context.service, targetSpec); - ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE) - .setVersion(upgradeVersion) - .setAutoFinalize(autoFinalize); - context.scheduler.getDispatcher().getEventHandler().handle(event); - if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) { - if (autoFinalize) { - event.setAutoFinalize(true); - } - compsThatNeedUpgrade.forEach(component -> { + // remove all components from need upgrade list if there restart policy + // doesn't all upgrade. + if (compsNeedUpgradeList != null) { + compsNeedUpgradeList.removeIf(component -> { org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum restartPolicy = component.getRestartPolicy(); @@ -274,25 +305,65 @@ void processUpgradeRequest(String upgradeVersion, Component.getRestartPolicyHandler(restartPolicy); // Do not allow upgrades for components which have NEVER/ON_FAILURE // restart policy - if (restartPolicyHandler.allowUpgrades()) { + if (!restartPolicyHandler.allowUpgrades()) { + LOG.info("The component {} has a restart policy that doesnt " + + "allow upgrades {} ", component.getName(), + component.getRestartPolicy().toString()); + return true; + } + + return false; + }); + } + + ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE) + .setVersion(upgradeVersion) + .setAutoFinalize(autoFinalize) + .setExpressUpgrade(expressUpgrade); + + if (expressUpgrade) { + // In case of express upgrade components need to be upgraded in order. + // Once the service manager gets notified that a component finished + // upgrading, it then issues event to upgrade the next component. + Map + compsNeedUpgradeByName = new HashMap<>(); + if (compsNeedUpgradeList != null) { + compsNeedUpgradeList.forEach(component -> + compsNeedUpgradeByName.put(component.getName(), component)); + } + List resolvedComps = ServiceApiUtil + .resolveCompsDependency(targetSpec); + + Queue + orderedCompUpgrade = new LinkedList<>(); + resolvedComps.forEach(compName -> { + org.apache.hadoop.yarn.service.api.records.Component component = + compsNeedUpgradeByName.get(compName); + if (component != null ) { + orderedCompUpgrade.add(component); + } + }); + event.setCompsToUpgradeInOrder(orderedCompUpgrade); + } + + context.scheduler.getDispatcher().getEventHandler().handle(event); + + if (compsNeedUpgradeList != null && !compsNeedUpgradeList.isEmpty()) { + if (!expressUpgrade) { + compsNeedUpgradeList.forEach(component -> { ComponentEvent needUpgradeEvent = new ComponentEvent( component.getName(), ComponentEventType.UPGRADE).setTargetSpec( component).setUpgradeVersion(event.getVersion()); context.scheduler.getDispatcher().getEventHandler().handle( needUpgradeEvent); - } else { - LOG.info("The component {} has a restart " - + "policy that doesnt allow upgrades {} ", component.getName(), - component.getRestartPolicy().toString()); - } - }); - } else { + + }); + } + } else if (autoFinalize) { // nothing to upgrade if upgrade auto finalize is requested, trigger a // state check. - if (autoFinalize) { - context.scheduler.getDispatcher().getEventHandler().handle( - new ServiceEvent(ServiceEventType.CHECK_STABLE)); - } + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.CHECK_STABLE)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 0801ad052d..384659f8ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -219,7 +219,7 @@ public void buildInstance(ServiceContext context, Configuration configuration) nmClient.getClient().cleanupRunningContainersOnStop(false); addIfService(nmClient); - dispatcher = new AsyncDispatcher("Component dispatcher"); + dispatcher = createAsyncDispatcher(); dispatcher.register(ServiceEventType.class, new ServiceEventHandler()); dispatcher.register(ComponentEventType.class, new ComponentEventHandler()); @@ -253,6 +253,9 @@ public void buildInstance(ServiceContext context, Configuration configuration) YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS, YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS, app.getConfiguration(), getConfig()); + + serviceManager = createServiceManager(); + context.setServiceManager(serviceManager); } protected YarnRegistryViewForProviders createYarnRegistryOperations( @@ -262,6 +265,14 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( context.attemptId); } + protected ServiceManager createServiceManager() { + return new ServiceManager(context); + } + + protected AsyncDispatcher createAsyncDispatcher() { + return new AsyncDispatcher("Component dispatcher"); + } + protected NMClientAsync createNMClient() { return NMClientAsync.createNMClientAsync(new NMClientCallback()); } @@ -344,8 +355,6 @@ public void serviceStart() throws Exception { // Since AM has been started and registered, the service is in STARTED state app.setState(ServiceState.STARTED); - serviceManager = new ServiceManager(context); - context.setServiceManager(serviceManager); // recover components based on containers sent from RM recoverComponents(response); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index b6ae38bdee..0b3c0377fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -30,5 +30,5 @@ @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") public enum ServiceState { ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, - UPGRADING_AUTO_FINALIZE; + UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 5668d9fa3a..a27ed87aa6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.service.client; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -215,6 +216,84 @@ public int actionBuild(Service service) return EXIT_SUCCESS; } + private ApplicationReport upgradePrecheck(Service service) + throws YarnException, IOException { + boolean upgradeEnabled = getConfig().getBoolean( + YARN_SERVICE_UPGRADE_ENABLED, YARN_SERVICE_UPGRADE_ENABLED_DEFAULT); + if (!upgradeEnabled) { + throw new YarnException(ErrorStrings.SERVICE_UPGRADE_DISABLED); + } + Service persistedService = ServiceApiUtil.loadService(fs, + service.getName()); + if (!StringUtils.isEmpty(persistedService.getId())) { + cachedAppInfo.put(persistedService.getName(), + new AppInfo(ApplicationId.fromString(persistedService.getId()), + persistedService.getKerberosPrincipal().getPrincipalName())); + } + + if (persistedService.getVersion().equals(service.getVersion())) { + String message = service.getName() + " is already at version " + + service.getVersion() + ". There is nothing to upgrade."; + LOG.error(message); + throw new YarnException(message); + } + + Service liveService = getStatus(service.getName()); + if (!liveService.getState().equals(ServiceState.STABLE)) { + String message = service.getName() + " is at " + liveService.getState() + + " state and upgrade can only be initiated when service is STABLE."; + LOG.error(message); + throw new YarnException(message); + } + + Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true); + ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); + ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service); + + ApplicationReport appReport = yarnClient + .getApplicationReport(getAppId(service.getName())); + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(service.getName() + " AM hostname is empty"); + } + return appReport; + } + + @Override + public int actionUpgradeExpress(String appName, File path) + throws IOException, YarnException { + Service service = + loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null); + service.setState(ServiceState.UPGRADING_AUTO_FINALIZE); + actionUpgradeExpress(service); + return EXIT_SUCCESS; + } + + public int actionUpgradeExpress(Service service) throws YarnException, + IOException { + ApplicationReport appReport = upgradePrecheck(service); + ClientAMProtocol proxy = createAMProxy(service.getName(), appReport); + UpgradeServiceRequestProto.Builder requestBuilder = + UpgradeServiceRequestProto.newBuilder(); + requestBuilder.setVersion(service.getVersion()); + if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { + requestBuilder.setAutoFinalize(true); + } + if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) { + requestBuilder.setExpressUpgrade(true); + requestBuilder.setAutoFinalize(true); + } + UpgradeServiceResponseProto responseProto = proxy.upgrade( + requestBuilder.build()); + if (responseProto.hasError()) { + LOG.error("Service {} express upgrade to version {} failed because {}", + service.getName(), service.getVersion(), responseProto.getError()); + throw new YarnException("Failed to express upgrade service " + + service.getName() + " to version " + service.getVersion() + + " because " + responseProto.getError()); + } + return EXIT_SUCCESS; + } + @Override public int initiateUpgrade(String appName, String fileName, boolean autoFinalize) @@ -231,46 +310,7 @@ public int initiateUpgrade(String appName, String fileName, public int initiateUpgrade(Service service) throws YarnException, IOException { - boolean upgradeEnabled = getConfig().getBoolean( - YARN_SERVICE_UPGRADE_ENABLED, - YARN_SERVICE_UPGRADE_ENABLED_DEFAULT); - if (!upgradeEnabled) { - throw new YarnException(ErrorStrings.SERVICE_UPGRADE_DISABLED); - } - Service persistedService = - ServiceApiUtil.loadService(fs, service.getName()); - if (!StringUtils.isEmpty(persistedService.getId())) { - cachedAppInfo.put(persistedService.getName(), new AppInfo( - ApplicationId.fromString(persistedService.getId()), - persistedService.getKerberosPrincipal().getPrincipalName())); - } - - if (persistedService.getVersion().equals(service.getVersion())) { - String message = - service.getName() + " is already at version " + service.getVersion() - + ". There is nothing to upgrade."; - LOG.error(message); - throw new YarnException(message); - } - - Service liveService = getStatus(service.getName()); - if (!liveService.getState().equals(ServiceState.STABLE)) { - String message = service.getName() + " is at " + - liveService.getState() - + " state and upgrade can only be initiated when service is STABLE."; - LOG.error(message); - throw new YarnException(message); - } - - Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true); - ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); - ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service); - - ApplicationReport appReport = - yarnClient.getApplicationReport(getAppId(service.getName())); - if (StringUtils.isEmpty(appReport.getHost())) { - throw new YarnException(service.getName() + " AM hostname is empty"); - } + ApplicationReport appReport = upgradePrecheck(service); ClientAMProtocol proxy = createAMProxy(service.getName(), appReport); UpgradeServiceRequestProto.Builder requestBuilder = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 41a2fcd104..acf3404fe9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import static org.apache.hadoop.yarn.service.api.records.Component @@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; import org.apache.hadoop.yarn.service.ContainerFailureTracker; import org.apache.hadoop.yarn.service.ServiceContext; @@ -546,13 +548,21 @@ private static class ComponentNeedsUpgradeTransition extends BaseTransition { @Override public void transition(Component component, ComponentEvent event) { component.upgradeInProgress.set(true); + component.upgradeEvent = event; component.componentSpec.setState(org.apache.hadoop.yarn.service.api. records.ComponentState.NEEDS_UPGRADE); component.numContainersThatNeedUpgrade.set( component.componentSpec.getNumberOfContainers()); - component.componentSpec.getContainers().forEach(container -> - container.setState(ContainerState.NEEDS_UPGRADE)); - component.upgradeEvent = event; + component.componentSpec.getContainers().forEach(container -> { + container.setState(ContainerState.NEEDS_UPGRADE); + if (event.isExpressUpgrade()) { + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + ContainerId.fromString(container.getId()), + ComponentInstanceEventType.UPGRADE); + LOG.info("Upgrade container {}", container.getId()); + component.dispatcher.getEventHandler().handle(upgradeEvent); + } + }); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java index 84caa77b20..643961d505 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java @@ -35,6 +35,7 @@ public class ComponentEvent extends AbstractEvent { private ContainerId containerId; private org.apache.hadoop.yarn.service.api.records.Component targetSpec; private String upgradeVersion; + private boolean expressUpgrade; public ContainerId getContainerId() { return containerId; @@ -113,4 +114,13 @@ public ComponentEvent setUpgradeVersion(String upgradeVersion) { this.upgradeVersion = upgradeVersion; return this; } + + public boolean isExpressUpgrade() { + return expressUpgrade; + } + + public ComponentEvent setExpressUpgrade(boolean expressUpgrade) { + this.expressUpgrade = expressUpgrade; + return this; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 11a6caa901..ed5e68e98f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -380,6 +380,11 @@ private static class ContainerUpgradeTransition extends BaseTransition { @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { + if (!compInstance.containerSpec.getState().equals( + ContainerState.NEEDS_UPGRADE)) { + //nothing to upgrade. this may happen with express upgrade. + return; + } compInstance.containerSpec.setState(ContainerState.UPGRADING); compInstance.component.decContainersReady(false); ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index 9219569327..b588e88ae7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -638,6 +638,32 @@ public static List validateAndResolveCompsUpgrade( return containerNeedUpgrade; } + /** + * Validates the components that are requested are stable for upgrade. + * It returns the instances of the components which are in ready state. + */ + public static List validateAndResolveCompsStable( + Service liveService, Collection compNames) throws YarnException { + Preconditions.checkNotNull(compNames); + HashSet requestedComps = Sets.newHashSet(compNames); + List containerNeedUpgrade = new ArrayList<>(); + for (Component liveComp : liveService.getComponents()) { + if (requestedComps.contains(liveComp.getName())) { + if (!liveComp.getState().equals(ComponentState.STABLE)) { + // Nothing to upgrade + throw new YarnException(String.format( + ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName())); + } + liveComp.getContainers().forEach(liveContainer -> { + if (liveContainer.getState().equals(ContainerState.READY)) { + containerNeedUpgrade.add(liveContainer); + } + }); + } + } + return containerNeedUpgrade; + } + private static String parseComponentName(String componentInstanceName) throws YarnException { int idx = componentInstanceName.lastIndexOf('-'); @@ -651,4 +677,22 @@ private static String parseComponentName(String componentInstanceName) public static String $(String s) { return "${" + s +"}"; } + + public static List resolveCompsDependency(Service service) { + List components = new ArrayList(); + for (Component component : service.getComponents()) { + int depSize = component.getDependencies().size(); + if (!components.contains(component.getName())) { + components.add(component.getName()); + } + if (depSize != 0) { + for (String depComp : component.getDependencies()) { + if (!components.contains(depComp)) { + components.add(0, depComp); + } + } + } + } + return components; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto index 6166dedd1d..169f765b8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto @@ -66,6 +66,7 @@ message StopResponseProto { message UpgradeServiceRequestProto { optional string version = 1; optional bool autoFinalize = 2; + optional bool expressUpgrade = 3; } message UpgradeServiceResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java index fc509f1942..a37cabe38c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -19,23 +19,26 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.exceptions.SliderException; -import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import java.io.IOException; -import java.util.Map; - -import static org.mockito.Mockito.mock; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeoutException; /** * Tests for {@link ServiceManager}. @@ -46,117 +49,120 @@ public class TestServiceManager { public ServiceTestUtils.ServiceFSWatcher rule = new ServiceTestUtils.ServiceFSWatcher(); - @Test - public void testUpgrade() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager("testUpgrade"); - upgrade(serviceManager, "v2", false, false); + @Test (timeout = TIMEOUT) + public void testUpgrade() throws Exception { + ServiceContext context = createServiceContext("testUpgrade"); + initUpgrade(context, "v2", false, false, false); Assert.assertEquals("service not upgraded", ServiceState.UPGRADING, - serviceManager.getServiceSpec().getState()); + context.getServiceManager().getServiceSpec().getState()); } - @Test + @Test (timeout = TIMEOUT) public void testRestartNothingToUpgrade() - throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( + throws Exception { + ServiceContext context = createServiceContext( "testRestartNothingToUpgrade"); - upgrade(serviceManager, "v2", false, false); + initUpgrade(context, "v2", false, false, false); + ServiceManager manager = context.getServiceManager(); + //make components stable by upgrading all instances + upgradeAllInstances(context); - //make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> { - comp.setState(ComponentState.STABLE); - }); - serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.START)); + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); Assert.assertEquals("service not re-started", ServiceState.STABLE, - serviceManager.getServiceSpec().getState()); + manager.getServiceSpec().getState()); } - @Test - public void testAutoFinalizeNothingToUpgrade() throws IOException, - SliderException { - ServiceManager serviceManager = createTestServiceManager( + @Test(timeout = TIMEOUT) + public void testAutoFinalizeNothingToUpgrade() throws Exception { + ServiceContext context = createServiceContext( "testAutoFinalizeNothingToUpgrade"); - upgrade(serviceManager, "v2", false, true); + initUpgrade(context, "v2", false, true, false); + ServiceManager manager = context.getServiceManager(); + //make components stable by upgrading all instances + upgradeAllInstances(context); - //make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> - comp.setState(ComponentState.STABLE)); - serviceManager.handle(new ServiceEvent(ServiceEventType.CHECK_STABLE)); + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); Assert.assertEquals("service stable", ServiceState.STABLE, - serviceManager.getServiceSpec().getState()); + manager.getServiceSpec().getState()); } - @Test + @Test(timeout = TIMEOUT) public void testRestartWithPendingUpgrade() - throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager("testRestart"); - upgrade(serviceManager, "v2", true, false); - serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + throws Exception { + ServiceContext context = createServiceContext("testRestart"); + initUpgrade(context, "v2", true, false, false); + ServiceManager manager = context.getServiceManager(); + + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.START)); + context.scheduler.getDispatcher().stop(); Assert.assertEquals("service should still be upgrading", - ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); + ServiceState.UPGRADING, manager.getServiceSpec().getState()); } - @Test - public void testCheckState() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( - "testCheckState"); - upgrade(serviceManager, "v2", true, false); + @Test(timeout = TIMEOUT) + public void testFinalize() throws Exception { + ServiceContext context = createServiceContext("testCheckState"); + initUpgrade(context, "v2", true, false, false); + ServiceManager manager = context.getServiceManager(); Assert.assertEquals("service not upgrading", ServiceState.UPGRADING, - serviceManager.getServiceSpec().getState()); + manager.getServiceSpec().getState()); - // make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> { - comp.setState(ComponentState.STABLE); - }); - ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE); - serviceManager.handle(checkStable); - Assert.assertEquals("service should still be upgrading", - ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); + //make components stable by upgrading all instances + upgradeAllInstances(context); // finalize service - ServiceEvent restart = new ServiceEvent(ServiceEventType.START); - serviceManager.handle(restart); - Assert.assertEquals("service not stable", - ServiceState.STABLE, serviceManager.getServiceSpec().getState()); + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.START)); + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); + Assert.assertEquals("service not re-started", ServiceState.STABLE, + manager.getServiceSpec().getState()); - validateUpgradeFinalization(serviceManager.getName(), "v2"); + validateUpgradeFinalization(manager.getName(), "v2"); } - @Test - public void testCheckStateAutoFinalize() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( - "testCheckState"); - serviceManager.getServiceSpec().setState( + @Test(timeout = TIMEOUT) + public void testAutoFinalize() throws Exception { + ServiceContext context = createServiceContext("testCheckStateAutoFinalize"); + ServiceManager manager = context.getServiceManager(); + manager.getServiceSpec().setState( ServiceState.UPGRADING_AUTO_FINALIZE); - upgrade(serviceManager, "v2", true, true); - Assert.assertEquals("service not upgrading", - ServiceState.UPGRADING_AUTO_FINALIZE, - serviceManager.getServiceSpec().getState()); + initUpgrade(context, "v2", true, true, false); // make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> - comp.setState(ComponentState.STABLE)); - ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE); - serviceManager.handle(checkStable); - Assert.assertEquals("service not stable", - ServiceState.STABLE, serviceManager.getServiceSpec().getState()); + upgradeAllInstances(context); - validateUpgradeFinalization(serviceManager.getName(), "v2"); + GenericTestUtils.waitFor(() -> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); + Assert.assertEquals("service not stable", + ServiceState.STABLE, manager.getServiceSpec().getState()); + + validateUpgradeFinalization(manager.getName(), "v2"); } @Test - public void testInvalidUpgrade() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( - "testInvalidUpgrade"); - serviceManager.getServiceSpec().setState( + public void testInvalidUpgrade() throws Exception { + ServiceContext serviceContext = createServiceContext("testInvalidUpgrade"); + ServiceManager manager = serviceContext.getServiceManager(); + manager.getServiceSpec().setState( ServiceState.UPGRADING_AUTO_FINALIZE); Service upgradedDef = ServiceTestUtils.createExampleApplication(); - upgradedDef.setName(serviceManager.getName()); + upgradedDef.setName(manager.getName()); upgradedDef.setVersion("v2"); upgradedDef.setLifetime(2L); writeUpgradedDef(upgradedDef); try { - serviceManager.processUpgradeRequest("v2", true); + manager.processUpgradeRequest("v2", true, false); } catch (Exception ex) { Assert.assertTrue(ex instanceof UnsupportedOperationException); return; @@ -164,6 +170,32 @@ public void testInvalidUpgrade() throws IOException, SliderException { Assert.fail(); } + @Test(timeout = TIMEOUT) + public void testExpressUpgrade() throws Exception { + ServiceContext context = createServiceContext("testExpressUpgrade"); + ServiceManager manager = context.getServiceManager(); + manager.getServiceSpec().setState( + ServiceState.EXPRESS_UPGRADING); + initUpgrade(context, "v2", true, true, true); + + List comps = ServiceApiUtil.resolveCompsDependency(context.service); + // wait till instances of first component are in upgrade + String comp1 = comps.get(0); + upgradeInstancesOf(context, comp1); + + // wait till instances of second component are in upgrade + String comp2 = comps.get(1); + upgradeInstancesOf(context, comp2); + + GenericTestUtils.waitFor(() -> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); + + Assert.assertEquals("service not stable", + ServiceState.STABLE, manager.getServiceSpec().getState()); + validateUpgradeFinalization(manager.getName(), "v2"); + } + private void validateUpgradeFinalization(String serviceName, String expectedVersion) throws IOException { Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName); @@ -172,15 +204,16 @@ private void validateUpgradeFinalization(String serviceName, Assert.assertNotNull("app id not present", savedSpec.getId()); Assert.assertEquals("state not stable", ServiceState.STABLE, savedSpec.getState()); - savedSpec.getComponents().forEach(compSpec -> { - Assert.assertEquals("comp not stable", ComponentState.STABLE, - compSpec.getState()); - }); + savedSpec.getComponents().forEach(compSpec -> + Assert.assertEquals("comp not stable", ComponentState.STABLE, + compSpec.getState())); } - private void upgrade(ServiceManager serviceManager, String version, - boolean upgradeArtifact, boolean autoFinalize) - throws IOException, SliderException { + private void initUpgrade(ServiceContext context, String version, + boolean upgradeArtifact, boolean autoFinalize, boolean expressUpgrade) + throws IOException, SliderException, TimeoutException, + InterruptedException { + ServiceManager serviceManager = context.getServiceManager(); Service upgradedDef = ServiceTestUtils.createExampleApplication(); upgradedDef.setName(serviceManager.getName()); upgradedDef.setVersion(version); @@ -191,39 +224,81 @@ private void upgrade(ServiceManager serviceManager, String version, }); } writeUpgradedDef(upgradedDef); - serviceManager.processUpgradeRequest(version, autoFinalize); + serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade); ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE); - upgradeEvent.setVersion(version); - if (autoFinalize) { - upgradeEvent.setAutoFinalize(true); - } - serviceManager.handle(upgradeEvent); + upgradeEvent.setVersion(version).setExpressUpgrade(expressUpgrade) + .setAutoFinalize(autoFinalize); + + GenericTestUtils.waitFor(()-> { + ServiceState serviceState = context.service.getState(); + if (serviceState.equals(ServiceState.UPGRADING) || + serviceState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || + serviceState.equals(ServiceState.EXPRESS_UPGRADING)) { + return true; + } + return false; + }, CHECK_EVERY_MILLIS, TIMEOUT); } - private ServiceManager createTestServiceManager(String name) - throws IOException { - ServiceContext context = new ServiceContext(); - context.service = createBaseDef(name); - context.fs = rule.getFs(); + private void upgradeAllInstances(ServiceContext context) throws + TimeoutException, InterruptedException { + // upgrade the instances + context.scheduler.getLiveInstances().forEach(((containerId, instance) -> { + ComponentInstanceEvent event = new ComponentInstanceEvent(containerId, + ComponentInstanceEventType.UPGRADE); + context.scheduler.getDispatcher().getEventHandler().handle(event); + })); - context.scheduler = new ServiceScheduler(context) { - @Override - protected YarnRegistryViewForProviders createYarnRegistryOperations( - ServiceContext context, RegistryOperations registryClient) { - return mock(YarnRegistryViewForProviders.class); + // become ready + context.scheduler.getLiveInstances().forEach(((containerId, instance) -> { + ComponentInstanceEvent event = new ComponentInstanceEvent(containerId, + ComponentInstanceEventType.BECOME_READY); + + context.scheduler.getDispatcher().getEventHandler().handle(event); + })); + GenericTestUtils.waitFor(()-> { + for (ComponentInstance instance: + context.scheduler.getLiveInstances().values()) { + if (!instance.getContainerState().equals(ContainerState.READY)) { + return false; + } } - }; + return true; + }, CHECK_EVERY_MILLIS, TIMEOUT); + } - context.scheduler.init(rule.getConf()); + private void upgradeInstancesOf(ServiceContext context, String compName) + throws TimeoutException, InterruptedException { + Collection compInstances = context.scheduler + .getAllComponents().get(compName).getAllComponentInstances(); + GenericTestUtils.waitFor(() -> { + for (ComponentInstance instance : compInstances) { + if (!instance.getContainerState().equals(ContainerState.UPGRADING)) { + return false; + } + } + return true; + }, CHECK_EVERY_MILLIS, TIMEOUT); - Map - componentState = context.scheduler.getAllComponents(); - context.service.getComponents().forEach(component -> { - componentState.put(component.getName(), - new org.apache.hadoop.yarn.service.component.Component(component, - 1L, context)); + // instances of comp1 get upgraded and become ready event is triggered + // become ready + compInstances.forEach(instance -> { + ComponentInstanceEvent event = new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.BECOME_READY); + + context.scheduler.getDispatcher().getEventHandler().handle(event); }); - return new ServiceManager(context); + } + + private ServiceContext createServiceContext(String name) + throws Exception { + Service service = createBaseDef(name); + ServiceContext context = new MockRunningServiceContext(rule, + service); + context.scheduler.getDispatcher().setDrainEventsOnStop(); + context.scheduler.getDispatcher().start(); + return context; } public static Service createBaseDef(String name) { @@ -257,4 +332,6 @@ private void writeUpgradedDef(Service upgradedDef) upgradedDef); } + private static final int TIMEOUT = 200000; + private static final int CHECK_EVERY_MILLIS = 100; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 8b13b2495b..216d88fc4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -415,6 +415,41 @@ public void testUpgrade() throws Exception { client.actionDestroy(service.getName()); } + @Test(timeout = 200000) + public void testExpressUpgrade() throws Exception { + setupInternal(NUM_NMS); + getConf().setBoolean(YARN_SERVICE_UPGRADE_ENABLED, true); + ServiceClient client = createClient(getConf()); + + Service service = createExampleApplication(); + client.actionCreate(service); + waitForServiceToBeStable(client, service); + + // upgrade the service + Component component = service.getComponents().iterator().next(); + service.setState(ServiceState.EXPRESS_UPGRADING); + service.setVersion("v2"); + component.getConfiguration().getEnv().put("key1", "val1"); + Component component2 = service.getComponent("compb"); + component2.getConfiguration().getEnv().put("key2", "val2"); + client.actionUpgradeExpress(service); + + // wait for upgrade to complete + waitForServiceToBeStable(client, service); + Service active = client.getStatus(service.getName()); + Assert.assertEquals("component not stable", ComponentState.STABLE, + active.getComponent(component.getName()).getState()); + Assert.assertEquals("compa does not have new env", "val1", + active.getComponent(component.getName()).getConfiguration() + .getEnv("key1")); + Assert.assertEquals("compb does not have new env", "val2", + active.getComponent(component2.getName()).getConfiguration() + .getEnv("key2")); + LOG.info("Stop/destroy service {}", service); + client.actionStop(service.getName(), true); + client.actionDestroy(service.getName()); + } + // Test to verify ANTI_AFFINITY placement policy // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler // 2. Create an example service with 3 containers diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestServiceApiUtil.java similarity index 87% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestServiceApiUtil.java index c2a80e7fa7..98e7474237 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestServiceApiUtil.java @@ -15,11 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.service; +package org.apache.hadoop.yarn.service.utils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.registry.client.api.RegistryConstants; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal; @@ -30,8 +31,6 @@ import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages; -import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; -import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -39,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -53,7 +53,7 @@ /** * Test for ServiceApiUtil helper methods. */ -public class TestServiceApiUtil { +public class TestServiceApiUtil extends ServiceTestUtils { private static final Logger LOG = LoggerFactory .getLogger(TestServiceApiUtil.class); private static final String EXCEPTION_PREFIX = "Should have thrown " + @@ -635,10 +635,12 @@ public void testKerberosPrincipalNameFormat() throws IOException { try { ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal()); - Assert.fail(EXCEPTION_PREFIX + "service with invalid principal name format."); + Assert.fail(EXCEPTION_PREFIX + "service with invalid principal name " + + "format."); } catch (IllegalArgumentException e) { assertEquals( - String.format(RestApiErrorMessages.ERROR_KERBEROS_PRINCIPAL_NAME_FORMAT, + String.format( + RestApiErrorMessages.ERROR_KERBEROS_PRINCIPAL_NAME_FORMAT, kp.getPrincipalName()), e.getMessage()); } @@ -650,4 +652,92 @@ public void testKerberosPrincipalNameFormat() throws IOException { Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage()); } } + + @Test + public void testResolveCompsDependency() { + Service service = createExampleApplication(); + List dependencies = new ArrayList(); + dependencies.add("compb"); + Component compa = createComponent("compa"); + compa.setDependencies(dependencies); + Component compb = createComponent("compb"); + service.addComponent(compa); + service.addComponent(compb); + List order = ServiceApiUtil.resolveCompsDependency(service); + List expected = new ArrayList(); + expected.add("compb"); + expected.add("compa"); + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals("Components are not equal.", expected.get(i), + order.get(i)); + } + } + + @Test + public void testResolveCompsDependencyReversed() { + Service service = createExampleApplication(); + List dependencies = new ArrayList(); + dependencies.add("compa"); + Component compa = createComponent("compa"); + Component compb = createComponent("compb"); + compb.setDependencies(dependencies); + service.addComponent(compa); + service.addComponent(compb); + List order = ServiceApiUtil.resolveCompsDependency(service); + List expected = new ArrayList(); + expected.add("compa"); + expected.add("compb"); + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals("Components are not equal.", expected.get(i), + order.get(i)); + } + } + + @Test + public void testResolveCompsCircularDependency() { + Service service = createExampleApplication(); + List dependencies = new ArrayList(); + List dependencies2 = new ArrayList(); + dependencies.add("compb"); + dependencies2.add("compa"); + Component compa = createComponent("compa"); + compa.setDependencies(dependencies); + Component compb = createComponent("compb"); + compa.setDependencies(dependencies2); + service.addComponent(compa); + service.addComponent(compb); + List order = ServiceApiUtil.resolveCompsDependency(service); + List expected = new ArrayList(); + expected.add("compa"); + expected.add("compb"); + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals("Components are not equal.", expected.get(i), + order.get(i)); + } + } + + @Test + public void testResolveNoCompsDependency() { + Service service = createExampleApplication(); + Component compa = createComponent("compa"); + Component compb = createComponent("compb"); + service.addComponent(compa); + service.addComponent(compb); + List order = ServiceApiUtil.resolveCompsDependency(service); + List expected = new ArrayList(); + expected.add("compa"); + expected.add("compb"); + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals("Components are not equal.", expected.get(i), + order.get(i)); + } + } + + public static Service createExampleApplication() { + + Service exampleApp = new Service(); + exampleApp.setName("example-app"); + exampleApp.setVersion("v1"); + return exampleApp; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 807938c7f0..a0e4e02b0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.client.cli; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; @@ -100,6 +101,7 @@ public class ApplicationCLI extends YarnCLI { public static final String COMPONENT = "component"; public static final String ENABLE_FAST_LAUNCH = "enableFastLaunch"; public static final String UPGRADE_CMD = "upgrade"; + public static final String UPGRADE_EXPRESS = "express"; public static final String UPGRADE_INITIATE = "initiate"; public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize"; public static final String UPGRADE_FINALIZE = "finalize"; @@ -247,6 +249,9 @@ public int run(String[] args) throws Exception { opts.addOption(UPGRADE_CMD, true, "Upgrades an application/long-" + "running service. It requires either -initiate, -instances, or " + "-finalize options."); + opts.addOption(UPGRADE_EXPRESS, true, "Works with -upgrade option to " + + "perform express upgrade. It requires the upgraded application " + + "specification file."); opts.addOption(UPGRADE_INITIATE, true, "Works with -upgrade option to " + "initiate the application upgrade. It requires the upgraded " + "application specification file."); @@ -639,9 +644,9 @@ public int run(String[] args) throws Exception { moveApplicationAcrossQueues(cliParser.getOptionValue(APP_ID), cliParser.getOptionValue(CHANGE_APPLICATION_QUEUE)); } else if (cliParser.hasOption(UPGRADE_CMD)) { - if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_INITIATE, - UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, COMPONENT_INSTS, COMPONENTS, - APP_TYPE_CMD)) { + if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_EXPRESS, + UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, + COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) { printUsage(title, opts); return exitCode; } @@ -649,7 +654,14 @@ public int run(String[] args) throws Exception { AppAdminClient client = AppAdminClient.createAppAdminClient(appType, getConf()); String appName = cliParser.getOptionValue(UPGRADE_CMD); - if (cliParser.hasOption(UPGRADE_INITIATE)) { + if (cliParser.hasOption(UPGRADE_EXPRESS)) { + File file = new File(cliParser.getOptionValue(UPGRADE_EXPRESS)); + if (!file.exists()) { + System.err.println(file.getAbsolutePath() + " does not exist."); + return exitCode; + } + return client.actionUpgradeExpress(appName, file); + } else if (cliParser.hasOption(UPGRADE_INITIATE)) { if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, APP_TYPE_CMD)) { printUsage(title, opts); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 526adfd3ae..20c9603291 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -2161,6 +2161,10 @@ private String createApplicationCLIHelpMessage() throws IOException { pw.println(" Optionally a destination folder"); pw.println(" for the tarball can be"); pw.println(" specified."); + pw.println(" -express Works with -upgrade option to"); + pw.println(" perform express upgrade. It"); + pw.println(" requires the upgraded"); + pw.println(" application specification file."); pw.println(" -finalize Works with -upgrade option to"); pw.println(" finalize the upgrade."); pw.println(" -flex Changes number of running"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index 3fb4778327..232666db7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; @@ -288,4 +289,15 @@ public abstract String getInstances(String appName, List components, String version, List containerStates) throws IOException, YarnException; + /** + * Express upgrade a long running service. + * + * @param appName the name of the application + * @param fileName specification of application upgrade to save. + * @return exit code + */ + @Public + @Unstable + public abstract int actionUpgradeExpress(String appName, File fileName) + throws IOException, YarnException; }