From 8d3b39de89809f5eed06f85f00ab223e2f75e49c Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Tue, 15 May 2018 20:40:39 -0400 Subject: [PATCH] YARN-8081. Add support to upgrade a component. Contributed by Chandni Singh --- .../yarn/service/client/ApiServiceClient.java | 51 +++++++- .../hadoop/yarn/service/webapp/ApiServer.java | 114 ++++++++++++++---- .../yarn/service/ServiceClientTest.java | 41 +++++-- .../hadoop/yarn/service/TestApiServer.java | 71 ++++++++++- .../service/client/TestApiServiceClient.java | 12 ++ .../service/api/records/ComponentState.java | 2 +- .../yarn/service/client/ServiceClient.java | 11 ++ .../yarn/service/conf/RestApiConstants.java | 2 + .../exceptions/RestApiErrorMessages.java | 6 + .../yarn/service/utils/ServiceApiUtil.java | 53 +++++++- .../yarn/service/client/TestServiceCLI.java | 18 +++ .../yarn/client/api/AppAdminClient.java | 12 ++ .../yarn/client/cli/ApplicationCLI.java | 17 ++- .../hadoop/yarn/client/cli/TestYarnCLI.java | 3 + 14 files changed, 368 insertions(+), 45 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index 757e6646f8..a8e2f511f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; @@ -170,6 +171,22 @@ private String getInstancesPath(String appName) throws IOException { return api.toString(); } + private String getComponentsPath(String appName) throws IOException { + Preconditions.checkNotNull(appName); + String url = getRMWebAddress(); + StringBuilder api = new StringBuilder(); + api.append(url); + api.append("/app/v1/services/").append(appName).append("/") + .append(RestApiConstants.COMPONENTS); + Configuration conf = getConfig(); + if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase( + "simple")) { + api.append("?user.name=" + UrlEncoded + .encodeString(System.getProperty("user.name"))); + } + return api.toString(); + } + private Builder getApiClient() throws IOException { return getApiClient(getServicePath(null)); } @@ -536,7 +553,7 @@ public int actionUpgradeInstances(String appName, List compInstances) container.setState(ContainerState.UPGRADING); toUpgrade[idx++] = container; } - String buffer = containerJsonSerde.toJson(toUpgrade); + String buffer = CONTAINER_JSON_SERDE.toJson(toUpgrade); ClientResponse response = getApiClient(getInstancesPath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); @@ -547,7 +564,35 @@ public int actionUpgradeInstances(String appName, List compInstances) return result; } - private static JsonSerDeser containerJsonSerde = + @Override + public int actionUpgradeComponents(String appName, List components) + throws IOException, YarnException { + int result; + Component[] toUpgrade = new Component[components.size()]; + try { + int idx = 0; + for (String compName : components) { + Component component = new Component(); + component.setName(compName); + component.setState(ComponentState.UPGRADING); + toUpgrade[idx++] = component; + } + String buffer = COMP_JSON_SERDE.toJson(toUpgrade); + ClientResponse response = getApiClient(getComponentsPath(appName)) + .put(ClientResponse.class, buffer); + result = processResponse(response); + } catch (Exception e) { + LOG.error("Failed to upgrade components: ", e); + result = EXIT_EXCEPTION_THROWN; + } + return result; + } + + private static final JsonSerDeser CONTAINER_JSON_SERDE = new JsonSerDeser<>(Container[].class, - PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + + private static final JsonSerDeser COMP_JSON_SERDE = + new JsonSerDeser<>(Component[].class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 8c7c0ee6aa..46c9abe6d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -17,7 +17,9 @@ package org.apache.hadoop.yarn.service.webapp; +import com.google.common.base.Joiner; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -29,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; @@ -61,8 +64,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED; @@ -306,6 +311,42 @@ public Integer run() throws Exception { return formatResponse(Status.OK, serviceStatus); } + @PUT + @Path(COMPONENTS_PATH) + @Consumes({MediaType.APPLICATION_JSON}) + @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN}) + public Response updateComponents(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String serviceName, + List requestComponents) { + + try { + if (requestComponents == null || requestComponents.isEmpty()) { + throw new YarnException("No components provided."); + } + UserGroupInformation ugi = getProxyUser(request); + Set compNamesToUpgrade = new HashSet<>(); + requestComponents.forEach(reqComp -> { + if (reqComp.getState() != null && + reqComp.getState().equals(ComponentState.UPGRADING)) { + compNamesToUpgrade.add(reqComp.getName()); + } + }); + LOG.info("PUT: upgrade components {} for service {} " + + "user = {}", compNamesToUpgrade, serviceName, ugi); + return processComponentsUpgrade(ugi, serviceName, compNamesToUpgrade); + } catch (AccessControlException e) { + return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); + } catch (YarnException e) { + return formatResponse(Response.Status.BAD_REQUEST, e.getMessage()); + } catch (IOException | InterruptedException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getMessage()); + } catch (UndeclaredThrowableException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getCause().getMessage()); + } + } + @PUT @Path(COMPONENT_PATH) @Consumes({ MediaType.APPLICATION_JSON }) @@ -326,6 +367,15 @@ public Response updateComponent(@Context HttpServletRequest request, + componentName + ")"; throw new YarnException(msg); } + UserGroupInformation ugi = getProxyUser(request); + if (component.getState() != null && + component.getState().equals(ComponentState.UPGRADING)) { + LOG.info("PUT: upgrade component {} for service {} " + + "user = {}", component.getName(), appName, ugi); + return processComponentsUpgrade(ugi, appName, + Sets.newHashSet(componentName)); + } + if (component.getNumberOfContainers() == null) { throw new YarnException("No container count provided"); } @@ -334,7 +384,6 @@ public Response updateComponent(@Context HttpServletRequest request, + component.getNumberOfContainers(); throw new YarnException(message); } - UserGroupInformation ugi = getProxyUser(request); Map original = ugi .doAs(new PrivilegedExceptionAction>() { @Override @@ -472,7 +521,7 @@ public Response updateComponentInstance(@Context HttpServletRequest request, if (reqContainer.getState() != null && reqContainer.getState().equals(ContainerState.UPGRADING)) { - return processContainerUpgrade(ugi, service, + return processContainersUpgrade(ugi, service, Lists.newArrayList(liveContainer)); } } catch (AccessControlException e) { @@ -517,7 +566,7 @@ public Response updateComponentInstances(@Context HttpServletRequest request, List liveContainers = ServiceApiUtil .getLiveContainers(service, toUpgrade); - return processContainerUpgrade(ugi, service, liveContainers); + return processContainersUpgrade(ugi, service, liveContainers); } } catch (AccessControlException e) { return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); @@ -629,7 +678,29 @@ private Response upgradeService(Service service, return formatResponse(Status.ACCEPTED, status); } - private Response processContainerUpgrade(UserGroupInformation ugi, + private Response processComponentsUpgrade(UserGroupInformation ugi, + String serviceName, Set compNames) throws YarnException, + IOException, InterruptedException { + Service service = getServiceFromClient(ugi, serviceName); + if (service.getState() != ServiceState.UPGRADING) { + throw new YarnException( + String.format("The upgrade of service %s has not been initiated.", + service.getName())); + } + List containersToUpgrade = ServiceApiUtil + .validateAndResolveCompsUpgrade(service, compNames); + Integer result = invokeContainersUpgrade(ugi, service, containersToUpgrade); + if (result == EXIT_SUCCESS) { + ServiceStatus status = new ServiceStatus(); + status.setDiagnostics( + "Upgrading components " + Joiner.on(',').join(compNames) + "."); + return formatResponse(Response.Status.ACCEPTED, status); + } + // If result is not a success, consider it a no-op + return Response.status(Response.Status.NO_CONTENT).build(); + } + + private Response processContainersUpgrade(UserGroupInformation ugi, Service service, List containers) throws YarnException, IOException, InterruptedException { @@ -638,25 +709,8 @@ private Response processContainerUpgrade(UserGroupInformation ugi, String.format("The upgrade of service %s has not been initiated.", service.getName())); } - for (Container liveContainer : containers) { - if (liveContainer.getState() != ContainerState.NEEDS_UPGRADE) { - // Nothing to upgrade - throw new YarnException(String.format( - "The component instance (%s) does not need an upgrade.", - liveContainer.getComponentInstanceName())); - } - } - - Integer result = ugi.doAs((PrivilegedExceptionAction) () -> { - int result1; - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - result1 = sc.actionUpgrade(service, containers); - sc.close(); - return result1; - }); - + ServiceApiUtil.validateInstancesUpgrade(containers); + Integer result = invokeContainersUpgrade(ugi, service, containers); if (result == EXIT_SUCCESS) { ServiceStatus status = new ServiceStatus(); status.setDiagnostics( @@ -668,6 +722,20 @@ private Response processContainerUpgrade(UserGroupInformation ugi, return Response.status(Response.Status.NO_CONTENT).build(); } + private int invokeContainersUpgrade(UserGroupInformation ugi, + Service service, List containers) throws IOException, + InterruptedException { + return ugi.doAs((PrivilegedExceptionAction) () -> { + int result1; + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + result1 = sc.actionUpgrade(service, containers); + sc.close(); + return result1; + }); + } + private Service getServiceFromClient(UserGroupInformation ugi, String serviceName) throws IOException, InterruptedException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java index 73a322cdc6..75b9486b11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java @@ -34,7 +34,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** * A mock version of ServiceClient - This class is design @@ -46,6 +49,7 @@ public class ServiceClientTest extends ServiceClient { private Configuration conf = new Configuration(); private Service goodServiceStatus = buildLiveGoodService(); private boolean initialized; + private Set expectedInstances = new HashSet<>(); public ServiceClientTest() { super(); @@ -61,11 +65,12 @@ public void init(Configuration conf) { @Override public void stop() { - // This is needed for testing API Server which use client to get status + // This is needed for testing API Server which uses client to get status // and then perform an action. } public void forceStop() { + expectedInstances.clear(); super.stop(); } @@ -144,17 +149,27 @@ public int initiateUpgrade(Service service) throws YarnException, @Override public int actionUpgrade(Service service, List compInstances) throws IOException, YarnException { - if (service.getName() != null && service.getName().equals("jenkins")) { - return EXIT_SUCCESS; - } else { - throw new IllegalArgumentException(); + if (service.getName() != null && service.getName().equals("jenkins") + && compInstances != null) { + Set actualInstances = compInstances.stream().map( + Container::getComponentInstanceName).collect(Collectors.toSet()); + if (actualInstances.equals(expectedInstances)) { + return EXIT_SUCCESS; + } } + throw new IllegalArgumentException(); } Service getGoodServiceStatus() { return goodServiceStatus; } + void setExpectedInstances(Set instances) { + if (instances != null) { + expectedInstances.addAll(instances); + } + } + static Service buildGoodService() { Service service = new Service(); service.setName("jenkins"); @@ -166,13 +181,15 @@ static Service buildGoodService() { resource.setCpus(1); resource.setMemory("2048"); List components = new ArrayList<>(); - Component c = new Component(); - c.setName("jenkins"); - c.setNumberOfContainers(2L); - c.setArtifact(artifact); - c.setLaunchCommand(""); - c.setResource(resource); - components.add(c); + for (int i = 0; i < 2; i++) { + Component c = new Component(); + c.setName("jenkins" + i); + c.setNumberOfContainers(2L); + c.setArtifact(artifact); + c.setLaunchCommand(""); + c.setResource(resource); + components.add(c); + } service.setComponents(components); return service; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java index 38aeb59c31..733b9bcffa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java @@ -23,18 +23,22 @@ import java.io.File; import java.io.FileWriter; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Path; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Resource; @@ -523,8 +527,11 @@ public void testUpgradeSingleInstance() { // and container state needs to be in NEEDS_UPGRADE. Service serviceStatus = mockServerClient.getGoodServiceStatus(); serviceStatus.setState(ServiceState.UPGRADING); - serviceStatus.getComponents().iterator().next().getContainers().iterator() - .next().setState(ContainerState.NEEDS_UPGRADE); + Container liveContainer = serviceStatus.getComponents().iterator().next() + .getContainers().iterator().next(); + liveContainer.setState(ContainerState.NEEDS_UPGRADE); + mockServerClient.setExpectedInstances(Sets.newHashSet( + liveContainer.getComponentInstanceName())); final Response actual = apiServer.updateComponentInstance(request, goodService.getName(), comp.getName(), @@ -545,9 +552,14 @@ public void testUpgradeMultipleInstances() { // and container state needs to be in NEEDS_UPGRADE. Service serviceStatus = mockServerClient.getGoodServiceStatus(); serviceStatus.setState(ServiceState.UPGRADING); + Set expectedInstances = new HashSet<>(); serviceStatus.getComponents().iterator().next().getContainers().forEach( - container -> container.setState(ContainerState.NEEDS_UPGRADE) + container -> { + container.setState(ContainerState.NEEDS_UPGRADE); + expectedInstances.add(container.getComponentInstanceName()); + } ); + mockServerClient.setExpectedInstances(expectedInstances); final Response actual = apiServer.updateComponentInstances(request, goodService.getName(), comp.getContainers()); @@ -555,4 +567,57 @@ public void testUpgradeMultipleInstances() { Response.status(Status.ACCEPTED).build().getStatus(), actual.getStatus()); } + + @Test + public void testUpgradeComponent() { + Service goodService = ServiceClientTest.buildLiveGoodService(); + Component comp = goodService.getComponents().iterator().next(); + comp.setState(ComponentState.UPGRADING); + + // To be able to upgrade, the service needs to be in UPGRADING + // and component state needs to be in NEEDS_UPGRADE. + Service serviceStatus = mockServerClient.getGoodServiceStatus(); + serviceStatus.setState(ServiceState.UPGRADING); + Component liveComp = serviceStatus.getComponent(comp.getName()); + liveComp.setState(ComponentState.NEEDS_UPGRADE); + Set expectedInstances = new HashSet<>(); + liveComp.getContainers().forEach(container -> { + expectedInstances.add(container.getComponentInstanceName()); + container.setState(ContainerState.NEEDS_UPGRADE); + }); + mockServerClient.setExpectedInstances(expectedInstances); + + final Response actual = apiServer.updateComponent(request, + goodService.getName(), comp.getName(), comp); + assertEquals("Component upgrade is ", + Response.status(Status.ACCEPTED).build().getStatus(), + actual.getStatus()); + } + + @Test + public void testUpgradeMultipleComps() { + Service goodService = ServiceClientTest.buildLiveGoodService(); + goodService.getComponents().forEach(comp -> + comp.setState(ComponentState.UPGRADING)); + + // To be able to upgrade, the live service needs to be in UPGRADING + // and component states needs to be in NEEDS_UPGRADE. + Service serviceStatus = mockServerClient.getGoodServiceStatus(); + serviceStatus.setState(ServiceState.UPGRADING); + Set expectedInstances = new HashSet<>(); + serviceStatus.getComponents().forEach(liveComp -> { + liveComp.setState(ComponentState.NEEDS_UPGRADE); + liveComp.getContainers().forEach(liveContainer -> { + expectedInstances.add(liveContainer.getComponentInstanceName()); + liveContainer.setState(ContainerState.NEEDS_UPGRADE); + }); + }); + mockServerClient.setExpectedInstances(expectedInstances); + + final Response actual = apiServer.updateComponents(request, + goodService.getName(), goodService.getComponents()); + assertEquals("Component upgrade is ", + Response.status(Status.ACCEPTED).build().getStatus(), + actual.getStatus()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java index fd31570000..6cf08807c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java @@ -298,5 +298,17 @@ public void testInstancesUpgrade() { } } + @Test + public void testComponentsUpgrade() { + String appName = "example-app"; + try { + int result = asc.actionUpgradeComponents(appName, Lists.newArrayList( + "comp")); + assertEquals(EXIT_SUCCESS, result); + } catch (IOException | YarnException e) { + fail(); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java index f7eda7bccf..3e7ed11a25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java @@ -26,5 +26,5 @@ @InterfaceStability.Unstable @ApiModel(description = "The current state of a component.") public enum ComponentState { - FLEXING, STABLE, NEEDS_UPGRADE; + FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 364a94ca3f..93a74e3cfa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -294,6 +294,17 @@ public int actionUpgradeInstances(String appName, Service persistedService = ServiceApiUtil.loadService(fs, appName); List containersToUpgrade = ServiceApiUtil. getLiveContainers(persistedService, componentInstances); + ServiceApiUtil.validateInstancesUpgrade(containersToUpgrade); + return actionUpgrade(persistedService, containersToUpgrade); + } + + @Override + public int actionUpgradeComponents(String appName, + List components) throws IOException, YarnException { + checkAppExistOnHdfs(appName); + Service persistedService = ServiceApiUtil.loadService(fs, appName); + List containersToUpgrade = ServiceApiUtil + .validateAndResolveCompsUpgrade(persistedService, components); return actionUpgrade(persistedService, containersToUpgrade); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java index bd1e9e7155..2d7db32b23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java @@ -34,6 +34,8 @@ public interface RestApiConstants { "/component-instances/{component_instance_name}"; String COMP_INSTANCES = "component-instances"; String COMP_INSTANCES_PATH = SERVICE_PATH + "/" + COMP_INSTANCES; + String COMPONENTS = "components"; + String COMPONENTS_PATH = SERVICE_PATH + "/" + COMPONENTS; // Query param String SERVICE_NAME = "service_name"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java index 0e42533504..5b6eac3300 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/exceptions/RestApiErrorMessages.java @@ -105,4 +105,10 @@ public interface RestApiErrorMessages { + "expression name defined for this component only."; String ERROR_KEYTAB_URI_SCHEME_INVALID = "Unsupported keytab URI scheme: %s"; String ERROR_KEYTAB_URI_INVALID = "Invalid keytab URI: %s"; + + String ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE = "The component instance " + + "(%s) does not need an upgrade."; + + String ERROR_COMP_DOES_NOT_NEED_UPGRADE = "The component (%s) does not need" + + " an upgrade."; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index 6e62c56f23..2f826fad8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.service.utils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -29,14 +31,16 @@ import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Configuration; -import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal; import org.apache.hadoop.yarn.service.api.records.PlacementConstraint; import org.apache.hadoop.yarn.service.api.records.Resource; -import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.conf.RestApiConstants; import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages; @@ -58,6 +62,9 @@ import java.util.Map; import java.util.Set; +import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.ERROR_COMP_DOES_NOT_NEED_UPGRADE; +import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE; + public class ServiceApiUtil { private static final Logger LOG = LoggerFactory.getLogger(ServiceApiUtil.class); @@ -545,6 +552,48 @@ public static List getLiveContainers(Service service, return result; } + /** + * Validates that the component instances that are requested to upgrade + * require an upgrade. + */ + public static void validateInstancesUpgrade(List + liveContainers) throws YarnException { + for (Container liveContainer : liveContainers) { + if (!liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) { + // Nothing to upgrade + throw new YarnException(String.format( + ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE, + liveContainer.getComponentInstanceName())); + } + } + } + + /** + * Validates the components that are requested to upgrade require an upgrade. + * It returns the instances of the components which need upgrade. + */ + public static List validateAndResolveCompsUpgrade( + Service liveService, Collection compNames) throws YarnException { + Preconditions.checkNotNull(compNames); + HashSet requestedComps = Sets.newHashSet(compNames); + List containerNeedUpgrade = new ArrayList<>(); + for (Component liveComp : liveService.getComponents()) { + if (requestedComps.contains(liveComp.getName())) { + if (!liveComp.getState().equals(ComponentState.NEEDS_UPGRADE)) { + // Nothing to upgrade + throw new YarnException(String.format( + ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName())); + } + liveComp.getContainers().forEach(liveContainer -> { + if (liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) { + containerNeedUpgrade.add(liveContainer); + } + }); + } + } + return containerNeedUpgrade; + } + private static String parseComponentName(String componentInstanceName) throws YarnException { int idx = componentInstanceName.lastIndexOf('-'); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java index c40a39d021..78a8198818 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java @@ -193,6 +193,18 @@ public void testUpgradeInstances() throws Exception { Assert.assertEquals(result, 0); } + @Test (timeout = 180000) + public void testUpgradeComponents() throws Exception { + conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE, + DummyServiceClient.class.getName()); + cli.setConf(conf); + String[] args = {"app", "-upgrade", "app-1", + "-components", "comp1,comp2", + "-appTypes", DUMMY_APP_TYPE}; + int result = cli.run(ApplicationCLI.preProcessArgs(args)); + Assert.assertEquals(result, 0); + } + @Test (timeout = 180000) public void testEnableFastLaunch() throws Exception { fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar")) @@ -291,5 +303,11 @@ public int actionUpgradeInstances(String appName, List componentInstances) throws IOException, YarnException { return 0; } + + @Override + public int actionUpgradeComponents(String appName, List components) + throws IOException, YarnException { + return 0; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index e3974742b2..91f899c82a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -258,4 +258,16 @@ public abstract int initiateUpgrade(String appName, String fileName, public abstract int actionUpgradeInstances(String appName, List componentInstances) throws IOException, YarnException; + + /** + * Upgrade components of a long running service. + * + * @param appName the name of the application. + * @param components the name of the components. + */ + @Public + @Unstable + public abstract int actionUpgradeComponents(String appName, + List components) throws IOException, YarnException; + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 17fc961fc3..1d26a96bb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -104,6 +104,7 @@ public class ApplicationCLI extends YarnCLI { public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize"; public static final String UPGRADE_FINALIZE = "finalize"; public static final String COMPONENT_INSTS = "instances"; + public static final String COMPONENTS = "components"; private static String firstArg = null; @@ -250,6 +251,8 @@ public int run(String[] args) throws Exception { opts.addOption(COMPONENT_INSTS, true, "Works with -upgrade option to " + "trigger the upgrade of specified component instances of the " + "application."); + opts.addOption(COMPONENTS, true, "Works with -upgrade option to " + + "trigger the upgrade of specified components of the application."); opts.addOption(UPGRADE_FINALIZE, false, "Works with -upgrade option to " + "finalize the upgrade."); opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " + @@ -274,6 +277,9 @@ public int run(String[] args) throws Exception { opts.getOption(COMPONENT_INSTS).setArgName("Component Instances"); opts.getOption(COMPONENT_INSTS).setValueSeparator(','); opts.getOption(COMPONENT_INSTS).setArgs(Option.UNLIMITED_VALUES); + opts.getOption(COMPONENTS).setArgName("Components"); + opts.getOption(COMPONENTS).setValueSeparator(','); + opts.getOption(COMPONENTS).setArgs(Option.UNLIMITED_VALUES); } else if (title != null && title.equalsIgnoreCase(APPLICATION_ATTEMPT)) { opts.addOption(STATUS_CMD, true, "Prints the status of the application attempt."); @@ -574,7 +580,7 @@ public int run(String[] args) throws Exception { cliParser.getOptionValue(CHANGE_APPLICATION_QUEUE)); } else if (cliParser.hasOption(UPGRADE_CMD)) { if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_INITIATE, - UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, COMPONENT_INSTS, + UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) { printUsage(title, opts); return exitCode; @@ -603,6 +609,15 @@ public int run(String[] args) throws Exception { } String[] instances = cliParser.getOptionValues(COMPONENT_INSTS); return client.actionUpgradeInstances(appName, Arrays.asList(instances)); + } else if (cliParser.hasOption(COMPONENTS)) { + if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, + COMPONENTS, APP_TYPE_CMD)) { + printUsage(title, opts); + return exitCode; + } + String[] components = cliParser.getOptionValues(COMPONENTS); + return client.actionUpgradeComponents(appName, + Arrays.asList(components)); } else if (cliParser.hasOption(UPGRADE_FINALIZE)) { if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_FINALIZE, APP_TYPE_CMD)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 244eb4573f..fc2eeab833 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -2143,6 +2143,9 @@ private String createApplicationCLIHelpMessage() throws IOException { pw.println(" long-running service. Supports"); pw.println(" absolute or relative changes,"); pw.println(" such as +1, 2, or -3."); + pw.println(" -components Works with -upgrade option to"); + pw.println(" trigger the upgrade of specified"); + pw.println(" components of the application."); pw.println(" -destroy Destroys a saved application"); pw.println(" specification and removes all"); pw.println(" application data permanently.");