YARN-8734. Readiness check for remote service belonging to the same user. Contributed by Eric Yang
This commit is contained in:
parent
e2113500df
commit
d1c1dde309
@ -254,6 +254,11 @@ definitions:
|
||||
docker_client_config:
|
||||
type: string
|
||||
description: URI of the file containing the docker client configuration (e.g. hdfs:///tmp/config.json).
|
||||
dependencies:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
description: An array of services which should be in STABLE state, before this service can be started.
|
||||
ResourceInformation:
|
||||
description:
|
||||
ResourceInformation determines unit/value of resource types in addition to memory and vcores. It will be part of Resource object.
|
||||
|
@ -170,7 +170,8 @@ public State transition(ServiceManager serviceManager,
|
||||
} else {
|
||||
serviceManager.setServiceState(ServiceState.UPGRADING);
|
||||
}
|
||||
|
||||
ServiceApiUtil.checkServiceDependencySatisified(serviceManager
|
||||
.getServiceSpec());
|
||||
return State.UPGRADING;
|
||||
} catch (Throwable e) {
|
||||
LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(),
|
||||
|
@ -389,6 +389,8 @@ public void serviceStart() throws Exception {
|
||||
// Since AM has been started and registered, the service is in STARTED state
|
||||
app.setState(ServiceState.STARTED);
|
||||
|
||||
ServiceApiUtil.checkServiceDependencySatisified(context.service);
|
||||
|
||||
// recover components based on containers sent from RM
|
||||
recoverComponents(response);
|
||||
|
||||
|
@ -73,6 +73,7 @@ public class Service extends BaseResource {
|
||||
private String version = null;
|
||||
private String description = null;
|
||||
private String dockerClientConfig = null;
|
||||
private List<String> dependencies = new ArrayList<String>();
|
||||
|
||||
/**
|
||||
* A unique service name.
|
||||
@ -352,6 +353,18 @@ public void setQueue(String queue) {
|
||||
this.queue = queue;
|
||||
}
|
||||
|
||||
@ApiModelProperty(example = "null", value = "A list of dependent services.")
|
||||
@XmlElement(name = "dependencies")
|
||||
@JsonProperty("dependencies")
|
||||
public List<String> getDependencies() {
|
||||
return dependencies;
|
||||
}
|
||||
|
||||
public void setDependencies(List<String>
|
||||
dependencies) {
|
||||
this.dependencies = dependencies;
|
||||
}
|
||||
|
||||
public Service kerberosPrincipal(KerberosPrincipal kerberosPrincipal) {
|
||||
this.kerberosPrincipal = kerberosPrincipal;
|
||||
return this;
|
||||
@ -437,6 +450,8 @@ public String toString() {
|
||||
.append(toIndentedString(kerberosPrincipal)).append("\n");
|
||||
sb.append(" dockerClientConfig: ")
|
||||
.append(toIndentedString(dockerClientConfig)).append("\n");
|
||||
sb.append(" dependencies: ")
|
||||
.append(toIndentedString(dependencies)).append("\n");
|
||||
sb.append("}");
|
||||
return sb.toString();
|
||||
}
|
||||
|
@ -35,6 +35,8 @@
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||
import org.apache.hadoop.yarn.service.client.ServiceClient;
|
||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.Configuration;
|
||||
@ -705,4 +707,45 @@ public static List<String> resolveCompsDependency(Service service) {
|
||||
}
|
||||
return components;
|
||||
}
|
||||
|
||||
private static boolean serviceDependencySatisfied(Service service) {
|
||||
boolean result = true;
|
||||
try {
|
||||
List<String> dependencies = service
|
||||
.getDependencies();
|
||||
org.apache.hadoop.conf.Configuration conf =
|
||||
new org.apache.hadoop.conf.Configuration();
|
||||
if (dependencies != null && dependencies.size() > 0) {
|
||||
ServiceClient sc = new ServiceClient();
|
||||
sc.init(conf);
|
||||
sc.start();
|
||||
for (String dependent : dependencies) {
|
||||
Service dependentService = sc.getStatus(dependent);
|
||||
if (dependentService.getState() == null ||
|
||||
!dependentService.getState().equals(ServiceState.STABLE)) {
|
||||
result = false;
|
||||
LOG.info("Service dependency is not satisfied for " +
|
||||
"service: {} state: {}", dependent,
|
||||
dependentService.getState());
|
||||
}
|
||||
}
|
||||
sc.close();
|
||||
}
|
||||
} catch (IOException | YarnException e) {
|
||||
LOG.warn("Caught exception: ", e);
|
||||
LOG.info("Service dependency is not satisified.");
|
||||
result = false;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public static void checkServiceDependencySatisified(Service service) {
|
||||
while (!serviceDependencySatisfied(service)) {
|
||||
try {
|
||||
LOG.info("Waiting for service dependencies.");
|
||||
Thread.sleep(15000L);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -438,6 +438,8 @@ public void testExpressUpgrade() throws Exception {
|
||||
// wait for upgrade to complete
|
||||
waitForServiceToBeStable(client, service);
|
||||
Service active = client.getStatus(service.getName());
|
||||
Assert.assertEquals("version mismatch", service.getVersion(),
|
||||
active.getVersion());
|
||||
Assert.assertEquals("component not stable", ComponentState.STABLE,
|
||||
active.getComponent(component.getName()).getState());
|
||||
Assert.assertEquals("compa does not have new env", "val1",
|
||||
|
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.yarn.service.api.records.PlacementType;
|
||||
import org.apache.hadoop.yarn.service.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
@ -733,6 +734,44 @@ public void testResolveNoCompsDependency() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 1500)
|
||||
public void testNoServiceDependencies() {
|
||||
Service service = createExampleApplication();
|
||||
Component compa = createComponent("compa");
|
||||
Component compb = createComponent("compb");
|
||||
service.addComponent(compa);
|
||||
service.addComponent(compb);
|
||||
List<String> dependencies = new ArrayList<String>();
|
||||
service.setDependencies(dependencies);
|
||||
ServiceApiUtil.checkServiceDependencySatisified(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServiceDependencies() {
|
||||
Thread thread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
Service service = createExampleApplication();
|
||||
Component compa = createComponent("compa");
|
||||
Component compb = createComponent("compb");
|
||||
service.addComponent(compa);
|
||||
service.addComponent(compb);
|
||||
List<String> dependencies = new ArrayList<String>();
|
||||
dependencies.add("abc");
|
||||
service.setDependencies(dependencies);
|
||||
Service dependent = createExampleApplication();
|
||||
dependent.setState(ServiceState.STOPPED);
|
||||
ServiceApiUtil.checkServiceDependencySatisified(service);
|
||||
}
|
||||
};
|
||||
thread.start();
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
Assert.assertTrue(thread.isAlive());
|
||||
}
|
||||
|
||||
public static Service createExampleApplication() {
|
||||
|
||||
Service exampleApp = new Service();
|
||||
|
@ -406,7 +406,7 @@ a service resource has the following attributes.
|
||||
|queue|The YARN queue that this service should be submitted to.|false|string||
|
||||
|kerberos_principal|The principal info of the user who launches the service|false|KerberosPrincipal||
|
||||
|docker_client_config|URI of the file containing the docker client configuration (e.g. hdfs:///tmp/config.json)|false|string||
|
||||
|
||||
|dependencies|A list of service names that this service depends on.| false | string array ||
|
||||
|
||||
### ServiceState
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user