diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index e4a245d3dc..cdba555c42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -26,6 +26,7 @@ import javax.ws.rs.core.MediaType; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -40,11 +41,16 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceStatus; +import org.apache.hadoop.yarn.service.conf.RestApiConstants; +import org.apache.hadoop.yarn.service.utils.JsonSerDeser; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.util.RMHAUtils; +import org.codehaus.jackson.map.PropertyNamingStrategy; import org.eclipse.jetty.util.UrlEncoded; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,7 +137,7 @@ private String getRMWebAddress() { * @return URI to API Service * @throws IOException */ - private String getApiUrl(String appName) throws IOException { + private String getServicePath(String appName) throws IOException { String url = getRMWebAddress(); StringBuilder api = new StringBuilder(); api.append(url); @@ -148,23 +154,40 @@ private String getApiUrl(String appName) throws IOException { return api.toString(); } + private String getInstancesPath(String appName) throws IOException { + Preconditions.checkNotNull(appName); + String url = getRMWebAddress(); + StringBuilder api = new StringBuilder(); + api.append(url); + api.append("/app/v1/services/").append(appName).append("/") + .append(RestApiConstants.COMP_INSTANCES); + Configuration conf = getConfig(); + if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase( + "simple")) { + api.append("?user.name=" + UrlEncoded + .encodeString(System.getProperty("user.name"))); + } + return api.toString(); + } + private Builder getApiClient() throws IOException { - return getApiClient(null); + return getApiClient(getServicePath(null)); } /** * Setup API service web request. * - * @param appName + * @param requestPath * @return * @throws IOException */ - private Builder getApiClient(String appName) throws IOException { + private Builder getApiClient(String requestPath) + throws IOException { Client client = Client.create(getClientConfig()); Configuration conf = getConfig(); client.setChunkedEncodingSize(null); Builder builder = client - .resource(getApiUrl(appName)).type(MediaType.APPLICATION_JSON); + .resource(requestPath).type(MediaType.APPLICATION_JSON); if (conf.get("hadoop.http.authentication.type").equals("kerberos")) { AuthenticatedURL.Token token = new AuthenticatedURL.Token(); builder.header("WWW-Authenticate", token); @@ -312,7 +335,7 @@ public int actionStop(String appName) throws IOException, YarnException { service.setName(appName); service.setState(ServiceState.STOPPED); String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); } catch (Exception e) { @@ -335,7 +358,7 @@ public int actionStart(String appName) throws IOException, YarnException { service.setName(appName); service.setState(ServiceState.STARTED); String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); } catch (Exception e) { @@ -381,7 +404,7 @@ public int actionSave(String fileName, String appName, Long lifetime, public int actionDestroy(String appName) throws IOException, YarnException { int result = EXIT_SUCCESS; try { - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .delete(ClientResponse.class); result = processResponse(response); } catch (Exception e) { @@ -413,7 +436,7 @@ public int actionFlex(String appName, Map componentCounts) service.addComponent(component); } String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient(appName) + ClientResponse response = getApiClient(getServicePath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); } catch (Exception e) { @@ -454,7 +477,8 @@ public String getStatusString(String appIdOrName) throws IOException, ServiceApiUtil.validateNameFormat(appName, getConfig()); } try { - ClientResponse response = getApiClient(appName).get(ClientResponse.class); + ClientResponse response = getApiClient(getServicePath(appName)) + .get(ClientResponse.class); if (response.getStatus() != 200) { StringBuilder sb = new StringBuilder(); sb.append(appName); @@ -470,16 +494,20 @@ public String getStatusString(String appIdOrName) throws IOException, } @Override - public int actionUpgrade(String appName, - String fileName) throws IOException, YarnException { + public int initiateUpgrade(String appName, + String fileName, boolean autoFinalize) throws IOException, YarnException { int result; try { Service service = loadAppJsonFromLocalFS(fileName, appName, null, null); - service.setState(ServiceState.UPGRADING); + if (autoFinalize) { + service.setState(ServiceState.UPGRADING_AUTO_FINALIZE); + } else { + service.setState(ServiceState.UPGRADING); + } String buffer = jsonSerDeser.toJson(service); - ClientResponse response = getApiClient() - .post(ClientResponse.class, buffer); + ClientResponse response = getApiClient(getServicePath(appName)) + .put(ClientResponse.class, buffer); result = processResponse(response); } catch (Exception e) { LOG.error("Failed to upgrade application: ", e); @@ -487,4 +515,32 @@ public int actionUpgrade(String appName, } return result; } + + @Override + public int actionUpgradeInstances(String appName, List compInstances) + throws IOException, YarnException { + int result; + Container[] toUpgrade = new Container[compInstances.size()]; + try { + int idx = 0; + for (String instanceName : compInstances) { + Container container = new Container(); + container.setComponentInstanceName(instanceName); + container.setState(ContainerState.UPGRADING); + toUpgrade[idx++] = container; + } + String buffer = containerJsonSerde.toJson(toUpgrade); + ClientResponse response = getApiClient(getInstancesPath(appName)) + .put(ClientResponse.class, buffer); + result = processResponse(response); + } catch (Exception e) { + LOG.error("Failed to upgrade component instance: ", e); + result = EXIT_EXCEPTION_THROWN; + } + return result; + } + + private static JsonSerDeser containerJsonSerde = + new JsonSerDeser<>(Container[].class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 14c77f6a2c..6f32598f81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.service.webapp; +import com.google.common.collect.Lists; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -28,10 +29,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceStatus; import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.conf.RestApiConstants; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,9 +58,12 @@ import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED; import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*; @@ -177,17 +185,7 @@ public Response getService(@Context HttpServletRequest request, } UserGroupInformation ugi = getProxyUser(request); LOG.info("GET: getService for appName = {} user = {}", appName, ugi); - Service app = ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Service run() throws IOException, YarnException { - ServiceClient sc = getServiceClient(); - sc.init(YARN_CONFIG); - sc.start(); - Service app = sc.getStatus(appName); - sc.close(); - return app; - } - }); + Service app = getServiceFromClient(ugi, appName); return Response.ok(app).build(); } catch (AccessControlException e) { return formatResponse(Status.FORBIDDEN, e.getMessage()); @@ -393,17 +391,19 @@ public Response updateService(@Context HttpServletRequest request, return startService(appName, ugi); } + // If an UPGRADE is requested + if (updateServiceData.getState() != null && ( + updateServiceData.getState() == ServiceState.UPGRADING || + updateServiceData.getState() == + ServiceState.UPGRADING_AUTO_FINALIZE)) { + return upgradeService(updateServiceData, ugi); + } + // If new lifetime value specified then update it if (updateServiceData.getLifetime() != null && updateServiceData.getLifetime() > 0) { return updateLifetime(appName, updateServiceData, ugi); } - - // If an UPGRADE is requested - if (updateServiceData.getState() != null && - updateServiceData.getState() == ServiceState.UPGRADING) { - return upgradeService(updateServiceData, ugi); - } } catch (UndeclaredThrowableException e) { return formatResponse(Status.BAD_REQUEST, e.getCause().getMessage()); @@ -427,6 +427,103 @@ public Response updateService(@Context HttpServletRequest request, return Response.status(Status.NO_CONTENT).build(); } + @PUT + @Path(COMP_INSTANCE_LONG_PATH) + @Consumes({MediaType.APPLICATION_JSON}) + @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN}) + public Response updateComponentInstance(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String serviceName, + @PathParam(COMPONENT_NAME) String componentName, + @PathParam(COMP_INSTANCE_NAME) String compInstanceName, + Container reqContainer) { + + try { + UserGroupInformation ugi = getProxyUser(request); + LOG.info("PUT: update component instance {} for component = {}" + + " service = {} user = {}", compInstanceName, componentName, + serviceName, ugi); + if (reqContainer == null) { + throw new YarnException("No container data provided."); + } + Service service = getServiceFromClient(ugi, serviceName); + Component component = service.getComponent(componentName); + if (component == null) { + throw new YarnException(String.format( + "The component name in the URI path (%s) is invalid.", + componentName)); + } + + Container liveContainer = component.getComponentInstance( + compInstanceName); + if (liveContainer == null) { + throw new YarnException(String.format( + "The component (%s) does not have a component instance (%s).", + componentName, compInstanceName)); + } + + if (reqContainer.getState() != null + && reqContainer.getState().equals(ContainerState.UPGRADING)) { + return processContainerUpgrade(ugi, service, + Lists.newArrayList(liveContainer)); + } + } catch (AccessControlException e) { + return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); + } catch (YarnException e) { + return formatResponse(Response.Status.BAD_REQUEST, e.getMessage()); + } catch (IOException | InterruptedException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getMessage()); + } catch (UndeclaredThrowableException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getCause().getMessage()); + } + return Response.status(Status.NO_CONTENT).build(); + } + + @PUT + @Path(COMP_INSTANCES_PATH) + @Consumes({MediaType.APPLICATION_JSON}) + @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN}) + public Response updateComponentInstances(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String serviceName, + List requestContainers) { + + try { + if (requestContainers == null || requestContainers.isEmpty()) { + throw new YarnException("No containers provided."); + } + UserGroupInformation ugi = getProxyUser(request); + List toUpgrade = new ArrayList<>(); + for (Container reqContainer : requestContainers) { + if (reqContainer.getState() != null && + reqContainer.getState().equals(ContainerState.UPGRADING)) { + toUpgrade.add(reqContainer.getComponentInstanceName()); + } + } + + if (!toUpgrade.isEmpty()) { + Service service = getServiceFromClient(ugi, serviceName); + LOG.info("PUT: upgrade component instances {} for service = {} " + + "user = {}", toUpgrade, serviceName, ugi); + List liveContainers = ServiceApiUtil + .getLiveContainers(service, toUpgrade); + + return processContainerUpgrade(ugi, service, liveContainers); + } + } catch (AccessControlException e) { + return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); + } catch (YarnException e) { + return formatResponse(Response.Status.BAD_REQUEST, e.getMessage()); + } catch (IOException | InterruptedException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getMessage()); + } catch (UndeclaredThrowableException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getCause().getMessage()); + } + return Response.status(Status.NO_CONTENT).build(); + } + private Response flexService(Service service, UserGroupInformation ugi) throws IOException, InterruptedException { String appName = service.getName(); @@ -511,17 +608,70 @@ private Response upgradeService(Service service, ServiceClient sc = getServiceClient(); sc.init(YARN_CONFIG); sc.start(); - sc.actionUpgrade(service); + sc.initiateUpgrade(service); sc.close(); return null; }); - LOG.info("Service {} version {} upgrade initialized"); + LOG.info("Service {} version {} upgrade initialized", service.getName(), + service.getVersion()); status.setDiagnostics("Service " + service.getName() + " version " + service.getVersion() + " saved."); status.setState(ServiceState.ACCEPTED); return formatResponse(Status.ACCEPTED, status); } + private Response processContainerUpgrade(UserGroupInformation ugi, + Service service, List containers) throws YarnException, + IOException, InterruptedException { + + if (service.getState() != ServiceState.UPGRADING) { + throw new YarnException( + String.format("The upgrade of service %s has not been initiated.", + service.getName())); + } + for (Container liveContainer : containers) { + if (liveContainer.getState() != ContainerState.NEEDS_UPGRADE) { + // Nothing to upgrade + throw new YarnException(String.format( + "The component instance (%s) does not need an upgrade.", + liveContainer.getComponentInstanceName())); + } + } + + Integer result = ugi.doAs((PrivilegedExceptionAction) () -> { + int result1; + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + result1 = sc.actionUpgrade(service, containers); + sc.close(); + return result1; + }); + + if (result == EXIT_SUCCESS) { + ServiceStatus status = new ServiceStatus(); + status.setDiagnostics( + "Upgrading component instances " + containers.stream() + .map(Container::getId).collect(Collectors.joining(",")) + "."); + return formatResponse(Response.Status.ACCEPTED, status); + } + // If result is not a success, consider it a no-op + return Response.status(Response.Status.NO_CONTENT).build(); + } + + private Service getServiceFromClient(UserGroupInformation ugi, + String serviceName) throws IOException, InterruptedException { + + return ugi.doAs((PrivilegedExceptionAction) () -> { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + Service app1 = sc.getStatus(serviceName); + sc.close(); + return app1; + }); + } + /** * Used by negative test case. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java index 543c5833b4..cff3e397c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java @@ -17,17 +17,24 @@ package org.apache.hadoop.yarn.service; -import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Artifact; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + /** * A mock version of ServiceClient - This class is design * to simulate various error conditions that will happen @@ -36,14 +43,31 @@ public class ServiceClientTest extends ServiceClient { private Configuration conf = new Configuration(); - - protected static void init() { - } + private Service goodServiceStatus = buildLiveGoodService(); + private boolean initialized; public ServiceClientTest() { super(); } + @Override + public void init(Configuration conf) { + if (!initialized) { + super.init(conf); + initialized = true; + } + } + + @Override + public void stop() { + // This is needed for testing API Server which use client to get status + // and then perform an action. + } + + public void forceStop() { + super.stop(); + } + @Override public Configuration getConfig() { return conf; @@ -58,11 +82,8 @@ public ApplicationId actionCreate(Service service) throws IOException { @Override public Service getStatus(String appName) { - if (appName == null) { - throw new NullPointerException(); - } - if (appName.equals("jenkins")) { - return new Service(); + if (appName != null && appName.equals("jenkins")) { + return goodServiceStatus; } else { throw new IllegalArgumentException(); } @@ -71,10 +92,7 @@ public Service getStatus(String appName) { @Override public int actionStart(String serviceName) throws YarnException, IOException { - if (serviceName == null) { - throw new NullPointerException(); - } - if (serviceName.equals("jenkins")) { + if (serviceName != null && serviceName.equals("jenkins")) { return EXIT_SUCCESS; } else { throw new ApplicationNotFoundException(""); @@ -98,19 +116,77 @@ public int actionStop(String serviceName, boolean waitForAppStopped) @Override public int actionDestroy(String serviceName) { - if (serviceName == null) { - throw new NullPointerException(); + if (serviceName != null) { + if (serviceName.equals("jenkins")) { + return EXIT_SUCCESS; + } else if (serviceName.equals("jenkins-already-stopped")) { + return EXIT_SUCCESS; + } else if (serviceName.equals("jenkins-doesn't-exist")) { + return EXIT_NOT_FOUND; + } else if (serviceName.equals("jenkins-error-cleaning-registry")) { + return EXIT_OTHER_FAILURE; + } } - if (serviceName.equals("jenkins")) { + throw new IllegalArgumentException(); + } + + @Override + public int initiateUpgrade(Service service) throws YarnException, + IOException { + if (service.getName() != null && service.getName().equals("jenkins")) { return EXIT_SUCCESS; - } else if (serviceName.equals("jenkins-already-stopped")) { - return EXIT_SUCCESS; - } else if (serviceName.equals("jenkins-doesn't-exist")) { - return EXIT_NOT_FOUND; - } else if (serviceName.equals("jenkins-error-cleaning-registry")) { - return EXIT_OTHER_FAILURE; } else { throw new IllegalArgumentException(); } } + + @Override + public int actionUpgrade(Service service, List compInstances) + throws IOException, YarnException { + if (service.getName() != null && service.getName().equals("jenkins")) { + return EXIT_SUCCESS; + } else { + throw new IllegalArgumentException(); + } + } + + Service getGoodServiceStatus() { + return goodServiceStatus; + } + + static Service buildGoodService() { + Service service = new Service(); + service.setName("jenkins"); + service.setVersion("v1"); + Artifact artifact = new Artifact(); + artifact.setType(Artifact.TypeEnum.DOCKER); + artifact.setId("jenkins:latest"); + Resource resource = new Resource(); + resource.setCpus(1); + resource.setMemory("2048"); + List components = new ArrayList<>(); + Component c = new Component(); + c.setName("jenkins"); + c.setNumberOfContainers(2L); + c.setArtifact(artifact); + c.setLaunchCommand(""); + c.setResource(resource); + components.add(c); + service.setComponents(components); + return service; + } + + static Service buildLiveGoodService() { + Service service = buildGoodService(); + Component comp = service.getComponents().iterator().next(); + List containers = new ArrayList<>(); + for (int i = 0; i < comp.getNumberOfContainers(); i++) { + Container container = new Container(); + container.setComponentInstanceName(comp.getName() + "-" + (i + 1)); + container.setState(ContainerState.READY); + containers.add(container); + } + comp.setContainers(containers); + return service; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java index 72c6e2f111..85c3cd410c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java @@ -35,12 +35,14 @@ import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceStatus; -import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.webapp.ApiServer; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -52,13 +54,14 @@ public class TestApiServer { private ApiServer apiServer; private HttpServletRequest request; + private ServiceClientTest mockServerClient; @Before public void setup() throws Exception { request = Mockito.mock(HttpServletRequest.class); Mockito.when(request.getRemoteUser()) .thenReturn(System.getProperty("user.name")); - ServiceClient mockServerClient = new ServiceClientTest(); + mockServerClient = new ServiceClientTest(); Configuration conf = new Configuration(); conf.set("yarn.api-service.service.client.class", ServiceClientTest.class.getName()); @@ -66,6 +69,11 @@ public void setup() throws Exception { apiServer.setServiceClient(mockServerClient); } + @After + public void teardown() { + mockServerClient.forceStop(); + } + @Test public void testPathAnnotation() { assertNotNull(this.apiServer.getClass().getAnnotation(Path.class)); @@ -107,24 +115,7 @@ public void testGoodCreateService() throws Exception { BufferedWriter bw = new BufferedWriter(new FileWriter(dockerConfig)); bw.write(json); bw.close(); - Service service = new Service(); - service.setName("jenkins"); - service.setVersion("v1"); - Artifact artifact = new Artifact(); - artifact.setType(TypeEnum.DOCKER); - artifact.setId("jenkins:latest"); - Resource resource = new Resource(); - resource.setCpus(1); - resource.setMemory("2048"); - List components = new ArrayList(); - Component c = new Component(); - c.setName("jenkins"); - c.setNumberOfContainers(1L); - c.setArtifact(artifact); - c.setLaunchCommand(""); - c.setResource(resource); - components.add(c); - service.setComponents(components); + Service service = ServiceClientTest.buildGoodService(); final Response actual = apiServer.createService(request, service); assertEquals("Create service is ", Response.status(Status.ACCEPTED).build().getStatus(), @@ -495,4 +486,60 @@ public void testUpdateComponent() { + "that in the URI path (jenkins-master)", serviceStatus.getDiagnostics()); } + + @Test + public void testInitiateUpgrade() { + Service goodService = ServiceClientTest.buildLiveGoodService(); + goodService.setVersion("v2"); + goodService.setState(ServiceState.UPGRADING); + final Response actual = apiServer.updateService(request, + goodService.getName(), goodService); + assertEquals("Initiate upgrade is ", + Response.status(Status.ACCEPTED).build().getStatus(), + actual.getStatus()); + } + + @Test + public void testUpgradeSingleInstance() { + Service goodService = ServiceClientTest.buildLiveGoodService(); + Component comp = goodService.getComponents().iterator().next(); + Container container = comp.getContainers().iterator().next(); + container.setState(ContainerState.UPGRADING); + + // To be able to upgrade, the service needs to be in UPGRADING + // and container state needs to be in NEEDS_UPGRADE. + Service serviceStatus = mockServerClient.getGoodServiceStatus(); + serviceStatus.setState(ServiceState.UPGRADING); + serviceStatus.getComponents().iterator().next().getContainers().iterator() + .next().setState(ContainerState.NEEDS_UPGRADE); + + final Response actual = apiServer.updateComponentInstance(request, + goodService.getName(), comp.getName(), + container.getComponentInstanceName(), container); + assertEquals("Instance upgrade is ", + Response.status(Status.ACCEPTED).build().getStatus(), + actual.getStatus()); + } + + @Test + public void testUpgradeMultipleInstances() { + Service goodService = ServiceClientTest.buildLiveGoodService(); + Component comp = goodService.getComponents().iterator().next(); + comp.getContainers().forEach(container -> + container.setState(ContainerState.UPGRADING)); + + // To be able to upgrade, the service needs to be in UPGRADING + // and container state needs to be in NEEDS_UPGRADE. + Service serviceStatus = mockServerClient.getGoodServiceStatus(); + serviceStatus.setState(ServiceState.UPGRADING); + serviceStatus.getComponents().iterator().next().getContainers().forEach( + container -> container.setState(ContainerState.NEEDS_UPGRADE) + ); + + final Response actual = apiServer.updateComponentInstances(request, + goodService.getName(), comp.getContainers()); + assertEquals("Instance upgrade is ", + Response.status(Status.ACCEPTED).build().getStatus(), + actual.getStatus()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java index ffd9328041..a24514468c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java @@ -26,6 +26,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.eclipse.jetty.server.Server; @@ -256,4 +257,29 @@ public void testBadDestroy() { } } + @Test + public void testInitiateServiceUpgrade() { + String appName = "example-app"; + String upgradeFileName = "target/test-classes/example-app.json"; + try { + int result = asc.initiateUpgrade(appName, upgradeFileName, false); + assertEquals(EXIT_SUCCESS, result); + } catch (IOException | YarnException e) { + fail(); + } + } + + @Test + public void testInstancesUpgrade() { + String appName = "example-app"; + try { + int result = asc.actionUpgradeInstances(appName, Lists.newArrayList( + "comp-1", "comp-2")); + assertEquals(EXIT_SUCCESS, result); + } catch (IOException | YarnException e) { + fail(); + } + } + + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java index 4422451c5c..45ff98ac57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; @@ -49,4 +51,8 @@ UpgradeServiceResponseProto upgrade(UpgradeServiceRequestProto request) RestartServiceResponseProto restart(RestartServiceRequestProto request) throws IOException, YarnException; + + CompInstancesUpgradeResponseProto upgrade( + CompInstancesUpgradeRequestProto request) throws IOException, + YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index 3d037e7189..d5d6fa421b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -26,8 +26,11 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; @@ -40,6 +43,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,12 +156,16 @@ public InetSocketAddress getBindAddress() { @Override public UpgradeServiceResponseProto upgrade( UpgradeServiceRequestProto request) throws IOException { - ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE); - event.setVersion(request.getVersion()); - context.scheduler.getDispatcher().getEventHandler().handle(event); - LOG.info("Upgrading service to version {} by {}", request.getVersion(), - UserGroupInformation.getCurrentUser()); - return UpgradeServiceResponseProto.newBuilder().build(); + try { + context.getServiceManager().processUpgradeRequest(request.getVersion(), + request.getAutoFinalize()); + LOG.info("Upgrading service to version {} by {}", request.getVersion(), + UserGroupInformation.getCurrentUser()); + return UpgradeServiceResponseProto.newBuilder().build(); + } catch (Exception ex) { + return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage()) + .build(); + } } @Override @@ -167,4 +176,21 @@ public RestartServiceResponseProto restart(RestartServiceRequestProto request) LOG.info("Restart service by {}", UserGroupInformation.getCurrentUser()); return RestartServiceResponseProto.newBuilder().build(); } + + @Override + public CompInstancesUpgradeResponseProto upgrade( + CompInstancesUpgradeRequestProto request) + throws IOException, YarnException { + if (!request.getContainerIdsList().isEmpty()) { + + for (String containerId : request.getContainerIdsList()) { + ComponentInstanceEvent event = + new ComponentInstanceEvent(ContainerId.fromString(containerId), + ComponentInstanceEventType.UPGRADE); + LOG.info("Upgrade container {}", containerId); + context.scheduler.getDispatcher().getEventHandler().handle(event); + } + } + return CompInstancesUpgradeResponseProto.newBuilder().build(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java index cd41ab7c83..6c91b9cd5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service; +import com.google.common.base.Preconditions; import com.google.common.cache.LoadingCache; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; @@ -42,8 +43,17 @@ public class ServiceContext { public String principal; // AM keytab location public String keytab; + private ServiceManager serviceManager; public ServiceContext() { } + + public ServiceManager getServiceManager() { + return serviceManager; + } + + void setServiceManager(ServiceManager serviceManager) { + this.serviceManager = Preconditions.checkNotNull(serviceManager); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java index 9e7d442b9e..0196be2a98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java @@ -28,6 +28,7 @@ public class ServiceEvent extends AbstractEvent { private final ServiceEventType type; private String version; + private boolean autoFinalize; public ServiceEvent(ServiceEventType serviceEventType) { super(serviceEventType); @@ -46,4 +47,13 @@ public ServiceEvent setVersion(String version) { this.version = version; return this; } + + public boolean isAutoFinalize() { + return autoFinalize; + } + + public ServiceEvent setAutoFinalize(boolean autoFinalize) { + this.autoFinalize = autoFinalize; + return this; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java index 2162eb5e93..4fc420ba6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java @@ -24,5 +24,5 @@ public enum ServiceEventType { START, UPGRADE, - STOP_UPGRADE + CHECK_STABLE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index a3fbe899bb..869d7f3659 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; @@ -39,10 +40,13 @@ import java.text.MessageFormat; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; + /** - * Manages the state of the service. + * Manages the state of Service. */ public class ServiceManager implements EventHandler { private static final Logger LOG = LoggerFactory.getLogger( @@ -56,10 +60,10 @@ public class ServiceManager implements EventHandler { private final StateMachine stateMachine; + private final UpgradeComponentsFinder componentsFinder; private final AsyncDispatcher dispatcher; private final SliderFileSystem fs; - private final UpgradeComponentsFinder componentsFinder; private String upgradeVersion; @@ -72,9 +76,16 @@ public class ServiceManager implements EventHandler { State.UPGRADING), ServiceEventType.UPGRADE, new StartUpgradeTransition()) + .addTransition(State.STABLE, EnumSet.of(State.STABLE), + ServiceEventType.CHECK_STABLE, new CheckStableTransition()) + .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, State.UPGRADING), ServiceEventType.START, - new StopUpgradeTransition()) + new CheckStableTransition()) + + .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, + State.UPGRADING), ServiceEventType.CHECK_STABLE, + new CheckStableTransition()) .installTopology(); public ServiceManager(ServiceContext context) { @@ -102,7 +113,7 @@ public void handle(ServiceEvent event) { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitionException e) { LOG.error(MessageFormat.format( - "[SERVICE]: Invalid event {0} at {1}.", event.getType(), + "[SERVICE]: Invalid event {1} at {2}.", event.getType(), oldState), e); } if (oldState != getState()) { @@ -130,22 +141,11 @@ private static class StartUpgradeTransition implements public State transition(ServiceManager serviceManager, ServiceEvent event) { try { - Service targetSpec = ServiceApiUtil.loadServiceUpgrade( - serviceManager.fs, serviceManager.getName(), event.getVersion()); - - serviceManager.serviceSpec.setState(ServiceState.UPGRADING); - List - compsThatNeedUpgrade = serviceManager.componentsFinder. - findTargetComponentSpecs(serviceManager.serviceSpec, targetSpec); - - if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) { - compsThatNeedUpgrade.forEach(component -> { - ComponentEvent needUpgradeEvent = new ComponentEvent( - component.getName(), ComponentEventType.UPGRADE). - setTargetSpec(component); - serviceManager.dispatcher.getEventHandler().handle( - needUpgradeEvent); - }); + if (!event.isAutoFinalize()) { + serviceManager.serviceSpec.setState(ServiceState.UPGRADING); + } else { + serviceManager.serviceSpec.setState( + ServiceState.UPGRADING_AUTO_FINALIZE); } serviceManager.upgradeVersion = event.getVersion(); return State.UPGRADING; @@ -157,22 +157,29 @@ public State transition(ServiceManager serviceManager, } } - private static class StopUpgradeTransition implements + private static class CheckStableTransition implements MultipleArcTransition { @Override public State transition(ServiceManager serviceManager, ServiceEvent event) { - //abort is not supported currently - //trigger re-check of service state - ServiceMaster.checkAndUpdateServiceState(serviceManager.scheduler, - true); - if (serviceManager.serviceSpec.getState().equals(ServiceState.STABLE)) { - return serviceManager.finalizeUpgrade() ? State.STABLE : - State.UPGRADING; - } else { - return State.UPGRADING; + //trigger check of service state + ServiceState currState = serviceManager.serviceSpec.getState(); + if (currState.equals(ServiceState.STABLE)) { + return State.STABLE; } + if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || + event.getType().equals(ServiceEventType.START)) { + ServiceState targetState = checkIfStable(serviceManager.serviceSpec); + if (targetState.equals(ServiceState.STABLE)) { + if (serviceManager.finalizeUpgrade()) { + LOG.info("Service def state changed from {} -> {}", currState, + serviceManager.serviceSpec.getState()); + return State.STABLE; + } + } + } + return State.UPGRADING; } } @@ -181,12 +188,21 @@ public State transition(ServiceManager serviceManager, */ private boolean finalizeUpgrade() { try { - Service upgradeSpec = ServiceApiUtil.loadServiceUpgrade( + // save the application id and state to + Service targetSpec = ServiceApiUtil.loadServiceUpgrade( fs, getName(), upgradeVersion); - ServiceApiUtil.writeAppDefinition(fs, - ServiceApiUtil.getServiceJsonPath(fs, getName()), upgradeSpec); + targetSpec.setId(serviceSpec.getId()); + targetSpec.setState(ServiceState.STABLE); + Map allComps = scheduler.getAllComponents(); + targetSpec.getComponents().forEach(compSpec -> { + Component comp = allComps.get(compSpec.getName()); + compSpec.setState(comp.getComponentSpec().getState()); + }); + jsonSerDeser.save(fs.getFileSystem(), + ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true); + fs.deleteClusterUpgradeDir(getName(), upgradeVersion); } catch (IOException e) { - LOG.error("Upgrade did not complete because unable to overwrite the" + + LOG.error("Upgrade did not complete because unable to re-write the" + " service definition", e); return false; } @@ -195,13 +211,79 @@ private boolean finalizeUpgrade() { fs.deleteClusterUpgradeDir(getName(), upgradeVersion); } catch (IOException e) { LOG.warn("Unable to delete upgrade definition for service {} " + - "version {}", getName(), upgradeVersion); + "version {}", getName(), upgradeVersion); } + serviceSpec.setState(ServiceState.STABLE); serviceSpec.setVersion(upgradeVersion); upgradeVersion = null; return true; } + private static ServiceState checkIfStable(Service service) { + // if desired == running + for (org.apache.hadoop.yarn.service.api.records.Component comp : + service.getComponents()) { + if (!comp.getState().equals( + org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE)) { + return service.getState(); + } + } + return ServiceState.STABLE; + } + + /** + * Service state gets directly modified by ServiceMaster and Component. + * This is a problem for upgrade and flexing. For now, invoking + * ServiceMaster.checkAndUpdateServiceState here to make it easy to fix + * this in future. + */ + public void checkAndUpdateServiceState(boolean isIncrement) { + writeLock.lock(); + try { + if (!getState().equals(State.UPGRADING)) { + ServiceMaster.checkAndUpdateServiceState(this.scheduler, + isIncrement); + } + } finally { + writeLock.unlock(); + } + } + + void processUpgradeRequest(String upgradeVersion, + boolean autoFinalize) throws IOException { + Service targetSpec = ServiceApiUtil.loadServiceUpgrade( + context.fs, context.service.getName(), upgradeVersion); + + List + compsThatNeedUpgrade = componentsFinder. + findTargetComponentSpecs(context.service, targetSpec); + ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE) + .setVersion(upgradeVersion) + .setAutoFinalize(autoFinalize); + context.scheduler.getDispatcher().getEventHandler().handle(event); + + if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) { + if (autoFinalize) { + event.setAutoFinalize(true); + } + compsThatNeedUpgrade.forEach(component -> { + ComponentEvent needUpgradeEvent = new ComponentEvent( + component.getName(), ComponentEventType.UPGRADE) + .setTargetSpec(component) + .setUpgradeVersion(event.getVersion()); + context.scheduler.getDispatcher().getEventHandler().handle( + needUpgradeEvent); + }); + } else { + // nothing to upgrade if upgrade auto finalize is requested, trigger a + // state check. + if (autoFinalize) { + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.CHECK_STABLE)); + } + } + } + /** * Returns the name of the service. */ @@ -216,10 +298,8 @@ public enum State { STABLE, UPGRADING } - @VisibleForTesting Service getServiceSpec() { return serviceSpec; } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 8d01410149..ee0a1a73dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -329,6 +329,7 @@ public void serviceStart() throws Exception { // Since AM has been started and registered, the service is in STARTED state app.setState(ServiceState.STARTED); serviceManager = new ServiceManager(context); + context.setServiceManager(serviceManager); // recover components based on containers sent from RM recoverComponents(response); @@ -757,6 +758,32 @@ public void onStartContainerError(ContainerId containerId, Throwable t) { // automatically which will trigger stopping COMPONENT INSTANCE } + @Override + public void onContainerReInitialize(ContainerId containerId) { + ComponentInstance instance = liveInstances.get(containerId); + if (instance == null) { + LOG.error("No component instance exists for {}", containerId); + return; + } + ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent( + containerId, ComponentInstanceEventType.BECOME_READY); + dispatcher.getEventHandler().handle(becomeReadyEvent); + } + + @Override + public void onContainerReInitializeError(ContainerId containerId, + Throwable t) { + ComponentInstance instance = liveInstances.get(containerId); + if (instance == null) { + LOG.error("No component instance exists for {}", containerId); + return; + } + ComponentEvent event = new ComponentEvent(instance.getCompName(), + ComponentEventType.CONTAINER_COMPLETED) + .setInstance(instance).setContainerId(containerId); + dispatcher.getEventHandler().handle(event); + } + @Override public void onContainerResourceIncreased(ContainerId containerId, Resource resource) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java index 667b1aa795..7deb076eda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java @@ -250,6 +250,15 @@ public Container getContainer(String id) { return null; } + public Container getComponentInstance(String compInstanceName) { + for (Container container : containers) { + if (compInstanceName.equals(container.getComponentInstanceName())) { + return container; + } + } + return null; + } + /** * Run all containers of this component in privileged mode (YARN-4262). **/ @@ -441,4 +450,16 @@ public void mergeFrom(Component that) { this.setReadinessCheck(that.getReadinessCheck()); } } + + public void overwrite(Component that) { + setArtifact(that.getArtifact()); + setResource(that.resource); + setNumberOfContainers(that.getNumberOfContainers()); + setLaunchCommand(that.getLaunchCommand()); + setConfiguration(that.configuration); + setRunPrivilegedContainer(that.getRunPrivilegedContainer()); + setDependencies(that.getDependencies()); + setPlacementPolicy(that.getPlacementPolicy()); + setReadinessCheck(that.getReadinessCheck()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java index bf09ff2442..6e390737e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java @@ -26,5 +26,5 @@ @InterfaceAudience.Public @InterfaceStability.Unstable public enum ContainerState { - RUNNING_BUT_UNREADY, READY, STOPPED + RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index 286eaa2497..b6ae38bdee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -29,5 +29,6 @@ @ApiModel(description = "The current state of an service.") @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") public enum ServiceState { - ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING; + ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, + UPGRADING_AUTO_FINALIZE; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 453619b972..52cd369c23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; @@ -59,8 +60,10 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.service.ServiceMaster; +import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; @@ -206,15 +209,21 @@ public int actionBuild(Service service) } @Override - public int actionUpgrade(String appName, String fileName) + public int initiateUpgrade(String appName, String fileName, + boolean autoFinalize) throws IOException, YarnException { - checkAppExistOnHdfs(appName); Service upgradeService = loadAppJsonFromLocalFS(fileName, appName, null, null); - return actionUpgrade(upgradeService); + if (autoFinalize) { + upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE); + } else { + upgradeService.setState(ServiceState.UPGRADING); + } + return initiateUpgrade(upgradeService); } - public int actionUpgrade(Service service) throws YarnException, IOException { + public int initiateUpgrade(Service service) throws YarnException, + IOException { Service persistedService = ServiceApiUtil.loadService(fs, service.getName()); if (!StringUtils.isEmpty(persistedService.getId())) { @@ -231,6 +240,15 @@ public int actionUpgrade(Service service) throws YarnException, IOException { throw new YarnException(message); } + Service liveService = getStatus(service.getName()); + if (!liveService.getState().equals(ServiceState.STABLE)) { + String message = service.getName() + " is at " + + liveService.getState() + + " state, upgrade can not be invoked when service is STABLE."; + LOG.error(message); + throw new YarnException(message); + } + Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true); ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service); @@ -245,8 +263,56 @@ public int actionUpgrade(Service service) throws YarnException, IOException { UpgradeServiceRequestProto.Builder requestBuilder = UpgradeServiceRequestProto.newBuilder(); requestBuilder.setVersion(service.getVersion()); + if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { + requestBuilder.setAutoFinalize(true); + } + UpgradeServiceResponseProto responseProto = proxy.upgrade( + requestBuilder.build()); + if (responseProto.hasError()) { + LOG.error("Service {} upgrade to version {} failed because {}", + service.getName(), service.getVersion(), responseProto.getError()); + throw new YarnException("Failed to upgrade service " + service.getName() + + " to version " + service.getVersion() + " because " + + responseProto.getError()); + } + return EXIT_SUCCESS; + } - proxy.upgrade(requestBuilder.build()); + @Override + public int actionUpgradeInstances(String appName, + List componentInstances) throws IOException, YarnException { + checkAppExistOnHdfs(appName); + Service persistedService = ServiceApiUtil.loadService(fs, appName); + List containersToUpgrade = ServiceApiUtil. + getLiveContainers(persistedService, componentInstances); + return actionUpgrade(persistedService, containersToUpgrade); + } + + public int actionUpgrade(Service service, List compInstances) + throws IOException, YarnException { + ApplicationReport appReport = + yarnClient.getApplicationReport(getAppId(service.getName())); + + if (appReport.getYarnApplicationState() != RUNNING) { + String message = service.getName() + " is at " + + appReport.getYarnApplicationState() + + " state, upgrade can only be invoked when service is running."; + LOG.error(message); + throw new YarnException(message); + } + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(service.getName() + " AM hostname is empty."); + } + ClientAMProtocol proxy = createAMProxy(service.getName(), appReport); + + List containerIdsToUpgrade = new ArrayList<>(); + compInstances.forEach(compInst -> + containerIdsToUpgrade.add(compInst.getId())); + LOG.info("instances to upgrade {}", containerIdsToUpgrade); + CompInstancesUpgradeRequestProto.Builder upgradeRequestBuilder = + CompInstancesUpgradeRequestProto.newBuilder(); + upgradeRequestBuilder.addAllContainerIds(containerIdsToUpgrade); + proxy.upgrade(upgradeRequestBuilder.build()); return EXIT_SUCCESS; } @@ -391,6 +457,17 @@ private Map flexComponents(String serviceName, LOG.error(message); throw new YarnException(message); } + + Service liveService = getStatus(serviceName); + if (liveService.getState().equals(ServiceState.UPGRADING) || + liveService.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { + String message = serviceName + " is at " + + liveService.getState() + + " state, flex can not be invoked when service is upgrading. "; + LOG.error(message); + throw new YarnException(message); + } + if (StringUtils.isEmpty(appReport.getHost())) { throw new YarnException(serviceName + " AM hostname is empty"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 782cc3beec..5a85e8f39a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -34,20 +34,23 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.service.ServiceEvent; +import org.apache.hadoop.yarn.service.ServiceEventType; +import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.apache.hadoop.yarn.service.api.records.ResourceInformation; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; import org.apache.hadoop.yarn.service.ContainerFailureTracker; import org.apache.hadoop.yarn.service.ServiceContext; -import org.apache.hadoop.yarn.service.ServiceMaster; import org.apache.hadoop.yarn.service.ServiceMetrics; import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.api.records.PlacementPolicy; -import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.api.records.ServiceState; -import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; -import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils; import org.apache.hadoop.yarn.service.monitor.probe.Probe; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -70,6 +73,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -109,6 +113,10 @@ public class Component implements EventHandler { // disk_failed containers etc. This will be reset to 0 periodically. public AtomicInteger currentContainerFailure = new AtomicInteger(0); + private AtomicBoolean upgradeInProgress = new AtomicBoolean(false); + private ComponentEvent upgradeEvent; + private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0); + private StateMachine stateMachine; private AsyncDispatcher dispatcher; @@ -131,7 +139,7 @@ FLEX, new FlexComponentTransition()) .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED, new ContainerAllocatedTransition()) // container launched on NM - .addTransition(FLEXING, EnumSet.of(STABLE, FLEXING), + .addTransition(FLEXING, EnumSet.of(STABLE, FLEXING, UPGRADING), CONTAINER_STARTED, new ContainerStartedTransition()) // container failed while flexing .addTransition(FLEXING, FLEXING, CONTAINER_COMPLETED, @@ -151,12 +159,19 @@ CONTAINER_STARTED, new ContainerStartedTransition()) // For flex down, go to STABLE state .addTransition(STABLE, EnumSet.of(STABLE, FLEXING), FLEX, new FlexComponentTransition()) - .addTransition(STABLE, UPGRADING, UPGRADE, + .addTransition(STABLE, UPGRADING, ComponentEventType.UPGRADE, new ComponentNeedsUpgradeTransition()) - .addTransition(FLEXING, UPGRADING, UPGRADE, - new ComponentNeedsUpgradeTransition()) - .addTransition(UPGRADING, UPGRADING, UPGRADE, + //Upgrade while previous upgrade is still in progress + .addTransition(UPGRADING, UPGRADING, ComponentEventType.UPGRADE, new ComponentNeedsUpgradeTransition()) + .addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE), + CHECK_STABLE, new CheckStableTransition()) + .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE), + CHECK_STABLE, new CheckStableTransition()) + .addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE, + new CheckStableTransition()) + .addTransition(UPGRADING, FLEXING, CONTAINER_COMPLETED, + new ContainerCompletedTransition()) .installTopology(); public Component( @@ -291,7 +306,10 @@ public void transition(Component component, ComponentEvent event) { component.pendingInstances.remove(instance); instance.setContainer(container); - ProviderUtils.initCompInstanceDir(component.getContext().fs, instance); + + ProviderUtils.initCompInstanceDir(component.getContext().fs, + component.createLaunchContext(component.componentSpec, + component.scheduler.getApp().getVersion()), instance); component.getScheduler().addLiveCompInstance(container.getId(), instance); LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " + "host {}, num pending component instances reduced to {} ", @@ -317,14 +335,21 @@ private static class ContainerStartedTransition implements private static ComponentState checkIfStable(Component component) { // if desired == running if (component.componentMetrics.containersReady.value() == component - .getComponentSpec().getNumberOfContainers()) { + .getComponentSpec().getNumberOfContainers() && + component.numContainersThatNeedUpgrade.get() == 0) { component.componentSpec.setState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); return STABLE; - } else { + } else if (component.componentMetrics.containersReady.value() != component + .getComponentSpec().getNumberOfContainers()) { component.componentSpec.setState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); return FLEXING; + } else { + // component.numContainersThatNeedUpgrade.get() > 0 + component.componentSpec.setState(org.apache.hadoop.yarn.service.api. + records.ComponentState.NEEDS_UPGRADE); + return UPGRADING; } } @@ -336,8 +361,9 @@ public static synchronized void checkAndUpdateComponentState( component.componentSpec.getState(); if (isIncrement) { // check if all containers are in READY state - if (component.componentMetrics.containersReady - .value() == component.componentMetrics.containersDesired.value()) { + if (component.numContainersThatNeedUpgrade.get() == 0 && + component.componentMetrics.containersReady.value() == + component.componentMetrics.containersDesired.value()) { component.componentSpec.setState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); if (curState != component.componentSpec.getState()) { @@ -346,8 +372,7 @@ public static synchronized void checkAndUpdateComponentState( component.componentSpec.getState()); } // component state change will trigger re-check of service state - ServiceMaster.checkAndUpdateServiceState(component.scheduler, - isIncrement); + component.context.getServiceManager().checkAndUpdateServiceState(true); } } else { // container moving out of READY state could be because of FLEX down so @@ -362,10 +387,13 @@ public static synchronized void checkAndUpdateComponentState( component.componentSpec.getState()); } // component state change will trigger re-check of service state - ServiceMaster.checkAndUpdateServiceState(component.scheduler, - isIncrement); + component.context.getServiceManager().checkAndUpdateServiceState(false); } } + // when the service is stable then the state of component needs to + // transition to stable + component.dispatcher.getEventHandler().handle(new ComponentEvent( + component.getName(), ComponentEventType.CHECK_STABLE)); } private static class ContainerCompletedTransition extends BaseTransition { @@ -377,15 +405,52 @@ public void transition(Component component, ComponentEvent event) { STOP).setStatus(event.getStatus())); component.componentSpec.setState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); - component.getScheduler().getApp().setState(ServiceState.STARTED); + if (component.context.service.getState().equals(ServiceState.STABLE)) { + component.getScheduler().getApp().setState(ServiceState.STARTED); + LOG.info("Service def state changed from {} -> {}", + ServiceState.STABLE, ServiceState.STARTED); + } } } private static class ComponentNeedsUpgradeTransition extends BaseTransition { @Override public void transition(Component component, ComponentEvent event) { + component.upgradeInProgress.set(true); component.componentSpec.setState(org.apache.hadoop.yarn.service.api. records.ComponentState.NEEDS_UPGRADE); + component.numContainersThatNeedUpgrade.set( + component.componentSpec.getNumberOfContainers()); + component.componentSpec.getContainers().forEach(container -> + container.setState(ContainerState.NEEDS_UPGRADE)); + component.upgradeEvent = event; + } + } + + private static class CheckStableTransition implements MultipleArcTransition + { + + @Override + public ComponentState transition(Component component, + ComponentEvent componentEvent) { + org.apache.hadoop.yarn.service.api.records.ComponentState currState = + component.componentSpec.getState(); + if (currState.equals(org.apache.hadoop.yarn.service.api.records + .ComponentState.STABLE)) { + return ComponentState.STABLE; + } + // checkIfStable also updates the state in definition when STABLE + ComponentState targetState = checkIfStable(component); + if (targetState.equals(STABLE) && component.upgradeInProgress.get()) { + component.componentSpec.overwrite( + component.upgradeEvent.getTargetSpec()); + component.upgradeEvent = null; + ServiceEvent checkStable = new ServiceEvent(ServiceEventType. + CHECK_STABLE); + component.dispatcher.getEventHandler().handle(checkStable); + component.upgradeInProgress.set(false); + } + return targetState; } } @@ -421,8 +486,28 @@ private void assignContainerToCompInstance(Container container) { "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ", getName(), container.getId(), instance.getCompInstanceName(), container.getNodeId()); - scheduler.getContainerLaunchService() - .launchCompInstance(scheduler.getApp(), instance, container); + if (upgradeInProgress.get()) { + scheduler.getContainerLaunchService() + .launchCompInstance(scheduler.getApp(), instance, container, + createLaunchContext(upgradeEvent.getTargetSpec(), + upgradeEvent.getUpgradeVersion())); + } else { + scheduler.getContainerLaunchService().launchCompInstance( + scheduler.getApp(), instance, container, + createLaunchContext(componentSpec, scheduler.getApp().getVersion())); + } + } + + public ContainerLaunchService.ComponentLaunchContext createLaunchContext( + org.apache.hadoop.yarn.service.api.records.Component compSpec, + String version) { + ContainerLaunchService.ComponentLaunchContext launchContext = + new ContainerLaunchService.ComponentLaunchContext(compSpec.getName(), + version); + launchContext.setArtifact(compSpec.getArtifact()) + .setConfiguration(compSpec.getConfiguration()) + .setLaunchCommand(compSpec.getLaunchCommand()); + return launchContext; } @SuppressWarnings({ "unchecked" }) @@ -661,16 +746,24 @@ public void decRunningContainers() { scheduler.getServiceMetrics().containersRunning.decr(); } - public void incContainersReady() { + public void incContainersReady(boolean updateDefinition) { componentMetrics.containersReady.incr(); scheduler.getServiceMetrics().containersReady.incr(); - checkAndUpdateComponentState(this, true); + if (updateDefinition) { + checkAndUpdateComponentState(this, true); + } } - public void decContainersReady() { + public void decContainersReady(boolean updateDefinition) { componentMetrics.containersReady.decr(); scheduler.getServiceMetrics().containersReady.decr(); - checkAndUpdateComponentState(this, false); + if (updateDefinition) { + checkAndUpdateComponentState(this, false); + } + } + + public void decContainersThatNeedUpgrade() { + numContainersThatNeedUpgrade.decrementAndGet(); } public int getNumReadyInstances() { @@ -729,6 +822,16 @@ public ComponentState getState() { this.readLock.unlock(); } } + + public ComponentEvent getUpgradeEvent() { + this.readLock.lock(); + try { + return upgradeEvent; + } finally { + this.readLock.unlock(); + } + } + public ServiceScheduler getScheduler() { return scheduler; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java index 7bd5cb9399..84caa77b20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java @@ -34,6 +34,7 @@ public class ComponentEvent extends AbstractEvent { private ContainerStatus status; private ContainerId containerId; private org.apache.hadoop.yarn.service.api.records.Component targetSpec; + private String upgradeVersion; public ContainerId getContainerId() { return containerId; @@ -103,4 +104,13 @@ public ComponentEvent setTargetSpec( this.targetSpec = Preconditions.checkNotNull(targetSpec); return this; } + + public String getUpgradeVersion() { + return upgradeVersion; + } + + public ComponentEvent setUpgradeVersion(String upgradeVersion) { + this.upgradeVersion = upgradeVersion; + return this; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java index 970788aadc..44d781f225 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java @@ -25,5 +25,5 @@ public enum ComponentEventType { CONTAINER_STARTED, CONTAINER_COMPLETED, UPGRADE, - STOP_UPGRADE + CHECK_STABLE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index c57d888272..ffb9d76220 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.component.Component; +import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.ComponentEventType; import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; @@ -116,10 +118,15 @@ public class ComponentInstance implements EventHandler, .addTransition(READY, STARTED, BECOME_NOT_READY, new ContainerBecomeNotReadyTransition()) .addTransition(READY, INIT, STOP, new ContainerStoppedTransition()) + .addTransition(READY, UPGRADING, UPGRADE, + new ContainerUpgradeTransition()) + .addTransition(UPGRADING, UPGRADING, UPGRADE, + new ContainerUpgradeTransition()) + .addTransition(UPGRADING, READY, BECOME_READY, + new ContainerBecomeReadyTransition()) + .addTransition(UPGRADING, INIT, STOP, new ContainerStoppedTransition()) .installTopology(); - - public ComponentInstance(Component component, ComponentInstanceId compInstanceId) { this.stateMachine = stateMachineFactory.make(this); @@ -186,7 +193,17 @@ private static class ContainerBecomeReadyTransition extends BaseTransition { public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { compInstance.containerSpec.setState(ContainerState.READY); - compInstance.component.incContainersReady(); + if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) { + compInstance.component.incContainersReady(false); + compInstance.component.decContainersThatNeedUpgrade(); + ComponentEvent checkState = new ComponentEvent( + compInstance.component.getName(), ComponentEventType.CHECK_STABLE); + compInstance.scheduler.getDispatcher().getEventHandler().handle( + checkState); + + } else { + compInstance.component.incContainersReady(true); + } if (compInstance.timelineServiceEnabled) { compInstance.serviceTimelinePublisher .componentInstanceBecomeReady(compInstance.containerSpec); @@ -199,7 +216,7 @@ private static class ContainerBecomeNotReadyTransition extends BaseTransition { public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY); - compInstance.component.decContainersReady(); + compInstance.component.decContainersReady(true); } } @@ -225,9 +242,11 @@ public void transition(ComponentInstance compInstance, .getDiagnostics(); compInstance.diagnostics.append(containerDiag + System.lineSeparator()); compInstance.cancelContainerStatusRetriever(); - + if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) { + compInstance.component.decContainersThatNeedUpgrade(); + } if (compInstance.getState().equals(READY)) { - compInstance.component.decContainersReady(); + compInstance.component.decContainersReady(true); } compInstance.component.decRunningContainers(); boolean shouldExit = false; @@ -287,6 +306,23 @@ public void transition(ComponentInstance compInstance, } } + private static class ContainerUpgradeTransition extends BaseTransition { + + @Override + public void transition(ComponentInstance compInstance, + ComponentInstanceEvent event) { + compInstance.containerSpec.setState(ContainerState.UPGRADING); + compInstance.component.decContainersReady(false); + ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent(); + compInstance.scheduler.getContainerLaunchService() + .reInitCompInstance(compInstance.scheduler.getApp(), compInstance, + compInstance.container, + compInstance.component.createLaunchContext( + upgradeEvent.getTargetSpec(), + upgradeEvent.getUpgradeVersion())); + } + } + public ComponentInstanceState getState() { this.readLock.lock(); @@ -422,7 +458,7 @@ public void destroy() { component.decRunningContainers(); } if (getState() == READY) { - component.decContainersReady(); + component.decContainersReady(true); component.decRunningContainers(); } getCompSpec().removeContainer(containerSpec); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java index 1a880ba442..665b8faf55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java @@ -22,5 +22,6 @@ public enum ComponentInstanceEventType { START, STOP, BECOME_READY, - BECOME_NOT_READY + BECOME_NOT_READY, + UPGRADE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java index 243fc5269b..bd1e9e7155 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java @@ -17,6 +17,8 @@ package org.apache.hadoop.yarn.service.conf; +import javax.ws.rs.core.MediaType; + public interface RestApiConstants { // Rest endpoints @@ -26,9 +28,19 @@ public interface RestApiConstants { String SERVICE_PATH = "/services/{service_name}"; String COMPONENT_PATH = "/services/{service_name}/components/{component_name}"; + String COMP_INSTANCE_PATH = SERVICE_PATH + + "/component-instances/{component_instance_name}"; + String COMP_INSTANCE_LONG_PATH = COMPONENT_PATH + + "/component-instances/{component_instance_name}"; + String COMP_INSTANCES = "component-instances"; + String COMP_INSTANCES_PATH = SERVICE_PATH + "/" + COMP_INSTANCES; + // Query param String SERVICE_NAME = "service_name"; String COMPONENT_NAME = "component_name"; + String COMP_INSTANCE_NAME = "component_instance_name"; + + String MEDIA_TYPE_JSON_UTF8 = MediaType.APPLICATION_JSON + ";charset=utf-8"; Long DEFAULT_UNLIMITED_LIFETIME = -1l; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java index e07661bec6..084c721ebf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java @@ -18,11 +18,12 @@ package org.apache.hadoop.yarn.service.containerlaunch; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.service.ServiceContext; -import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.provider.ProviderService; import org.apache.hadoop.yarn.service.provider.ProviderFactory; @@ -63,36 +64,57 @@ protected void serviceStop() throws Exception { } public void launchCompInstance(Service service, - ComponentInstance instance, Container container) { + ComponentInstance instance, Container container, + ComponentLaunchContext componentLaunchContext) { ContainerLauncher launcher = - new ContainerLauncher(service, instance, container); + new ContainerLauncher(service, instance, container, + componentLaunchContext, false); executorService.execute(launcher); } + public void reInitCompInstance(Service service, + ComponentInstance instance, Container container, + ComponentLaunchContext componentLaunchContext) { + ContainerLauncher reInitializer = new ContainerLauncher(service, instance, + container, componentLaunchContext, true); + executorService.execute(reInitializer); + } + private class ContainerLauncher implements Runnable { public final Container container; public final Service service; public ComponentInstance instance; + private final ComponentLaunchContext componentLaunchContext; + private final boolean reInit; - public ContainerLauncher( - Service service, - ComponentInstance instance, Container container) { + ContainerLauncher(Service service, ComponentInstance instance, + Container container, ComponentLaunchContext componentLaunchContext, + boolean reInit) { this.container = container; this.service = service; this.instance = instance; + this.componentLaunchContext = componentLaunchContext; + this.reInit = reInit; } @Override public void run() { - Component compSpec = instance.getCompSpec(); ProviderService provider = ProviderFactory.getProviderService( - compSpec.getArtifact()); + componentLaunchContext.getArtifact()); AbstractLauncher launcher = new AbstractLauncher(context); try { provider.buildContainerLaunchContext(launcher, service, - instance, fs, getConfig(), container); - instance.getComponent().getScheduler().getNmClient() - .startContainerAsync(container, - launcher.completeContainerLaunch()); + instance, fs, getConfig(), container, componentLaunchContext); + if (!reInit) { + LOG.info("launching container {}", container.getId()); + instance.getComponent().getScheduler().getNmClient() + .startContainerAsync(container, + launcher.completeContainerLaunch()); + } else { + LOG.info("reInitializing container {}", container.getId()); + instance.getComponent().getScheduler().getNmClient() + .reInitializeContainerAsync(container.getId(), + launcher.completeContainerLaunch(), true); + } } catch (Exception e) { LOG.error(instance.getCompInstanceId() + ": Failed to launch container. ", e); @@ -100,4 +122,58 @@ public ContainerLauncher( } } } + + /** + * Launch context of a component. + */ + public static class ComponentLaunchContext { + private final String name; + private final String serviceVersion; + private Artifact artifact; + private org.apache.hadoop.yarn.service.api.records.Configuration + configuration; + private String launchCommand; + + public ComponentLaunchContext(String name, String serviceVersion) { + this.name = Preconditions.checkNotNull(name); + this.serviceVersion = Preconditions.checkNotNull(serviceVersion); + } + + public String getName() { + return name; + } + + public String getServiceVersion() { + return serviceVersion; + } + + public Artifact getArtifact() { + return artifact; + } + + public org.apache.hadoop.yarn.service.api.records. + Configuration getConfiguration() { + return configuration; + } + + public String getLaunchCommand() { + return launchCommand; + } + + public ComponentLaunchContext setArtifact(Artifact artifact) { + this.artifact = artifact; + return this; + } + + public ComponentLaunchContext setConfiguration(org.apache.hadoop.yarn. + service.api.records.Configuration configuration) { + this.configuration = configuration; + return this; + } + + public ComponentLaunchContext setLaunchCommand(String launchCommand) { + this.launchCommand = launchCommand; + return this; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java index 8152225e8a..e82181eb70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java @@ -30,6 +30,8 @@ import java.io.IOException; import java.net.InetSocketAddress; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; @@ -114,4 +116,16 @@ public RestartServiceResponseProto restart(RestartServiceRequestProto request) } return null; } + + @Override + public CompInstancesUpgradeResponseProto upgrade( + CompInstancesUpgradeRequestProto request) + throws IOException, YarnException { + try { + return proxy.upgrade(null, request); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + } + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java index 1a1a1ef01e..50a678b393 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java @@ -21,6 +21,8 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; @@ -91,4 +93,14 @@ public RestartServiceResponseProto restartService(RpcController controller, throw new ServiceException(e); } } + + @Override + public CompInstancesUpgradeResponseProto upgrade(RpcController controller, + CompInstancesUpgradeRequestProto request) throws ServiceException { + try { + return real.upgrade(request); + } catch (IOException | YarnException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java index ee276866af..560f42160c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java @@ -23,8 +23,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.conf.YarnServiceConf; -import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.service.exceptions.SliderException; @@ -60,9 +60,9 @@ public abstract void processArtifact(AbstractLauncher launcher, public void buildContainerLaunchContext(AbstractLauncher launcher, Service service, ComponentInstance instance, - SliderFileSystem fileSystem, Configuration yarnConf, Container container) + SliderFileSystem fileSystem, Configuration yarnConf, Container container, + ContainerLaunchService.ComponentLaunchContext compLaunchContext) throws IOException, SliderException { - Component component = instance.getComponent().getComponentSpec();; processArtifact(launcher, instance, fileSystem, service); ServiceContext context = @@ -72,11 +72,12 @@ public void buildContainerLaunchContext(AbstractLauncher launcher, Map globalTokens = instance.getComponent().getScheduler().globalTokens; Map tokensForSubstitution = ProviderUtils - .initCompTokensForSubstitute(instance, container); + .initCompTokensForSubstitute(instance, container, + compLaunchContext); tokensForSubstitution.putAll(globalTokens); // Set the environment variables in launcher - launcher.putEnv(ServiceUtils - .buildEnvMap(component.getConfiguration(), tokensForSubstitution)); + launcher.putEnv(ServiceUtils.buildEnvMap( + compLaunchContext.getConfiguration(), tokensForSubstitution)); launcher.setEnv("WORK_DIR", ApplicationConstants.Environment.PWD.$()); launcher.setEnv("LOG_DIR", ApplicationConstants.LOG_DIR_EXPANSION_VAR); if (System.getenv(HADOOP_USER_NAME) != null) { @@ -94,10 +95,10 @@ public void buildContainerLaunchContext(AbstractLauncher launcher, // create config file on hdfs and add local resource ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem, - component, tokensForSubstitution, instance, context); + compLaunchContext, tokensForSubstitution, instance, context); // substitute launch command - String launchCommand = component.getLaunchCommand(); + String launchCommand = compLaunchContext.getLaunchCommand(); // docker container may have empty commands if (!StringUtils.isEmpty(launchCommand)) { launchCommand = ProviderUtils @@ -111,12 +112,12 @@ public void buildContainerLaunchContext(AbstractLauncher launcher, // By default retry forever every 30 seconds launcher.setRetryContext( YarnServiceConf.getInt(CONTAINER_RETRY_MAX, DEFAULT_CONTAINER_RETRY_MAX, - component.getConfiguration(), yarnConf), + compLaunchContext.getConfiguration(), yarnConf), YarnServiceConf.getInt(CONTAINER_RETRY_INTERVAL, - DEFAULT_CONTAINER_RETRY_INTERVAL, component.getConfiguration(), - yarnConf), + DEFAULT_CONTAINER_RETRY_INTERVAL, + compLaunchContext.getConfiguration(), yarnConf), YarnServiceConf.getLong(CONTAINER_FAILURES_VALIDITY_INTERVAL, DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL, - component.getConfiguration(), yarnConf)); + compLaunchContext.getConfiguration(), yarnConf)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java index 11015ea175..fe765de695 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher; @@ -35,6 +36,8 @@ public interface ProviderService { */ void buildContainerLaunchContext(AbstractLauncher containerLauncher, Service service, ComponentInstance instance, - SliderFileSystem sliderFileSystem, Configuration yarnConf, Container - container) throws IOException, SliderException; + SliderFileSystem sliderFileSystem, Configuration yarnConf, + Container container, + ContainerLaunchService.ComponentLaunchContext componentLaunchContext) + throws IOException, SliderException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java index d65a1969a1..2fc8cfb5cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java @@ -27,12 +27,12 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.service.ServiceContext; -import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.ConfigFile; import org.apache.hadoop.yarn.service.api.records.ConfigFormat; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.exceptions.BadCommandArgumentsException; import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.utils.PublishedConfiguration; @@ -51,7 +51,11 @@ import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; -import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; +import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_ID; +import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_INSTANCE_NAME; +import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_NAME; +import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_NAME_LC; +import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.CONTAINER_ID; /** * This is a factoring out of methods handy for providers. It's bonded to a log @@ -160,9 +164,11 @@ public static void substituteMapWithTokens(Map configs, } public static Path initCompInstanceDir(SliderFileSystem fs, + ContainerLaunchService.ComponentLaunchContext compLaunchContext, ComponentInstance instance) { Path compDir = new Path(new Path(fs.getAppDir(), "components"), - instance.getCompName()); + compLaunchContext.getServiceVersion() + "/" + + compLaunchContext.getName()); Path compInstanceDir = new Path(compDir, instance.getCompInstanceName()); instance.setCompInstanceDir(compInstanceDir); return compInstanceDir; @@ -171,10 +177,11 @@ public static Path initCompInstanceDir(SliderFileSystem fs, // 1. Create all config files for a component on hdfs for localization // 2. Add the config file to localResource public static synchronized void createConfigFileAndAddLocalResource( - AbstractLauncher launcher, SliderFileSystem fs, Component component, + AbstractLauncher launcher, SliderFileSystem fs, + ContainerLaunchService.ComponentLaunchContext compLaunchContext, Map tokensForSubstitution, ComponentInstance instance, ServiceContext context) throws IOException { - Path compInstanceDir = initCompInstanceDir(fs, instance); + Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance); if (!fs.getFileSystem().exists(compInstanceDir)) { log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir); fs.getFileSystem().mkdirs(compInstanceDir, @@ -189,7 +196,8 @@ public static synchronized void createConfigFileAndAddLocalResource( + tokensForSubstitution); } - for (ConfigFile originalFile : component.getConfiguration().getFiles()) { + for (ConfigFile originalFile : compLaunchContext.getConfiguration() + .getFiles()) { ConfigFile configFile = originalFile.copy(); String fileName = new Path(configFile.getDestFile()).getName(); @@ -343,11 +351,12 @@ private static void resolvePlainTemplateAndSaveOnHdfs(FileSystem fs, * @return tokens to replace */ public static Map initCompTokensForSubstitute( - ComponentInstance instance, Container container) { + ComponentInstance instance, Container container, + ContainerLaunchService.ComponentLaunchContext componentLaunchContext) { Map tokens = new HashMap<>(); - tokens.put(COMPONENT_NAME, instance.getCompSpec().getName()); + tokens.put(COMPONENT_NAME, componentLaunchContext.getName()); tokens - .put(COMPONENT_NAME_LC, instance.getCompSpec().getName().toLowerCase()); + .put(COMPONENT_NAME_LC, componentLaunchContext.getName().toLowerCase()); tokens.put(COMPONENT_INSTANCE_NAME, instance.getCompInstanceName()); tokens.put(CONTAINER_ID, container.getId().toString()); tokens.put(COMPONENT_ID, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index 194ae83d68..33919baf91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.service.utils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -27,12 +29,13 @@ import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Configuration; import org.apache.hadoop.yarn.service.api.records.PlacementConstraint; import org.apache.hadoop.yarn.service.api.records.Resource; -import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.conf.RestApiConstants; import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages; @@ -66,6 +69,8 @@ public class ServiceApiUtil { private static final PatternValidator userNamePattern = new PatternValidator("[a-z][a-z0-9-.]*"); + + @VisibleForTesting public static void setJsonSerDeser(JsonSerDeser jsd) { jsonSerDeser = jsd; @@ -496,6 +501,47 @@ public static Path writeAppDefinition(SliderFileSystem fs, Path appDir, return appJson; } + public static List getLiveContainers(Service service, + List componentInstances) + throws YarnException { + List result = new ArrayList<>(); + + // In order to avoid iterating over all the containers of all components, + // first find the affected components by parsing the instance name. + Multimap affectedComps = ArrayListMultimap.create(); + for (String instanceName : componentInstances) { + affectedComps.put( + ServiceApiUtil.parseComponentName(instanceName), instanceName); + } + + service.getComponents().forEach(comp -> { + // Iterating once over the containers of the affected component to + // find all the containers. Avoiding multiple calls to + // service.getComponent(...) and component.getContainer(...) because they + // iterate over all the components of the service and all the containers + // of the components respectively. + if (affectedComps.get(comp.getName()) != null) { + Collection instanceNames = affectedComps.get(comp.getName()); + comp.getContainers().forEach(container -> { + if (instanceNames.contains(container.getComponentInstanceName())) { + result.add(container); + } + }); + } + }); + return result; + } + + private static String parseComponentName(String componentInstanceName) + throws YarnException { + int idx = componentInstanceName.lastIndexOf('-'); + if (idx == -1) { + throw new YarnException("Invalid component instance (" + + componentInstanceName + ") name."); + } + return componentInstanceName.substring(0, idx); + } + public static String $(String s) { return "${" + s +"}"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto index 3677593971..91721b0d90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto @@ -30,6 +30,8 @@ service ClientAMProtocolService { returns (UpgradeServiceResponseProto); rpc restartService(RestartServiceRequestProto) returns (RestartServiceResponseProto); + rpc upgrade(CompInstancesUpgradeRequestProto) returns + (CompInstancesUpgradeResponseProto); } message FlexComponentsRequestProto { @@ -61,13 +63,22 @@ message StopResponseProto { message UpgradeServiceRequestProto { optional string version = 1; + optional bool autoFinalize = 2; } message UpgradeServiceResponseProto { + optional string error = 1; } message RestartServiceRequestProto { } message RestartServiceResponseProto { +} + +message CompInstancesUpgradeRequestProto { + repeated string containerIds = 1; +} + +message CompInstancesUpgradeResponseProto { } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java index 57cf367aad..260976aabd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java @@ -108,6 +108,7 @@ public void testContainerCompleted() throws TimeoutException, ApplicationId applicationId = ApplicationId.newInstance(123456, 1); Service exampleApp = new Service(); exampleApp.setId(applicationId.toString()); + exampleApp.setVersion("v1"); exampleApp.setName("testContainerCompleted"); exampleApp.addComponent(createComponent("compa", 1, "pwd")); @@ -146,6 +147,7 @@ public void testContainersFromPreviousAttemptsWithRMRestart() System.currentTimeMillis(), 1); Service exampleApp = new Service(); exampleApp.setId(applicationId.toString()); + exampleApp.setVersion("v1"); exampleApp.setName("testContainersRecovers"); String comp1Name = "comp1"; String comp1InstName = "comp1-0"; @@ -189,6 +191,7 @@ public void testContainersReleasedWhenExpired() Service exampleApp = new Service(); exampleApp.setId(applicationId.toString()); exampleApp.setName("testContainersRecovers"); + exampleApp.setVersion("v1"); String comp1Name = "comp1"; String comp1InstName = "comp1-0"; @@ -230,6 +233,7 @@ public void testContainersFromDifferentApp() Service exampleApp = new Service(); exampleApp.setId(applicationId.toString()); exampleApp.setName("testContainersFromDifferentApp"); + exampleApp.setVersion("v1"); String comp1Name = "comp1"; String comp1InstName = "comp1-0"; @@ -270,6 +274,7 @@ public void testScheduleWithMultipleResourceTypes() Service exampleApp = new Service(); exampleApp.setId(applicationId.toString()); exampleApp.setName("testScheduleWithMultipleResourceTypes"); + exampleApp.setVersion("v1"); List resourceTypeInfos = new ArrayList<>( ResourceUtils.getResourcesTypeInfo()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java index c65a5d4efe..56a0c71efd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -49,7 +49,7 @@ public class TestServiceManager { @Test public void testUpgrade() throws IOException, SliderException { ServiceManager serviceManager = createTestServiceManager("testUpgrade"); - upgrade(serviceManager, "v2", false); + upgrade(serviceManager, "v2", false, false); Assert.assertEquals("service not upgraded", ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); } @@ -57,8 +57,9 @@ public void testUpgrade() throws IOException, SliderException { @Test public void testRestartNothingToUpgrade() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager("testRestart"); - upgrade(serviceManager, "v2", false); + ServiceManager serviceManager = createTestServiceManager( + "testRestartNothingToUpgrade"); + upgrade(serviceManager, "v2", false, false); //make components stable serviceManager.getServiceSpec().getComponents().forEach(comp -> { @@ -69,22 +70,119 @@ public void testRestartNothingToUpgrade() serviceManager.getServiceSpec().getState()); } + @Test + public void testAutoFinalizeNothingToUpgrade() throws IOException, + SliderException { + ServiceManager serviceManager = createTestServiceManager( + "testAutoFinalizeNothingToUpgrade"); + upgrade(serviceManager, "v2", false, true); + + //make components stable + serviceManager.getServiceSpec().getComponents().forEach(comp -> + comp.setState(ComponentState.STABLE)); + serviceManager.handle(new ServiceEvent(ServiceEventType.CHECK_STABLE)); + Assert.assertEquals("service stable", ServiceState.STABLE, + serviceManager.getServiceSpec().getState()); + } + @Test public void testRestartWithPendingUpgrade() throws IOException, SliderException { ServiceManager serviceManager = createTestServiceManager("testRestart"); - upgrade(serviceManager, "v2", true); + upgrade(serviceManager, "v2", true, false); serviceManager.handle(new ServiceEvent(ServiceEventType.START)); Assert.assertEquals("service should still be upgrading", ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); } + @Test + public void testCheckState() throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager( + "testCheckState"); + upgrade(serviceManager, "v2", true, false); + Assert.assertEquals("service not upgrading", ServiceState.UPGRADING, + serviceManager.getServiceSpec().getState()); - private void upgrade(ServiceManager service, String version, - boolean upgradeArtifact) + // make components stable + serviceManager.getServiceSpec().getComponents().forEach(comp -> { + comp.setState(ComponentState.STABLE); + }); + ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE); + serviceManager.handle(checkStable); + Assert.assertEquals("service should still be upgrading", + ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); + + // finalize service + ServiceEvent restart = new ServiceEvent(ServiceEventType.START); + serviceManager.handle(restart); + Assert.assertEquals("service not stable", + ServiceState.STABLE, serviceManager.getServiceSpec().getState()); + + validateUpgradeFinalization(serviceManager.getName(), "v2"); + } + + @Test + public void testCheckStateAutoFinalize() throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager( + "testCheckState"); + serviceManager.getServiceSpec().setState( + ServiceState.UPGRADING_AUTO_FINALIZE); + upgrade(serviceManager, "v2", true, true); + Assert.assertEquals("service not upgrading", + ServiceState.UPGRADING_AUTO_FINALIZE, + serviceManager.getServiceSpec().getState()); + + // make components stable + serviceManager.getServiceSpec().getComponents().forEach(comp -> + comp.setState(ComponentState.STABLE)); + ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE); + serviceManager.handle(checkStable); + Assert.assertEquals("service not stable", + ServiceState.STABLE, serviceManager.getServiceSpec().getState()); + + validateUpgradeFinalization(serviceManager.getName(), "v2"); + } + + @Test + public void testInvalidUpgrade() throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager( + "testInvalidUpgrade"); + serviceManager.getServiceSpec().setState( + ServiceState.UPGRADING_AUTO_FINALIZE); + Service upgradedDef = ServiceTestUtils.createExampleApplication(); + upgradedDef.setName(serviceManager.getName()); + upgradedDef.setVersion("v2"); + upgradedDef.setLifetime(2L); + writeUpgradedDef(upgradedDef); + + try { + serviceManager.processUpgradeRequest("v2", true); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof UnsupportedOperationException); + return; + } + Assert.fail(); + } + + private void validateUpgradeFinalization(String serviceName, + String expectedVersion) throws IOException { + Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName); + Assert.assertEquals("service def not re-written", expectedVersion, + savedSpec.getVersion()); + Assert.assertNotNull("app id not present", savedSpec.getId()); + Assert.assertEquals("state not stable", ServiceState.STABLE, + savedSpec.getState()); + savedSpec.getComponents().forEach(compSpec -> { + Assert.assertEquals("comp not stable", ComponentState.STABLE, + compSpec.getState()); + }); + } + + private void upgrade(ServiceManager serviceManager, String version, + boolean upgradeArtifact, boolean autoFinalize) throws IOException, SliderException { Service upgradedDef = ServiceTestUtils.createExampleApplication(); - upgradedDef.setName(service.getName()); + upgradedDef.setName(serviceManager.getName()); upgradedDef.setVersion(version); if (upgradeArtifact) { Artifact upgradedArtifact = createTestArtifact("2"); @@ -93,9 +191,13 @@ private void upgrade(ServiceManager service, String version, }); } writeUpgradedDef(upgradedDef); + serviceManager.processUpgradeRequest(version, autoFinalize); ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE); - upgradeEvent.setVersion("v2"); - service.handle(upgradeEvent); + upgradeEvent.setVersion(version); + if (autoFinalize) { + upgradeEvent.setAutoFinalize(true); + } + serviceManager.handle(upgradeEvent); } private ServiceManager createTestServiceManager(String name) @@ -124,7 +226,7 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( return new ServiceManager(context); } - static Service createBaseDef(String name) { + public static Service createBaseDef(String name) { ApplicationId applicationId = ApplicationId.newInstance( System.currentTimeMillis(), 1); Service serviceDef = ServiceTestUtils.createExampleApplication(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 443ba0b403..32ea6e5c71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -31,9 +31,9 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.service.api.records.Component; -import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.PlacementConstraint; @@ -372,25 +372,47 @@ public void testRecoverComponentsAfterRMRestart() throws Exception { } @Test(timeout = 200000) - public void testUpgradeService() throws Exception { + public void testUpgrade() throws Exception { setupInternal(NUM_NMS); ServiceClient client = createClient(getConf()); Service service = createExampleApplication(); client.actionCreate(service); - waitForServiceToBeStarted(client, service); + waitForServiceToBeStable(client, service); - //upgrade the service + // upgrade the service + Component component = service.getComponents().iterator().next(); + service.setState(ServiceState.UPGRADING); service.setVersion("v2"); - client.actionUpgrade(service); + component.getConfiguration().getEnv().put("key1", "val1"); + client.initiateUpgrade(service); - //wait for service to be in upgrade state + // wait for service to be in upgrade state waitForServiceToBeInState(client, service, ServiceState.UPGRADING); SliderFileSystem fs = new SliderFileSystem(getConf()); Service fromFs = ServiceApiUtil.loadServiceUpgrade(fs, service.getName(), service.getVersion()); Assert.assertEquals(service.getName(), fromFs.getName()); Assert.assertEquals(service.getVersion(), fromFs.getVersion()); + + // upgrade containers + Service liveService = client.getStatus(service.getName()); + client.actionUpgrade(service, + liveService.getComponent(component.getName()).getContainers()); + waitForAllCompToBeReady(client, service); + + // finalize the upgrade + client.actionStart(service.getName()); + waitForServiceToBeStable(client, service); + Service active = client.getStatus(service.getName()); + Assert.assertEquals("component not stable", ComponentState.STABLE, + active.getComponent(component.getName()).getState()); + Assert.assertEquals("comp does not have new env", "val1", + active.getComponent(component.getName()).getConfiguration() + .getEnv("key1")); + LOG.info("Stop/destroy service {}", service); + client.actionStop(service.getName(), true); + client.actionDestroy(service.getName()); } // Test to verify ANTI_AFFINITY placement policy diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java index a95818f083..72909628fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java @@ -21,9 +21,9 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.client.cli.ApplicationCLI; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.conf.ExampleAppJson; @@ -36,12 +36,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; -import java.util.Arrays; +import java.io.PrintStream; import java.util.List; +import static org.apache.hadoop.yarn.client.api.AppAdminClient.YARN_APP_ADMIN_CLIENT_PREFIX; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH; +import static org.mockito.Mockito.spy; public class TestServiceCLI { private static final Logger LOG = LoggerFactory.getLogger(TestServiceCLI @@ -51,33 +54,36 @@ public class TestServiceCLI { private File basedir; private SliderFileSystem fs; private String basedirProp; + private ApplicationCLI cli; - private void runCLI(String[] args) throws Exception { - LOG.info("running CLI: yarn {}", Arrays.asList(args)); - ApplicationCLI cli = new ApplicationCLI(); - cli.setSysOutPrintStream(System.out); - cli.setSysErrPrintStream(System.err); - int res = ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args)); - cli.stop(); + private void createCLI() { + cli = new ApplicationCLI(); + PrintStream sysOut = spy(new PrintStream(new ByteArrayOutputStream())); + PrintStream sysErr = spy(new PrintStream(new ByteArrayOutputStream())); + cli.setSysOutPrintStream(sysOut); + cli.setSysErrPrintStream(sysErr); + conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE, + DummyServiceClient.class.getName()); + cli.setConf(conf); } private void buildApp(String serviceName, String appDef) throws Throwable { String[] args = {"app", "-D", basedirProp, "-save", serviceName, ExampleAppJson.resourceName(appDef), - "-appTypes", AppAdminClient.UNIT_TEST_TYPE}; - runCLI(args); + "-appTypes", DUMMY_APP_TYPE}; + ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args)); } - private void buildApp(String serviceName, String appDef, String lifetime, - String queue) throws Throwable { + private void buildApp(String serviceName, String appDef, + String lifetime, String queue) throws Throwable { String[] args = {"app", "-D", basedirProp, "-save", serviceName, ExampleAppJson.resourceName(appDef), - "-appTypes", AppAdminClient.UNIT_TEST_TYPE, + "-appTypes", DUMMY_APP_TYPE, "-updateLifetime", lifetime, "-changeQueue", queue}; - runCLI(args); + ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args)); } @Before @@ -91,6 +97,7 @@ public void setup() throws Throwable { } else { basedir.mkdirs(); } + createCLI(); } @After @@ -98,6 +105,7 @@ public void tearDown() throws IOException { if (basedir != null) { FileUtils.deleteDirectory(basedir); } + cli.stop(); } @Test @@ -114,6 +122,38 @@ public void testFlexComponents() throws Throwable { checkApp(serviceName, "master", 1L, 1000L, "qname"); } + @Test + public void testInitiateServiceUpgrade() throws Exception { + String[] args = {"app", "-upgrade", "app-1", + "-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON), + "-appTypes", DUMMY_APP_TYPE}; + int result = cli.run(ApplicationCLI.preProcessArgs(args)); + Assert.assertEquals(result, 0); + } + + @Test + public void testInitiateAutoFinalizeServiceUpgrade() throws Exception { + String[] args = {"app", "-upgrade", "app-1", + "-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON), + "-autoFinalize", + "-appTypes", DUMMY_APP_TYPE}; + int result = cli.run(ApplicationCLI.preProcessArgs(args)); + Assert.assertEquals(result, 0); + } + + @Test + public void testUpgradeInstances() throws Exception { + conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE, + DummyServiceClient.class.getName()); + cli.setConf(conf); + String[] args = {"app", "-upgrade", "app-1", + "-instances", "comp1-0,comp1-1", + "-appTypes", DUMMY_APP_TYPE}; + int result = cli.run(ApplicationCLI.preProcessArgs(args)); + Assert.assertEquals(result, 0); + } + + private void checkApp(String serviceName, String compName, long count, Long lifetime, String queue) throws IOException { Service service = ServiceApiUtil.loadService(fs, serviceName); @@ -130,4 +170,24 @@ private void checkApp(String serviceName, String compName, long count, Long } Assert.fail(); } + + private static final String DUMMY_APP_TYPE = "dummy"; + + /** + * Dummy service client for test purpose. + */ + public static class DummyServiceClient extends ServiceClient { + + @Override + public int initiateUpgrade(String appName, String fileName, + boolean autoFinalize) throws IOException, YarnException { + return 0; + } + + @Override + public int actionUpgradeInstances(String appName, + List componentInstances) throws IOException, YarnException { + return 0; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java index cc5b6ec7fe..3e3280be66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java @@ -24,17 +24,29 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.ClientAMProtocol; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.mockito.Matchers; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -47,79 +59,152 @@ */ public class TestServiceClient { + private static final Logger LOG = LoggerFactory.getLogger( + TestServiceClient.class); + @Rule public ServiceTestUtils.ServiceFSWatcher rule = new ServiceTestUtils.ServiceFSWatcher(); @Test - public void testActionUpgrade() throws Exception { - ApplicationId applicationId = ApplicationId.newInstance( - System.currentTimeMillis(), 1); - ServiceClient client = createServiceClient(applicationId); - - Service service = ServiceTestUtils.createExampleApplication(); - service.setVersion("v1"); - client.actionCreate(service); + public void testActionServiceUpgrade() throws Exception { + Service service = createService(); + ServiceClient client = MockServiceClient.create(rule, service); //upgrade the service service.setVersion("v2"); - client.actionUpgrade(service); + client.initiateUpgrade(service); - //wait for service to be in upgrade state Service fromFs = ServiceApiUtil.loadServiceUpgrade(rule.getFs(), service.getName(), service.getVersion()); Assert.assertEquals(service.getName(), fromFs.getName()); Assert.assertEquals(service.getVersion(), fromFs.getVersion()); + client.stop(); } + @Test + public void testActionCompInstanceUpgrade() throws Exception { + Service service = createService(); + MockServiceClient client = MockServiceClient.create(rule, service); - private ServiceClient createServiceClient(ApplicationId applicationId) - throws Exception { - ClientAMProtocol amProxy = mock(ClientAMProtocol.class); - YarnClient yarnClient = createMockYarnClient(); - ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( - applicationId, 1); - ApplicationAttemptReport attemptReport = - ApplicationAttemptReport.newInstance(attemptId, "localhost", 0, - null, null, null, - YarnApplicationAttemptState.RUNNING, null); + //upgrade the service + service.setVersion("v2"); + client.initiateUpgrade(service); - ApplicationReport appReport = mock(ApplicationReport.class); - when(appReport.getHost()).thenReturn("localhost"); + //add containers to the component that needs to be upgraded. + Component comp = service.getComponents().iterator().next(); + ContainerId containerId = ContainerId.newContainerId(client.attemptId, 1L); + comp.addContainer(new Container().id(containerId.toString())); - when(yarnClient.getApplicationAttemptReport(Matchers.any())) - .thenReturn(attemptReport); - when(yarnClient.getApplicationReport(applicationId)).thenReturn(appReport); - - ServiceClient client = new ServiceClient() { - @Override - protected void serviceInit(Configuration configuration) throws Exception { - } - - @Override - protected ClientAMProtocol createAMProxy(String serviceName, - ApplicationReport appReport) throws IOException, YarnException { - return amProxy; - } - - @Override - ApplicationId submitApp(Service app) throws IOException, YarnException { - return applicationId; - } - }; - - client.setFileSystem(rule.getFs()); - client.setYarnClient(yarnClient); - - client.init(rule.getConf()); - client.start(); - return client; + client.actionUpgrade(service, comp.getContainers()); + CompInstancesUpgradeResponseProto response = client.getLastProxyResponse( + CompInstancesUpgradeResponseProto.class); + Assert.assertNotNull("upgrade did not complete", response); + client.stop(); } - private YarnClient createMockYarnClient() throws IOException, YarnException { + private Service createService() throws IOException, + YarnException { + Service service = ServiceTestUtils.createExampleApplication(); + service.setVersion("v1"); + service.setState(ServiceState.UPGRADING); + return service; + } + + private static final class MockServiceClient extends ServiceClient { + + private final ApplicationId appId; + private final ApplicationAttemptId attemptId; + private final ClientAMProtocol amProxy; + private Object proxyResponse; + private Service service; + + private MockServiceClient() { + amProxy = mock(ClientAMProtocol.class); + appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + LOG.debug("mocking service client for {}", appId); + attemptId = ApplicationAttemptId.newInstance(appId, 1); + } + + static MockServiceClient create(ServiceTestUtils.ServiceFSWatcher rule, + Service service) + throws IOException, YarnException { + MockServiceClient client = new MockServiceClient(); + + YarnClient yarnClient = createMockYarnClient(); + ApplicationReport appReport = mock(ApplicationReport.class); + when(appReport.getHost()).thenReturn("localhost"); + when(appReport.getYarnApplicationState()).thenReturn( + YarnApplicationState.RUNNING); + + ApplicationAttemptReport attemptReport = + ApplicationAttemptReport.newInstance(client.attemptId, "localhost", 0, + null, null, null, + YarnApplicationAttemptState.RUNNING, null); + when(yarnClient.getApplicationAttemptReport(Matchers.any())) + .thenReturn(attemptReport); + when(yarnClient.getApplicationReport(client.appId)).thenReturn(appReport); + when(client.amProxy.upgrade( + Matchers.any(UpgradeServiceRequestProto.class))).thenAnswer( + (Answer) invocation -> { + UpgradeServiceResponseProto response = + UpgradeServiceResponseProto.newBuilder().build(); + client.proxyResponse = response; + return response; + }); + when(client.amProxy.upgrade(Matchers.any( + CompInstancesUpgradeRequestProto.class))).thenAnswer( + (Answer) invocation -> { + CompInstancesUpgradeResponseProto response = + CompInstancesUpgradeResponseProto.newBuilder().build(); + client.proxyResponse = response; + return response; + }); + client.setFileSystem(rule.getFs()); + client.setYarnClient(yarnClient); + client.service = service; + + client.init(rule.getConf()); + client.start(); + client.actionCreate(service); + return client; + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + } + + @Override + protected ClientAMProtocol createAMProxy(String serviceName, + ApplicationReport appReport) throws IOException, YarnException { + return amProxy; + } + + @Override + ApplicationId submitApp(Service app) throws IOException, YarnException { + return appId; + } + + @Override + public Service getStatus(String serviceName) throws IOException, + YarnException { + service.setState(ServiceState.STABLE); + return service; + } + + private T getLastProxyResponse(Class clazz) { + if (clazz.isInstance(proxyResponse)) { + return clazz.cast(proxyResponse); + } + return null; + } + } + + private static YarnClient createMockYarnClient() throws IOException, + YarnException { YarnClient yarnClient = mock(YarnClient.class); - when(yarnClient.getApplications(Matchers.any(GetApplicationsRequest.class))) - .thenReturn(new ArrayList<>()); + when(yarnClient.getApplications(Matchers.any( + GetApplicationsRequest.class))).thenReturn(new ArrayList<>()); return yarnClient; } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java new file mode 100644 index 0000000000..600e438e01 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.ServiceScheduler; +import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.TestServiceManager; +import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; +import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link Component}. + */ +public class TestComponent { + + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); + + @Test + public void testComponentUpgrade() throws Exception { + ServiceContext context = createTestContext(rule, "testComponentUpgrade"); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + + ComponentEvent upgradeEvent = new ComponentEvent(comp.getName(), + ComponentEventType.UPGRADE); + comp.handle(upgradeEvent); + Assert.assertEquals("component not in need upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + } + + @Test + public void testCheckState() throws Exception { + String serviceName = "testCheckState"; + ServiceContext context = createTestContext(rule, serviceName); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + + comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE) + .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1", + "val1")).setUpgradeVersion("v2")); + + // one instance finished upgrading + comp.decContainersThatNeedUpgrade(); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + Assert.assertEquals("component not in need upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + + // second instance finished upgrading + comp.decContainersThatNeedUpgrade(); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in stable state", + ComponentState.STABLE, comp.getComponentSpec().getState()); + Assert.assertEquals("component did not upgrade successfully", "val1", + comp.getComponentSpec().getConfiguration().getEnv("key1")); + } + + @Test + public void testContainerCompletedWhenUpgrading() throws Exception { + String serviceName = "testContainerComplete"; + ServiceContext context = createTestContext(rule, serviceName); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + + comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE) + .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1", + "val1")).setUpgradeVersion("v2")); + comp.getAllComponentInstances().forEach(instance -> { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE)); + }); + Iterator instanceIter = comp. + getAllComponentInstances().iterator(); + + // reinitialization of a container failed + ContainerStatus status = mock(ContainerStatus.class); + when(status.getExitStatus()).thenReturn(ContainerExitStatus.ABORTED); + ComponentInstance instance = instanceIter.next(); + ComponentEvent stopEvent = new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED) + .setInstance(instance).setContainerId(instance.getContainer().getId()) + .setStatus(status); + comp.handle(stopEvent); + instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), + STOP).setStatus(status)); + + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in flexing state", + ComponentState.FLEXING, comp.getComponentSpec().getState()); + + // new container get allocated + assignNewContainer(context.attemptId, 10, context, comp); + + // second instance finished upgrading + ComponentInstance instance2 = instanceIter.next(); + instance2.handle(new ComponentInstanceEvent( + instance2.getContainer().getId(), + ComponentInstanceEventType.BECOME_READY)); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in stable state", + ComponentState.STABLE, comp.getComponentSpec().getState()); + Assert.assertEquals("component did not upgrade successfully", "val1", + comp.getComponentSpec().getConfiguration().getEnv("key1")); + } + + private static org.apache.hadoop.yarn.service.api.records.Component + createSpecWithEnv(String serviceName, String compName, String key, + String val) { + Service service = TestServiceManager.createBaseDef(serviceName); + org.apache.hadoop.yarn.service.api.records.Component spec = + service.getComponent(compName); + spec.getConfiguration().getEnv().put(key, val); + return spec; + } + + public static ServiceContext createTestContext( + ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName) + throws Exception { + ServiceContext context = new ServiceContext(); + context.service = TestServiceManager.createBaseDef(serviceName); + context.fs = fsWatcher.getFs(); + + ContainerLaunchService mockLaunchService = mock( + ContainerLaunchService.class); + + context.scheduler = new ServiceScheduler(context) { + @Override + protected YarnRegistryViewForProviders createYarnRegistryOperations( + ServiceContext context, RegistryOperations registryClient) { + return mock(YarnRegistryViewForProviders.class); + } + + @Override + public NMClientAsync createNMClient() { + NMClientAsync nmClientAsync = super.createNMClient(); + NMClient nmClient = mock(NMClient.class); + try { + when(nmClient.getContainerStatus(anyObject(), anyObject())) + .thenAnswer((Answer) invocation -> + ContainerStatus.newInstance( + (ContainerId) invocation.getArguments()[0], + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + "", 0)); + } catch (YarnException | IOException e) { + throw new RuntimeException(e); + } + nmClientAsync.setClient(nmClient); + return nmClientAsync; + } + + @Override + public ContainerLaunchService getContainerLaunchService() { + return mockLaunchService; + } + }; + context.scheduler.init(fsWatcher.getConf()); + + doNothing().when(mockLaunchService). + reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject()); + stabilizeComponents(context); + return context; + } + + private static void stabilizeComponents(ServiceContext context) { + + ApplicationId appId = ApplicationId.fromString(context.service.getId()); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + context.attemptId = attemptId; + Map + componentState = context.scheduler.getAllComponents(); + for (org.apache.hadoop.yarn.service.api.records.Component componentSpec : + context.service.getComponents()) { + Component component = new org.apache.hadoop.yarn.service.component. + Component(componentSpec, 1L, context); + componentState.put(component.getName(), component); + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.FLEX)); + for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) { + assignNewContainer(attemptId, i + 1, context, component); + } + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.CHECK_STABLE)); + } + } + + private static void assignNewContainer( + ApplicationAttemptId attemptId, long containerNum, + ServiceContext context, Component component) { + Container container = org.apache.hadoop.yarn.api.records.Container + .newInstance(ContainerId.newContainerId(attemptId, containerNum), + NODE_ID, "localhost", null, null, + null); + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.CONTAINER_ALLOCATED) + .setContainer(container).setContainerId(container.getId())); + ComponentInstance instance = context.scheduler.getLiveInstances().get( + container.getId()); + ComponentInstanceEvent startEvent = new ComponentInstanceEvent( + container.getId(), ComponentInstanceEventType.START); + instance.handle(startEvent); + + ComponentInstanceEvent readyEvent = new ComponentInstanceEvent( + container.getId(), ComponentInstanceEventType.BECOME_READY); + instance.handle(readyEvent); + } + + private static final NodeId NODE_ID = NodeId.fromString("localhost:0"); + +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java new file mode 100644 index 0000000000..0b56d7ef19 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.component.instance; + +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.apache.hadoop.yarn.service.component.Component; +import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.component.TestComponent; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for {@link ComponentInstance}. + */ +public class TestComponentInstance { + + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); + + @Test + public void testContainerUpgrade() throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testContainerUpgrade"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + upgradeComponent(component); + + ComponentInstance instance = component.getAllComponentInstances() + .iterator().next(); + ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); + instance.handle(instanceEvent); + Container containerSpec = component.getComponentSpec().getContainer( + instance.getContainer().getId().toString()); + Assert.assertEquals("instance not upgrading", + ContainerState.UPGRADING, containerSpec.getState()); + } + + @Test + public void testContainerReadyAfterUpgrade() throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testContainerStarted"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + upgradeComponent(component); + + ComponentInstance instance = component.getAllComponentInstances() + .iterator().next(); + + ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); + instance.handle(instanceEvent); + + instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), + ComponentInstanceEventType.BECOME_READY)); + Assert.assertEquals("instance not ready", + ContainerState.READY, instance.getCompSpec().getContainer( + instance.getContainer().getId().toString()).getState()); + } + + private void upgradeComponent(Component component) { + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.UPGRADE) + .setTargetSpec(component.getComponentSpec()).setUpgradeVersion("v2")); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java index e25d38dd49..7cef91e25d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java @@ -81,6 +81,7 @@ public void tearDown() throws IOException { public void testComponentDependency() throws Exception{ ApplicationId applicationId = ApplicationId.newInstance(123456, 1); Service exampleApp = new Service(); + exampleApp.setVersion("v1"); exampleApp.setId(applicationId.toString()); exampleApp.setName("testComponentDependency"); exampleApp.addComponent(createComponent("compa", 1, "sleep 1000")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index d5eb787c70..e3974742b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import java.io.IOException; +import java.util.List; import java.util.Map; /** @@ -231,18 +232,30 @@ public abstract String getStatusString(String appIdOrName) throws IOException, YarnException; /** - * Upgrade a long running service. - * - * @param appName the name of the application - * @param fileName specification of application upgrade to save. + * Initiate upgrade of a long running service. * + * @param appName the name of the application. + * @param fileName specification of application upgrade to save. + * @param autoFinalize when true, finalization of upgrade will be done + * automatically. * @return exit code - * @throws IOException IOException + * @throws IOException IOException * @throws YarnException exception in client or server */ @Public @Unstable - public abstract int actionUpgrade(String appName, String fileName) - throws IOException, YarnException; + public abstract int initiateUpgrade(String appName, String fileName, + boolean autoFinalize) throws IOException, YarnException; + + /** + * Upgrade component instances of a long running service. + * + * @param appName the name of the application. + * @param componentInstances the name of the component instances. + */ + @Public + @Unstable + public abstract int actionUpgradeInstances(String appName, + List componentInstances) throws IOException, YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index daca296ff7..17fc961fc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -99,6 +99,11 @@ public class ApplicationCLI extends YarnCLI { public static final String FLEX_CMD = "flex"; public static final String COMPONENT = "component"; public static final String ENABLE_FAST_LAUNCH = "enableFastLaunch"; + public static final String UPGRADE_CMD = "upgrade"; + public static final String UPGRADE_INITIATE = "initiate"; + public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize"; + public static final String UPGRADE_FINALIZE = "finalize"; + public static final String COMPONENT_INSTS = "instances"; private static String firstArg = null; @@ -236,6 +241,20 @@ public int run(String[] args) throws Exception { "to HDFS to make future launches faster. Supports -appTypes option " + "to specify which client implementation to use. Optionally a " + "destination folder for the tarball can be specified."); + opts.addOption(UPGRADE_CMD, true, "Upgrades an application/long-" + + "running service. It requires either -initiate, -instances, or " + + "-finalize options."); + opts.addOption(UPGRADE_INITIATE, true, "Works with -upgrade option to " + + "initiate the application upgrade. It requires the upgraded " + + "application specification file."); + opts.addOption(COMPONENT_INSTS, true, "Works with -upgrade option to " + + "trigger the upgrade of specified component instances of the " + + "application."); + opts.addOption(UPGRADE_FINALIZE, false, "Works with -upgrade option to " + + "finalize the upgrade."); + opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " + + "-initiate options to initiate the upgrade of the application with " + + "the ability to finalize the upgrade automatically."); opts.getOption(LAUNCH_CMD).setArgName("Application Name> Moves application to a new"); pw.println(" queue. ApplicationId can be"); pw.println(" passed using 'appId' option."); @@ -2152,6 +2157,8 @@ private String createApplicationCLIHelpMessage() throws IOException { pw.println(" Optionally a destination folder"); pw.println(" for the tarball can be"); pw.println(" specified."); + pw.println(" -finalize Works with -upgrade option to"); + pw.println(" finalize the upgrade."); pw.println(" -flex Changes number of running"); pw.println(" containers for a component of an"); pw.println(" application / long-running"); @@ -2165,6 +2172,15 @@ private String createApplicationCLIHelpMessage() throws IOException { pw.println(" which client implementation to"); pw.println(" use."); pw.println(" -help Displays help for all commands."); + pw.println(" -initiate Works with -upgrade option to"); + pw.println(" initiate the application"); + pw.println(" upgrade. It requires the"); + pw.println(" upgraded application"); + pw.println(" specification file."); + pw.println(" -instances Works with -upgrade option to"); + pw.println(" trigger the upgrade of specified"); + pw.println(" component instances of the"); + pw.println(" application."); pw.println(" -kill Kills the application. Set of"); pw.println(" applications can be provided"); pw.println(" separated with space"); @@ -2232,6 +2248,11 @@ private String createApplicationCLIHelpMessage() throws IOException { pw.println(" -updatePriority update priority of an"); pw.println(" application. ApplicationId can"); pw.println(" be passed using 'appId' option."); + pw.println(" -upgrade Upgrades an"); + pw.println(" application/long-running"); + pw.println(" service. It requires either"); + pw.println(" -initiate, -instances, or"); + pw.println(" -finalize options."); pw.close(); String appsHelpStr = baos.toString("UTF-8"); return appsHelpStr;