YARN-8081. Add support to upgrade a component.

Contributed by Chandni Singh
This commit is contained in:
Eric Yang 2018-05-15 20:40:39 -04:00
parent 63480976a0
commit 8d3b39de89
14 changed files with 368 additions and 45 deletions

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
@ -170,6 +171,22 @@ private String getInstancesPath(String appName) throws IOException {
return api.toString(); return api.toString();
} }
private String getComponentsPath(String appName) throws IOException {
Preconditions.checkNotNull(appName);
String url = getRMWebAddress();
StringBuilder api = new StringBuilder();
api.append(url);
api.append("/app/v1/services/").append(appName).append("/")
.append(RestApiConstants.COMPONENTS);
Configuration conf = getConfig();
if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase(
"simple")) {
api.append("?user.name=" + UrlEncoded
.encodeString(System.getProperty("user.name")));
}
return api.toString();
}
private Builder getApiClient() throws IOException { private Builder getApiClient() throws IOException {
return getApiClient(getServicePath(null)); return getApiClient(getServicePath(null));
} }
@ -536,7 +553,7 @@ public int actionUpgradeInstances(String appName, List<String> compInstances)
container.setState(ContainerState.UPGRADING); container.setState(ContainerState.UPGRADING);
toUpgrade[idx++] = container; toUpgrade[idx++] = container;
} }
String buffer = containerJsonSerde.toJson(toUpgrade); String buffer = CONTAINER_JSON_SERDE.toJson(toUpgrade);
ClientResponse response = getApiClient(getInstancesPath(appName)) ClientResponse response = getApiClient(getInstancesPath(appName))
.put(ClientResponse.class, buffer); .put(ClientResponse.class, buffer);
result = processResponse(response); result = processResponse(response);
@ -547,7 +564,35 @@ public int actionUpgradeInstances(String appName, List<String> compInstances)
return result; return result;
} }
private static JsonSerDeser<Container[]> containerJsonSerde = @Override
public int actionUpgradeComponents(String appName, List<String> components)
throws IOException, YarnException {
int result;
Component[] toUpgrade = new Component[components.size()];
try {
int idx = 0;
for (String compName : components) {
Component component = new Component();
component.setName(compName);
component.setState(ComponentState.UPGRADING);
toUpgrade[idx++] = component;
}
String buffer = COMP_JSON_SERDE.toJson(toUpgrade);
ClientResponse response = getApiClient(getComponentsPath(appName))
.put(ClientResponse.class, buffer);
result = processResponse(response);
} catch (Exception e) {
LOG.error("Failed to upgrade components: ", e);
result = EXIT_EXCEPTION_THROWN;
}
return result;
}
private static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE =
new JsonSerDeser<>(Container[].class, new JsonSerDeser<>(Container[].class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
private static final JsonSerDeser<Component[]> COMP_JSON_SERDE =
new JsonSerDeser<>(Component[].class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
} }

View File

@ -17,7 +17,9 @@
package org.apache.hadoop.yarn.service.webapp; package org.apache.hadoop.yarn.service.webapp;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -29,6 +31,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
@ -61,8 +64,10 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED; import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
@ -306,6 +311,42 @@ public Integer run() throws Exception {
return formatResponse(Status.OK, serviceStatus); return formatResponse(Status.OK, serviceStatus);
} }
@PUT
@Path(COMPONENTS_PATH)
@Consumes({MediaType.APPLICATION_JSON})
@Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN})
public Response updateComponents(@Context HttpServletRequest request,
@PathParam(SERVICE_NAME) String serviceName,
List<Component> requestComponents) {
try {
if (requestComponents == null || requestComponents.isEmpty()) {
throw new YarnException("No components provided.");
}
UserGroupInformation ugi = getProxyUser(request);
Set<String> compNamesToUpgrade = new HashSet<>();
requestComponents.forEach(reqComp -> {
if (reqComp.getState() != null &&
reqComp.getState().equals(ComponentState.UPGRADING)) {
compNamesToUpgrade.add(reqComp.getName());
}
});
LOG.info("PUT: upgrade components {} for service {} " +
"user = {}", compNamesToUpgrade, serviceName, ugi);
return processComponentsUpgrade(ugi, serviceName, compNamesToUpgrade);
} catch (AccessControlException e) {
return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
} catch (YarnException e) {
return formatResponse(Response.Status.BAD_REQUEST, e.getMessage());
} catch (IOException | InterruptedException e) {
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
} catch (UndeclaredThrowableException e) {
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
e.getCause().getMessage());
}
}
@PUT @PUT
@Path(COMPONENT_PATH) @Path(COMPONENT_PATH)
@Consumes({ MediaType.APPLICATION_JSON }) @Consumes({ MediaType.APPLICATION_JSON })
@ -326,6 +367,15 @@ public Response updateComponent(@Context HttpServletRequest request,
+ componentName + ")"; + componentName + ")";
throw new YarnException(msg); throw new YarnException(msg);
} }
UserGroupInformation ugi = getProxyUser(request);
if (component.getState() != null &&
component.getState().equals(ComponentState.UPGRADING)) {
LOG.info("PUT: upgrade component {} for service {} " +
"user = {}", component.getName(), appName, ugi);
return processComponentsUpgrade(ugi, appName,
Sets.newHashSet(componentName));
}
if (component.getNumberOfContainers() == null) { if (component.getNumberOfContainers() == null) {
throw new YarnException("No container count provided"); throw new YarnException("No container count provided");
} }
@ -334,7 +384,6 @@ public Response updateComponent(@Context HttpServletRequest request,
+ component.getNumberOfContainers(); + component.getNumberOfContainers();
throw new YarnException(message); throw new YarnException(message);
} }
UserGroupInformation ugi = getProxyUser(request);
Map<String, Long> original = ugi Map<String, Long> original = ugi
.doAs(new PrivilegedExceptionAction<Map<String, Long>>() { .doAs(new PrivilegedExceptionAction<Map<String, Long>>() {
@Override @Override
@ -472,7 +521,7 @@ public Response updateComponentInstance(@Context HttpServletRequest request,
if (reqContainer.getState() != null if (reqContainer.getState() != null
&& reqContainer.getState().equals(ContainerState.UPGRADING)) { && reqContainer.getState().equals(ContainerState.UPGRADING)) {
return processContainerUpgrade(ugi, service, return processContainersUpgrade(ugi, service,
Lists.newArrayList(liveContainer)); Lists.newArrayList(liveContainer));
} }
} catch (AccessControlException e) { } catch (AccessControlException e) {
@ -517,7 +566,7 @@ public Response updateComponentInstances(@Context HttpServletRequest request,
List<Container> liveContainers = ServiceApiUtil List<Container> liveContainers = ServiceApiUtil
.getLiveContainers(service, toUpgrade); .getLiveContainers(service, toUpgrade);
return processContainerUpgrade(ugi, service, liveContainers); return processContainersUpgrade(ugi, service, liveContainers);
} }
} catch (AccessControlException e) { } catch (AccessControlException e) {
return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
@ -629,7 +678,29 @@ private Response upgradeService(Service service,
return formatResponse(Status.ACCEPTED, status); return formatResponse(Status.ACCEPTED, status);
} }
private Response processContainerUpgrade(UserGroupInformation ugi, private Response processComponentsUpgrade(UserGroupInformation ugi,
String serviceName, Set<String> compNames) throws YarnException,
IOException, InterruptedException {
Service service = getServiceFromClient(ugi, serviceName);
if (service.getState() != ServiceState.UPGRADING) {
throw new YarnException(
String.format("The upgrade of service %s has not been initiated.",
service.getName()));
}
List<Container> containersToUpgrade = ServiceApiUtil
.validateAndResolveCompsUpgrade(service, compNames);
Integer result = invokeContainersUpgrade(ugi, service, containersToUpgrade);
if (result == EXIT_SUCCESS) {
ServiceStatus status = new ServiceStatus();
status.setDiagnostics(
"Upgrading components " + Joiner.on(',').join(compNames) + ".");
return formatResponse(Response.Status.ACCEPTED, status);
}
// If result is not a success, consider it a no-op
return Response.status(Response.Status.NO_CONTENT).build();
}
private Response processContainersUpgrade(UserGroupInformation ugi,
Service service, List<Container> containers) throws YarnException, Service service, List<Container> containers) throws YarnException,
IOException, InterruptedException { IOException, InterruptedException {
@ -638,25 +709,8 @@ private Response processContainerUpgrade(UserGroupInformation ugi,
String.format("The upgrade of service %s has not been initiated.", String.format("The upgrade of service %s has not been initiated.",
service.getName())); service.getName()));
} }
for (Container liveContainer : containers) { ServiceApiUtil.validateInstancesUpgrade(containers);
if (liveContainer.getState() != ContainerState.NEEDS_UPGRADE) { Integer result = invokeContainersUpgrade(ugi, service, containers);
// Nothing to upgrade
throw new YarnException(String.format(
"The component instance (%s) does not need an upgrade.",
liveContainer.getComponentInstanceName()));
}
}
Integer result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
int result1;
ServiceClient sc = getServiceClient();
sc.init(YARN_CONFIG);
sc.start();
result1 = sc.actionUpgrade(service, containers);
sc.close();
return result1;
});
if (result == EXIT_SUCCESS) { if (result == EXIT_SUCCESS) {
ServiceStatus status = new ServiceStatus(); ServiceStatus status = new ServiceStatus();
status.setDiagnostics( status.setDiagnostics(
@ -668,6 +722,20 @@ private Response processContainerUpgrade(UserGroupInformation ugi,
return Response.status(Response.Status.NO_CONTENT).build(); return Response.status(Response.Status.NO_CONTENT).build();
} }
private int invokeContainersUpgrade(UserGroupInformation ugi,
Service service, List<Container> containers) throws IOException,
InterruptedException {
return ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
int result1;
ServiceClient sc = getServiceClient();
sc.init(YARN_CONFIG);
sc.start();
result1 = sc.actionUpgrade(service, containers);
sc.close();
return result1;
});
}
private Service getServiceFromClient(UserGroupInformation ugi, private Service getServiceFromClient(UserGroupInformation ugi,
String serviceName) throws IOException, InterruptedException { String serviceName) throws IOException, InterruptedException {

View File

@ -34,7 +34,10 @@
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/** /**
* A mock version of ServiceClient - This class is design * A mock version of ServiceClient - This class is design
@ -46,6 +49,7 @@ public class ServiceClientTest extends ServiceClient {
private Configuration conf = new Configuration(); private Configuration conf = new Configuration();
private Service goodServiceStatus = buildLiveGoodService(); private Service goodServiceStatus = buildLiveGoodService();
private boolean initialized; private boolean initialized;
private Set<String> expectedInstances = new HashSet<>();
public ServiceClientTest() { public ServiceClientTest() {
super(); super();
@ -61,11 +65,12 @@ public void init(Configuration conf) {
@Override @Override
public void stop() { public void stop() {
// This is needed for testing API Server which use client to get status // This is needed for testing API Server which uses client to get status
// and then perform an action. // and then perform an action.
} }
public void forceStop() { public void forceStop() {
expectedInstances.clear();
super.stop(); super.stop();
} }
@ -144,17 +149,27 @@ public int initiateUpgrade(Service service) throws YarnException,
@Override @Override
public int actionUpgrade(Service service, List<Container> compInstances) public int actionUpgrade(Service service, List<Container> compInstances)
throws IOException, YarnException { throws IOException, YarnException {
if (service.getName() != null && service.getName().equals("jenkins")) { if (service.getName() != null && service.getName().equals("jenkins")
return EXIT_SUCCESS; && compInstances != null) {
} else { Set<String> actualInstances = compInstances.stream().map(
throw new IllegalArgumentException(); Container::getComponentInstanceName).collect(Collectors.toSet());
if (actualInstances.equals(expectedInstances)) {
return EXIT_SUCCESS;
}
} }
throw new IllegalArgumentException();
} }
Service getGoodServiceStatus() { Service getGoodServiceStatus() {
return goodServiceStatus; return goodServiceStatus;
} }
void setExpectedInstances(Set<String> instances) {
if (instances != null) {
expectedInstances.addAll(instances);
}
}
static Service buildGoodService() { static Service buildGoodService() {
Service service = new Service(); Service service = new Service();
service.setName("jenkins"); service.setName("jenkins");
@ -166,13 +181,15 @@ static Service buildGoodService() {
resource.setCpus(1); resource.setCpus(1);
resource.setMemory("2048"); resource.setMemory("2048");
List<Component> components = new ArrayList<>(); List<Component> components = new ArrayList<>();
Component c = new Component(); for (int i = 0; i < 2; i++) {
c.setName("jenkins"); Component c = new Component();
c.setNumberOfContainers(2L); c.setName("jenkins" + i);
c.setArtifact(artifact); c.setNumberOfContainers(2L);
c.setLaunchCommand(""); c.setArtifact(artifact);
c.setResource(resource); c.setLaunchCommand("");
components.add(c); c.setResource(resource);
components.add(c);
}
service.setComponents(components); service.setComponents(components);
return service; return service;
} }

View File

@ -23,18 +23,22 @@
import java.io.File; import java.io.File;
import java.io.FileWriter; import java.io.FileWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.Response.Status;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum; import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum;
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Resource;
@ -523,8 +527,11 @@ public void testUpgradeSingleInstance() {
// and container state needs to be in NEEDS_UPGRADE. // and container state needs to be in NEEDS_UPGRADE.
Service serviceStatus = mockServerClient.getGoodServiceStatus(); Service serviceStatus = mockServerClient.getGoodServiceStatus();
serviceStatus.setState(ServiceState.UPGRADING); serviceStatus.setState(ServiceState.UPGRADING);
serviceStatus.getComponents().iterator().next().getContainers().iterator() Container liveContainer = serviceStatus.getComponents().iterator().next()
.next().setState(ContainerState.NEEDS_UPGRADE); .getContainers().iterator().next();
liveContainer.setState(ContainerState.NEEDS_UPGRADE);
mockServerClient.setExpectedInstances(Sets.newHashSet(
liveContainer.getComponentInstanceName()));
final Response actual = apiServer.updateComponentInstance(request, final Response actual = apiServer.updateComponentInstance(request,
goodService.getName(), comp.getName(), goodService.getName(), comp.getName(),
@ -545,9 +552,14 @@ public void testUpgradeMultipleInstances() {
// and container state needs to be in NEEDS_UPGRADE. // and container state needs to be in NEEDS_UPGRADE.
Service serviceStatus = mockServerClient.getGoodServiceStatus(); Service serviceStatus = mockServerClient.getGoodServiceStatus();
serviceStatus.setState(ServiceState.UPGRADING); serviceStatus.setState(ServiceState.UPGRADING);
Set<String> expectedInstances = new HashSet<>();
serviceStatus.getComponents().iterator().next().getContainers().forEach( serviceStatus.getComponents().iterator().next().getContainers().forEach(
container -> container.setState(ContainerState.NEEDS_UPGRADE) container -> {
container.setState(ContainerState.NEEDS_UPGRADE);
expectedInstances.add(container.getComponentInstanceName());
}
); );
mockServerClient.setExpectedInstances(expectedInstances);
final Response actual = apiServer.updateComponentInstances(request, final Response actual = apiServer.updateComponentInstances(request,
goodService.getName(), comp.getContainers()); goodService.getName(), comp.getContainers());
@ -555,4 +567,57 @@ public void testUpgradeMultipleInstances() {
Response.status(Status.ACCEPTED).build().getStatus(), Response.status(Status.ACCEPTED).build().getStatus(),
actual.getStatus()); actual.getStatus());
} }
@Test
public void testUpgradeComponent() {
Service goodService = ServiceClientTest.buildLiveGoodService();
Component comp = goodService.getComponents().iterator().next();
comp.setState(ComponentState.UPGRADING);
// To be able to upgrade, the service needs to be in UPGRADING
// and component state needs to be in NEEDS_UPGRADE.
Service serviceStatus = mockServerClient.getGoodServiceStatus();
serviceStatus.setState(ServiceState.UPGRADING);
Component liveComp = serviceStatus.getComponent(comp.getName());
liveComp.setState(ComponentState.NEEDS_UPGRADE);
Set<String> expectedInstances = new HashSet<>();
liveComp.getContainers().forEach(container -> {
expectedInstances.add(container.getComponentInstanceName());
container.setState(ContainerState.NEEDS_UPGRADE);
});
mockServerClient.setExpectedInstances(expectedInstances);
final Response actual = apiServer.updateComponent(request,
goodService.getName(), comp.getName(), comp);
assertEquals("Component upgrade is ",
Response.status(Status.ACCEPTED).build().getStatus(),
actual.getStatus());
}
@Test
public void testUpgradeMultipleComps() {
Service goodService = ServiceClientTest.buildLiveGoodService();
goodService.getComponents().forEach(comp ->
comp.setState(ComponentState.UPGRADING));
// To be able to upgrade, the live service needs to be in UPGRADING
// and component states needs to be in NEEDS_UPGRADE.
Service serviceStatus = mockServerClient.getGoodServiceStatus();
serviceStatus.setState(ServiceState.UPGRADING);
Set<String> expectedInstances = new HashSet<>();
serviceStatus.getComponents().forEach(liveComp -> {
liveComp.setState(ComponentState.NEEDS_UPGRADE);
liveComp.getContainers().forEach(liveContainer -> {
expectedInstances.add(liveContainer.getComponentInstanceName());
liveContainer.setState(ContainerState.NEEDS_UPGRADE);
});
});
mockServerClient.setExpectedInstances(expectedInstances);
final Response actual = apiServer.updateComponents(request,
goodService.getName(), goodService.getComponents());
assertEquals("Component upgrade is ",
Response.status(Status.ACCEPTED).build().getStatus(),
actual.getStatus());
}
} }

View File

@ -298,5 +298,17 @@ public void testInstancesUpgrade() {
} }
} }
@Test
public void testComponentsUpgrade() {
String appName = "example-app";
try {
int result = asc.actionUpgradeComponents(appName, Lists.newArrayList(
"comp"));
assertEquals(EXIT_SUCCESS, result);
} catch (IOException | YarnException e) {
fail();
}
}
} }

View File

@ -26,5 +26,5 @@
@InterfaceStability.Unstable @InterfaceStability.Unstable
@ApiModel(description = "The current state of a component.") @ApiModel(description = "The current state of a component.")
public enum ComponentState { public enum ComponentState {
FLEXING, STABLE, NEEDS_UPGRADE; FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING;
} }

View File

@ -294,6 +294,17 @@ public int actionUpgradeInstances(String appName,
Service persistedService = ServiceApiUtil.loadService(fs, appName); Service persistedService = ServiceApiUtil.loadService(fs, appName);
List<Container> containersToUpgrade = ServiceApiUtil. List<Container> containersToUpgrade = ServiceApiUtil.
getLiveContainers(persistedService, componentInstances); getLiveContainers(persistedService, componentInstances);
ServiceApiUtil.validateInstancesUpgrade(containersToUpgrade);
return actionUpgrade(persistedService, containersToUpgrade);
}
@Override
public int actionUpgradeComponents(String appName,
List<String> components) throws IOException, YarnException {
checkAppExistOnHdfs(appName);
Service persistedService = ServiceApiUtil.loadService(fs, appName);
List<Container> containersToUpgrade = ServiceApiUtil
.validateAndResolveCompsUpgrade(persistedService, components);
return actionUpgrade(persistedService, containersToUpgrade); return actionUpgrade(persistedService, containersToUpgrade);
} }

View File

@ -34,6 +34,8 @@ public interface RestApiConstants {
"/component-instances/{component_instance_name}"; "/component-instances/{component_instance_name}";
String COMP_INSTANCES = "component-instances"; String COMP_INSTANCES = "component-instances";
String COMP_INSTANCES_PATH = SERVICE_PATH + "/" + COMP_INSTANCES; String COMP_INSTANCES_PATH = SERVICE_PATH + "/" + COMP_INSTANCES;
String COMPONENTS = "components";
String COMPONENTS_PATH = SERVICE_PATH + "/" + COMPONENTS;
// Query param // Query param
String SERVICE_NAME = "service_name"; String SERVICE_NAME = "service_name";

View File

@ -105,4 +105,10 @@ public interface RestApiErrorMessages {
+ "expression name defined for this component only."; + "expression name defined for this component only.";
String ERROR_KEYTAB_URI_SCHEME_INVALID = "Unsupported keytab URI scheme: %s"; String ERROR_KEYTAB_URI_SCHEME_INVALID = "Unsupported keytab URI scheme: %s";
String ERROR_KEYTAB_URI_INVALID = "Invalid keytab URI: %s"; String ERROR_KEYTAB_URI_INVALID = "Invalid keytab URI: %s";
String ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE = "The component instance " +
"(%s) does not need an upgrade.";
String ERROR_COMP_DOES_NOT_NEED_UPGRADE = "The component (%s) does not need" +
" an upgrade.";
} }

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.service.utils; package org.apache.hadoop.yarn.service.utils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -29,14 +31,16 @@
import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Configuration; import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal; import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint; import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.conf.RestApiConstants; import org.apache.hadoop.yarn.service.conf.RestApiConstants;
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages; import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
@ -58,6 +62,9 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.ERROR_COMP_DOES_NOT_NEED_UPGRADE;
import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE;
public class ServiceApiUtil { public class ServiceApiUtil {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(ServiceApiUtil.class); LoggerFactory.getLogger(ServiceApiUtil.class);
@ -545,6 +552,48 @@ public static List<Container> getLiveContainers(Service service,
return result; return result;
} }
/**
* Validates that the component instances that are requested to upgrade
* require an upgrade.
*/
public static void validateInstancesUpgrade(List<Container>
liveContainers) throws YarnException {
for (Container liveContainer : liveContainers) {
if (!liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) {
// Nothing to upgrade
throw new YarnException(String.format(
ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE,
liveContainer.getComponentInstanceName()));
}
}
}
/**
* Validates the components that are requested to upgrade require an upgrade.
* It returns the instances of the components which need upgrade.
*/
public static List<Container> validateAndResolveCompsUpgrade(
Service liveService, Collection<String> compNames) throws YarnException {
Preconditions.checkNotNull(compNames);
HashSet<String> requestedComps = Sets.newHashSet(compNames);
List<Container> containerNeedUpgrade = new ArrayList<>();
for (Component liveComp : liveService.getComponents()) {
if (requestedComps.contains(liveComp.getName())) {
if (!liveComp.getState().equals(ComponentState.NEEDS_UPGRADE)) {
// Nothing to upgrade
throw new YarnException(String.format(
ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
}
liveComp.getContainers().forEach(liveContainer -> {
if (liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) {
containerNeedUpgrade.add(liveContainer);
}
});
}
}
return containerNeedUpgrade;
}
private static String parseComponentName(String componentInstanceName) private static String parseComponentName(String componentInstanceName)
throws YarnException { throws YarnException {
int idx = componentInstanceName.lastIndexOf('-'); int idx = componentInstanceName.lastIndexOf('-');

View File

@ -193,6 +193,18 @@ public void testUpgradeInstances() throws Exception {
Assert.assertEquals(result, 0); Assert.assertEquals(result, 0);
} }
@Test (timeout = 180000)
public void testUpgradeComponents() throws Exception {
conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
DummyServiceClient.class.getName());
cli.setConf(conf);
String[] args = {"app", "-upgrade", "app-1",
"-components", "comp1,comp2",
"-appTypes", DUMMY_APP_TYPE};
int result = cli.run(ApplicationCLI.preProcessArgs(args));
Assert.assertEquals(result, 0);
}
@Test (timeout = 180000) @Test (timeout = 180000)
public void testEnableFastLaunch() throws Exception { public void testEnableFastLaunch() throws Exception {
fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar")) fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar"))
@ -291,5 +303,11 @@ public int actionUpgradeInstances(String appName,
List<String> componentInstances) throws IOException, YarnException { List<String> componentInstances) throws IOException, YarnException {
return 0; return 0;
} }
@Override
public int actionUpgradeComponents(String appName, List<String> components)
throws IOException, YarnException {
return 0;
}
} }
} }

View File

@ -258,4 +258,16 @@ public abstract int initiateUpgrade(String appName, String fileName,
public abstract int actionUpgradeInstances(String appName, public abstract int actionUpgradeInstances(String appName,
List<String> componentInstances) throws IOException, YarnException; List<String> componentInstances) throws IOException, YarnException;
/**
* Upgrade components of a long running service.
*
* @param appName the name of the application.
* @param components the name of the components.
*/
@Public
@Unstable
public abstract int actionUpgradeComponents(String appName,
List<String> components) throws IOException, YarnException;
} }

View File

@ -104,6 +104,7 @@ public class ApplicationCLI extends YarnCLI {
public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize"; public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize";
public static final String UPGRADE_FINALIZE = "finalize"; public static final String UPGRADE_FINALIZE = "finalize";
public static final String COMPONENT_INSTS = "instances"; public static final String COMPONENT_INSTS = "instances";
public static final String COMPONENTS = "components";
private static String firstArg = null; private static String firstArg = null;
@ -250,6 +251,8 @@ public int run(String[] args) throws Exception {
opts.addOption(COMPONENT_INSTS, true, "Works with -upgrade option to " + opts.addOption(COMPONENT_INSTS, true, "Works with -upgrade option to " +
"trigger the upgrade of specified component instances of the " + "trigger the upgrade of specified component instances of the " +
"application."); "application.");
opts.addOption(COMPONENTS, true, "Works with -upgrade option to " +
"trigger the upgrade of specified components of the application.");
opts.addOption(UPGRADE_FINALIZE, false, "Works with -upgrade option to " + opts.addOption(UPGRADE_FINALIZE, false, "Works with -upgrade option to " +
"finalize the upgrade."); "finalize the upgrade.");
opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " + opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " +
@ -274,6 +277,9 @@ public int run(String[] args) throws Exception {
opts.getOption(COMPONENT_INSTS).setArgName("Component Instances"); opts.getOption(COMPONENT_INSTS).setArgName("Component Instances");
opts.getOption(COMPONENT_INSTS).setValueSeparator(','); opts.getOption(COMPONENT_INSTS).setValueSeparator(',');
opts.getOption(COMPONENT_INSTS).setArgs(Option.UNLIMITED_VALUES); opts.getOption(COMPONENT_INSTS).setArgs(Option.UNLIMITED_VALUES);
opts.getOption(COMPONENTS).setArgName("Components");
opts.getOption(COMPONENTS).setValueSeparator(',');
opts.getOption(COMPONENTS).setArgs(Option.UNLIMITED_VALUES);
} else if (title != null && title.equalsIgnoreCase(APPLICATION_ATTEMPT)) { } else if (title != null && title.equalsIgnoreCase(APPLICATION_ATTEMPT)) {
opts.addOption(STATUS_CMD, true, opts.addOption(STATUS_CMD, true,
"Prints the status of the application attempt."); "Prints the status of the application attempt.");
@ -574,7 +580,7 @@ public int run(String[] args) throws Exception {
cliParser.getOptionValue(CHANGE_APPLICATION_QUEUE)); cliParser.getOptionValue(CHANGE_APPLICATION_QUEUE));
} else if (cliParser.hasOption(UPGRADE_CMD)) { } else if (cliParser.hasOption(UPGRADE_CMD)) {
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_INITIATE, if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_INITIATE,
UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, COMPONENT_INSTS, UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, COMPONENT_INSTS, COMPONENTS,
APP_TYPE_CMD)) { APP_TYPE_CMD)) {
printUsage(title, opts); printUsage(title, opts);
return exitCode; return exitCode;
@ -603,6 +609,15 @@ public int run(String[] args) throws Exception {
} }
String[] instances = cliParser.getOptionValues(COMPONENT_INSTS); String[] instances = cliParser.getOptionValues(COMPONENT_INSTS);
return client.actionUpgradeInstances(appName, Arrays.asList(instances)); return client.actionUpgradeInstances(appName, Arrays.asList(instances));
} else if (cliParser.hasOption(COMPONENTS)) {
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
COMPONENTS, APP_TYPE_CMD)) {
printUsage(title, opts);
return exitCode;
}
String[] components = cliParser.getOptionValues(COMPONENTS);
return client.actionUpgradeComponents(appName,
Arrays.asList(components));
} else if (cliParser.hasOption(UPGRADE_FINALIZE)) { } else if (cliParser.hasOption(UPGRADE_FINALIZE)) {
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
UPGRADE_FINALIZE, APP_TYPE_CMD)) { UPGRADE_FINALIZE, APP_TYPE_CMD)) {

View File

@ -2143,6 +2143,9 @@ private String createApplicationCLIHelpMessage() throws IOException {
pw.println(" long-running service. Supports"); pw.println(" long-running service. Supports");
pw.println(" absolute or relative changes,"); pw.println(" absolute or relative changes,");
pw.println(" such as +1, 2, or -3."); pw.println(" such as +1, 2, or -3.");
pw.println(" -components <Components> Works with -upgrade option to");
pw.println(" trigger the upgrade of specified");
pw.println(" components of the application.");
pw.println(" -destroy <Application Name> Destroys a saved application"); pw.println(" -destroy <Application Name> Destroys a saved application");
pw.println(" specification and removes all"); pw.println(" specification and removes all");
pw.println(" application data permanently."); pw.println(" application data permanently.");