YARN-7142. Support placement policy in yarn native services. (Gour Saha via wangda)

Change-Id: I166c67a7a34430627c17365f60bac75b6da1b434
This commit is contained in:
Wangda Tan 2018-04-02 07:26:01 -07:00
parent dc8e343201
commit a0bde7d525
22 changed files with 1088 additions and 163 deletions

View File

@ -130,6 +130,11 @@ public abstract static class AbstractConstraint implements Visitable {
public PlacementConstraint build() {
return new PlacementConstraint(this);
}
@Override
public String toString() {
return super.toString();
}
}
static final String NODE_SCOPE = "node";

View File

@ -229,9 +229,6 @@ definitions:
type: integer
format: int64
description: Life time (in seconds) of the service from the time it reaches the STARTED state (after which it is automatically destroyed by YARN). For unlimited lifetime do not set a lifetime value.
placement_policy:
description: (TBD) Advanced scheduling and placement policies. If not specified, it defaults to the default placement policy of the service owner. The design of placement policies are in the works. It is not very clear at this point, how policies in conjunction with labels be exposed to service owners. This is a placeholder for now. The advanced structure of this attribute will be determined by YARN-4902.
$ref: '#/definitions/PlacementPolicy'
components:
description: Components of a service.
type: array
@ -256,7 +253,7 @@ definitions:
$ref: '#/definitions/KerberosPrincipal'
ResourceInformation:
description:
ResourceInformation determines unit/value of resource types in addition to memory and vcores. It will be part of Resource object
ResourceInformation determines unit/value of resource types in addition to memory and vcores. It will be part of Resource object.
properties:
value:
type: integer
@ -264,8 +261,7 @@ definitions:
description: Integer value of the resource.
unit:
type: string
description:
Unit of the resource, acceptable values are: p/n/u/m/k/M/G/T/P/Ki/Mi/Gi/Ti/Pi. By default it is empty means no unit
description: Unit of the resource, acceptable values are - p/n/u/m/k/M/G/T/P/Ki/Mi/Gi/Ti/Pi. By default it is empty means no unit.
Resource:
description:
Resource determines the amount of resources (vcores, memory, network, etc.) usable by a container. This field determines the resource to be applied for all the containers of a component or service. The resource specified at the service (or global) level can be overriden at the component level. Only one of profile OR cpu & memory are expected. It raises a validation exception otherwise.
@ -286,11 +282,75 @@ definitions:
$ref: '#/definitions/ResourceInformation'
description: Map of resource name to ResourceInformation
PlacementPolicy:
description: Placement policy of an instance of a service. This feature is in the works in YARN-6592.
description: Advanced placement policy of the components of a service.
required:
- constraints
properties:
label:
constraints:
description: Placement constraint details.
type: array
items:
$ref: '#/definitions/PlacementConstraint'
PlacementConstraint:
description: Placement constraint details.
required:
- type
- scope
properties:
name:
description: An optional name associated to this constraint.
type: string
description: Assigns a service to a named partition of the cluster where the service desires to run (optional). If not specified all services are submitted to a default label of the service owner. One or more labels can be setup for each service owner account with required constraints like no-preemption, sla-99999, preemption-ok, etc.
example: C1
type:
description: The type of placement.
$ref: '#/definitions/PlacementType'
scope:
description: The scope of placement.
$ref: '#/definitions/PlacementScope'
target_tags:
description: The name of the components that this component's placement policy is depending upon are added as target tags. So for affinity say, this component's containers are requesting to be placed on hosts where containers of the target tag component(s) are running on. Target tags can also contain the name of this component, in which case it implies that for anti-affinity say, no more than one container of this component can be placed on a host. Similarly, for cardinality, it would mean that containers of this component is requesting to be placed on hosts where at least minCardinality but no more than maxCardinality containers of the target tag component(s) are running.
type: array
items:
type: string
node_attributes:
description: Node attributes are a set of key:value(s) pairs associated with nodes.
type: object
additionalProperties:
type: array
items:
type: string
node_partitions:
description: Node partitions where the containers of this component can run.
type: array
items:
type: string
min_cardinality:
type: integer
format: int64
description: When placement type is cardinality, the minimum number of containers of the depending component that a host should have, where containers of this component can be allocated on.
example: 2
max_cardinality:
type: integer
format: int64
description: When placement type is cardinality, the maximum number of containers of the depending component that a host should have, where containers of this component can be allocated on.
example: 3
PlacementType:
description: The type of placement - affinity/anti-affinity/affinity-with-cardinality with containers of another component or containers of the same component (self).
properties:
type:
type: string
enum:
- AFFINITY
- ANTI_AFFINITY
- AFFINITY_WITH_CARDINALITY
PlacementScope:
description: The scope of placement for the containers of a component.
properties:
type:
type: string
enum:
- NODE
- RACK
Artifact:
description: Artifact of a service component. If not specified, component will just run the bare launch command and no artifact will be localized.
required:
@ -342,11 +402,16 @@ definitions:
type: integer
format: int64
description: Number of containers for this component (optional). If not specified, the service level global number_of_containers takes effect.
containers:
type: array
description: Containers of a started component. Specifying a value for this attribute for the POST payload raises a validation error. This blob is available only in the GET response of a started service.
items:
$ref: '#/definitions/Container'
run_privileged_container:
type: boolean
description: Run all containers of this component in privileged mode (YARN-4262).
placement_policy:
description: Advanced scheduling and placement policies for all containers of this component (optional). If not specified, the service level placement_policy takes effect. Refer to the description at the global level for more details.
description: Advanced scheduling and placement policies for all containers of this component.
$ref: '#/definitions/PlacementPolicy'
configuration:
description: Config properties for this component.
@ -380,7 +445,7 @@ definitions:
properties:
properties:
type: object
description: A blob of key-value pairs for configuring the YARN service AM
description: A blob of key-value pairs for configuring the YARN service AM.
additionalProperties:
type: string
env:
@ -405,7 +470,6 @@ definitions:
- JSON
- YAML
- TEMPLATE
- ENV
- HADOOP_XML
dest_file:
type: string
@ -416,6 +480,8 @@ definitions:
properties:
type: object
description: A blob of key value pairs that will be dumped in the dest_file in the format as specified in type. If src_file is specified, src_file content are dumped in the dest_file and these properties will overwrite, if any, existing properties in src_file or be added as new properties in src_file.
additionalProperties:
type: string
Container:
description: An instance of a running service container.
properties:
@ -464,6 +530,7 @@ definitions:
- STABLE
- STOPPED
- FAILED
- FLEX
ContainerState:
description: The current state of the container of a service.
properties:

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@ -149,6 +150,12 @@ public class ServiceScheduler extends CompositeService {
new ConcurrentHashMap<>();
private long containerRecoveryTimeout;
// If even one component of a service uses placement constraints, then use
// placement scheduler to schedule containers for all components (including
// the ones with no constraints). Mixing of container requests and scheduling
// requests for a single service is not recommended.
private boolean hasAtLeastOnePlacementConstraint;
public ServiceScheduler(ServiceContext context) {
super(context.service.getName());
this.context = context;
@ -286,6 +293,9 @@ public void serviceStop() throws Exception {
public void serviceStart() throws Exception {
super.serviceStart();
InetSocketAddress bindAddress = context.clientAMService.getBindAddress();
// When yarn.resourcemanager.placement-constraints.handler is set to
// placement-processor then constraints need to be added during
// registerApplicationMaster.
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(bindAddress.getHostName(),
bindAddress.getPort(), "N/A");
@ -512,6 +522,12 @@ private void createAllComponents() {
componentsById.put(allocateId, component);
componentsByName.put(component.getName(), component);
allocateId++;
if (!hasAtLeastOnePlacementConstraint
&& compSpec.getPlacementPolicy() != null
&& compSpec.getPlacementPolicy().getConstraints() != null
&& !compSpec.getPlacementPolicy().getConstraints().isEmpty()) {
hasAtLeastOnePlacementConstraint = true;
}
}
}
@ -681,8 +697,14 @@ public void onContainersUpdated(List<UpdatedContainer> containers) {
@Override public void onError(Throwable e) {
LOG.error("Error in AMRMClient callback handler ", e);
}
}
@Override
public void onRequestsRejected(
List<RejectedSchedulingRequest> rejectedSchedulingRequests) {
LOG.error("Error in AMRMClient callback handler. Following scheduling "
+ "requests were rejected: {}", rejectedSchedulingRequests);
}
}
private class NMClientCallback extends NMClientAsync.AbstractCallbackHandler {
@ -810,4 +832,8 @@ public AsyncDispatcher getDispatcher() {
public BoundedAppender getDiagnostics() {
return diagnostics;
}
public boolean hasAtLeastOnePlacementConstraint() {
return hasAtLeastOnePlacementConstraint;
}
}

View File

@ -56,11 +56,6 @@ public List<Component> findTargetComponentSpecs(Service currentDef,
throw new UnsupportedOperationException("changes to queue " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getPlacementPolicy(),
targetDef.getPlacementPolicy())) {
throw new UnsupportedOperationException("changes to placement policy " +
"not supported by upgrade");
}
if (!Objects.equals(currentDef.getResource(), targetDef.getResource())) {
throw new UnsupportedOperationException("changes to resource " +

View File

@ -269,16 +269,15 @@ public void setRunPrivilegedContainer(Boolean runPrivilegedContainer) {
/**
* Advanced scheduling and placement policies for all containers of this
* component (optional). If not specified, the service level placement_policy
* takes effect. Refer to the description at the global level for more
* details.
* component.
**/
public Component placementPolicy(PlacementPolicy placementPolicy) {
this.placementPolicy = placementPolicy;
return this;
}
@ApiModelProperty(example = "null", value = "Advanced scheduling and placement policies for all containers of this component (optional). If not specified, the service level placement_policy takes effect. Refer to the description at the global level for more details.")
@ApiModelProperty(example = "null", value = "Advanced scheduling and "
+ "placement policies for all containers of this component.")
public PlacementPolicy getPlacementPolicy() {
return placementPolicy;
}

View File

@ -55,7 +55,7 @@ public class ConfigFile implements Serializable {
@XmlEnum
public enum TypeEnum {
XML("XML"), PROPERTIES("PROPERTIES"), JSON("JSON"), YAML("YAML"), TEMPLATE(
"TEMPLATE"), HADOOP_XML("HADOOP_XML"),;
"TEMPLATE"), HADOOP_XML("HADOOP_XML");
private String value;

View File

@ -0,0 +1,283 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.api.records;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.xml.bind.annotation.XmlElement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
* Placement constraint details.
**/
@InterfaceAudience.Public
@InterfaceStability.Unstable
@ApiModel(description = "Placement constraint details.")
@javax.annotation.Generated(
value = "class io.swagger.codegen.languages.JavaClientCodegen",
date = "2018-02-16T10:20:12.927-07:00")
public class PlacementConstraint implements Serializable {
private static final long serialVersionUID = 1518017165676511762L;
private String name = null;
private PlacementType type = null;
private PlacementScope scope = null;
@JsonProperty("target_tags")
@XmlElement(name = "target_tags")
private List<String> targetTags = new ArrayList<>();
@JsonProperty("node_attributes")
@XmlElement(name = "node_attributes")
private Map<String, List<String>> nodeAttributes = new HashMap<>();
@JsonProperty("node_partitions")
@XmlElement(name = "node_partitions")
private List<String> nodePartitions = new ArrayList<>();
@JsonProperty("min_cardinality")
@XmlElement(name = "min_cardinality")
private Long minCardinality = null;
@JsonProperty("max_cardinality")
@XmlElement(name = "max_cardinality")
private Long maxCardinality = null;
/**
* An optional name associated to this constraint.
**/
public PlacementConstraint name(String name) {
this.name = name;
return this;
}
@ApiModelProperty(example = "C1", required = true)
@JsonProperty("name")
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
/**
* The type of placement.
**/
public PlacementConstraint type(PlacementType type) {
this.type = type;
return this;
}
@ApiModelProperty(example = "null", required = true)
@JsonProperty("type")
public PlacementType getType() {
return type;
}
public void setType(PlacementType type) {
this.type = type;
}
/**
* The scope of placement.
**/
public PlacementConstraint scope(PlacementScope scope) {
this.scope = scope;
return this;
}
@ApiModelProperty(example = "null", required = true)
@JsonProperty("scope")
public PlacementScope getScope() {
return scope;
}
public void setScope(PlacementScope scope) {
this.scope = scope;
}
/**
* The name of the components that this component's placement policy is
* depending upon are added as target tags. So for affinity say, this
* component's containers are requesting to be placed on hosts where
* containers of the target tag component(s) are running on. Target tags can
* also contain the name of this component, in which case it implies that for
* anti-affinity say, no more than one container of this component can be
* placed on a host. Similarly, for cardinality, it would mean that containers
* of this component is requesting to be placed on hosts where at least
* minCardinality but no more than maxCardinality containers of the target tag
* component(s) are running.
**/
public PlacementConstraint targetTags(List<String> targetTags) {
this.targetTags = targetTags;
return this;
}
@ApiModelProperty(example = "[\"hbase-regionserver\"]")
public List<String> getTargetTags() {
return targetTags;
}
public void setTargetTags(List<String> targetTags) {
this.targetTags = targetTags;
}
/**
* Node attributes are a set of key:value(s) pairs associated with nodes.
*/
public PlacementConstraint nodeAttributes(
Map<String, List<String>> nodeAttributes) {
this.nodeAttributes = nodeAttributes;
return this;
}
@ApiModelProperty(example = "\"JavaVersion\":[\"1.7\", \"1.8\"]")
public Map<String, List<String>> getNodeAttributes() {
return nodeAttributes;
}
public void setNodeAttributes(Map<String, List<String>> nodeAttributes) {
this.nodeAttributes = nodeAttributes;
}
/**
* Node partitions where the containers of this component can run.
*/
public PlacementConstraint nodePartitions(
List<String> nodePartitions) {
this.nodePartitions = nodePartitions;
return this;
}
@ApiModelProperty(example = "[\"gpu\", \"fast_disk\"]")
public List<String> getNodePartitions() {
return nodePartitions;
}
public void setNodePartitions(List<String> nodePartitions) {
this.nodePartitions = nodePartitions;
}
/**
* When placement type is cardinality, the minimum number of containers of the
* depending component that a host should have, where containers of this
* component can be allocated on.
**/
public PlacementConstraint minCardinality(Long minCardinality) {
this.minCardinality = minCardinality;
return this;
}
@ApiModelProperty(example = "2")
public Long getMinCardinality() {
return minCardinality;
}
public void setMinCardinality(Long minCardinality) {
this.minCardinality = minCardinality;
}
/**
* When placement type is cardinality, the maximum number of containers of the
* depending component that a host should have, where containers of this
* component can be allocated on.
**/
public PlacementConstraint maxCardinality(Long maxCardinality) {
this.maxCardinality = maxCardinality;
return this;
}
@ApiModelProperty(example = "3")
public Long getMaxCardinality() {
return maxCardinality;
}
public void setMaxCardinality(Long maxCardinality) {
this.maxCardinality = maxCardinality;
}
@Override
public boolean equals(java.lang.Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PlacementConstraint placementConstraint = (PlacementConstraint) o;
return Objects.equals(this.name, placementConstraint.name)
&& Objects.equals(this.type, placementConstraint.type)
&& Objects.equals(this.scope, placementConstraint.scope)
&& Objects.equals(this.targetTags, placementConstraint.targetTags)
&& Objects.equals(this.nodeAttributes,
placementConstraint.nodeAttributes)
&& Objects.equals(this.nodePartitions,
placementConstraint.nodePartitions)
&& Objects.equals(this.minCardinality,
placementConstraint.minCardinality)
&& Objects.equals(this.maxCardinality,
placementConstraint.maxCardinality);
}
@Override
public int hashCode() {
return Objects.hash(name, type, scope, targetTags, nodeAttributes,
nodePartitions, minCardinality, maxCardinality);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class PlacementConstraint {\n");
sb.append(" name: ").append(toIndentedString(name)).append("\n");
sb.append(" type: ").append(toIndentedString(type)).append("\n");
sb.append(" scope: ").append(toIndentedString(scope)).append("\n");
sb.append(" targetTags: ").append(toIndentedString(targetTags))
.append("\n");
sb.append(" nodeAttributes: ").append(toIndentedString(nodeAttributes))
.append("\n");
sb.append(" nodePartitions: ").append(toIndentedString(nodePartitions))
.append("\n");
sb.append(" minCardinality: ").append(toIndentedString(minCardinality))
.append("\n");
sb.append(" maxCardinality: ").append(toIndentedString(maxCardinality))
.append("\n");
sb.append("}");
return sb.toString();
}
/**
* Convert the given object to string with each line indented by 4 spaces
* (except the first line).
*/
private String toIndentedString(java.lang.Object o) {
if (o == null) {
return "null";
}
return o.toString().replace("\n", "\n ");
}
}

View File

@ -17,49 +17,50 @@
package org.apache.hadoop.yarn.service.api.records;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
* Placement policy of an instance of an service. This feature is in the
* works in YARN-4902.
* Advanced placement policy of the components of a service.
**/
@InterfaceAudience.Public
@InterfaceStability.Unstable
@ApiModel(description = "Placement policy of an instance of an service. This feature is in the works in YARN-4902.")
@javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
@ApiModel(description = "Advanced placement policy of the components of a "
+ "service.")
@javax.annotation.Generated(
value = "class io.swagger.codegen.languages.JavaClientCodegen",
date = "2018-02-16T10:20:12.927-07:00")
public class PlacementPolicy implements Serializable {
private static final long serialVersionUID = 4341110649551172231L;
private String label = null;
private List<PlacementConstraint> constraints = new ArrayList<>();
/**
* Assigns a service to a named partition of the cluster where the service
* desires to run (optional). If not specified all services are submitted to
* a default label of the service owner. One or more labels can be setup for
* each service owner account with required constraints like no-preemption,
* sla-99999, preemption-ok, etc.
* Placement constraint details.
**/
public PlacementPolicy label(String label) {
this.label = label;
public PlacementPolicy constraints(List<PlacementConstraint> constraints) {
this.constraints = constraints;
return this;
}
@ApiModelProperty(example = "null", value = "Assigns a service to a named partition of the cluster where the service desires to run (optional). If not specified all services are submitted to a default label of the service owner. One or more labels can be setup for each service owner account with required constraints like no-preemption, sla-99999, preemption-ok, etc.")
@JsonProperty("label")
public String getLabel() {
return label;
@ApiModelProperty(example = "null", required = true)
@JsonProperty("constraints")
public List<PlacementConstraint> getConstraints() {
return constraints;
}
public void setLabel(String label) {
this.label = label;
public void setConstraints(List<PlacementConstraint> constraints) {
this.constraints = constraints;
}
@Override
@ -71,12 +72,12 @@ public boolean equals(java.lang.Object o) {
return false;
}
PlacementPolicy placementPolicy = (PlacementPolicy) o;
return Objects.equals(this.label, placementPolicy.label);
return Objects.equals(this.constraints, placementPolicy.constraints);
}
@Override
public int hashCode() {
return Objects.hash(label);
return Objects.hash(constraints);
}
@Override
@ -84,7 +85,8 @@ public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class PlacementPolicy {\n");
sb.append(" label: ").append(toIndentedString(label)).append("\n");
sb.append(" constraints: ").append(toIndentedString(constraints))
.append("\n");
sb.append("}");
return sb.toString();
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.api.records;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import com.fasterxml.jackson.annotation.JsonValue;
import io.swagger.annotations.ApiModel;
/**
* The scope of placement for the containers of a component.
**/
@InterfaceAudience.Public
@InterfaceStability.Unstable
@ApiModel(description = "The scope of placement for the containers of a "
+ "component.")
@javax.annotation.Generated(
value = "class io.swagger.codegen.languages.JavaClientCodegen",
date = "2018-02-16T10:20:12.927-07:00")
public enum PlacementScope {
NODE(PlacementConstraints.NODE), RACK(PlacementConstraints.RACK);
private String value;
PlacementScope(String value) {
this.value = value;
}
public String getValue() {
return value;
}
@Override
@JsonValue
public String toString() {
return value;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.api.records;
import io.swagger.annotations.ApiModel;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* The type of placement - affinity/anti-affinity/affinity-with-cardinality with
* containers of another component or containers of the same component (self).
**/
@InterfaceAudience.Public
@InterfaceStability.Unstable
@ApiModel(description = "The type of placement - affinity/anti-affinity/"
+ "affinity-with-cardinality with containers of another component or "
+ "containers of the same component (self).")
@javax.annotation.Generated(
value = "class io.swagger.codegen.languages.JavaClientCodegen",
date = "2018-02-16T10:20:12.927-07:00")
public enum PlacementType {
AFFINITY, ANTI_AFFINITY, AFFINITY_WITH_CARDINALITY;
}

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import javax.xml.bind.annotation.XmlElement;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@ -49,7 +50,7 @@ public class Resource extends BaseResource implements Cloneable {
@JsonProperty("additional")
@XmlElement(name = "additional")
private Map<String, ResourceInformation> additional = null;
private Map<String, ResourceInformation> additional = new HashMap<>();
/**
* Each resource profile has a unique id which is associated with a

View File

@ -63,9 +63,6 @@ public class Service extends BaseResource {
@XmlElement(name = "number_of_running_containers")
private Long numberOfRunningContainers = null;
private Long lifetime = null;
@JsonProperty("placement_policy")
@XmlElement(name = "placement_policy")
private PlacementPolicy placementPolicy = null;
private List<Component> components = new ArrayList<>();
private Configuration configuration = new Configuration();
private ServiceState state = null;
@ -248,28 +245,6 @@ public void setLifetime(Long lifetime) {
this.lifetime = lifetime;
}
/**
* Advanced scheduling and placement policies (optional). If not specified, it
* defaults to the default placement policy of the service owner. The design of
* placement policies are in the works. It is not very clear at this point,
* how policies in conjunction with labels be exposed to service owners.
* This is a placeholder for now. The advanced structure of this attribute
* will be determined by YARN-4902.
**/
public Service placementPolicy(PlacementPolicy placementPolicy) {
this.placementPolicy = placementPolicy;
return this;
}
@ApiModelProperty(example = "null", value = "Advanced scheduling and placement policies (optional). If not specified, it defaults to the default placement policy of the service owner. The design of placement policies are in the works. It is not very clear at this point, how policies in conjunction with labels be exposed to service owners. This is a placeholder for now. The advanced structure of this attribute will be determined by YARN-4902.")
public PlacementPolicy getPlacementPolicy() {
return placementPolicy;
}
public void setPlacementPolicy(PlacementPolicy placementPolicy) {
this.placementPolicy = placementPolicy;
}
/**
* Components of an service.
**/
@ -429,8 +404,6 @@ public String toString() {
sb.append(" numberOfRunningContainers: ")
.append(toIndentedString(numberOfRunningContainers)).append("\n");
sb.append(" lifetime: ").append(toIndentedString(lifetime)).append("\n");
sb.append(" placementPolicy: ").append(toIndentedString(placementPolicy))
.append("\n");
sb.append(" components: ").append(toIndentedString(components))
.append("\n");
sb.append(" configuration: ").append(toIndentedString(configuration))

View File

@ -85,8 +85,8 @@ public void setState(ServiceState state) {
}
/**
* An error code specific to a scenario which service owners should be able to use
* to understand the failure in addition to the diagnostic information.
* An error code specific to a scenario which service owners should be able to
* use to understand the failure in addition to the diagnostic information.
**/
public ServiceStatus code(Integer code) {
this.code = code;

View File

@ -20,32 +20,41 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
import org.apache.hadoop.yarn.service.ContainerFailureTracker;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.ServiceMaster;
import org.apache.hadoop.yarn.service.ServiceMetrics;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
import org.apache.hadoop.yarn.service.monitor.probe.Probe;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
import org.apache.hadoop.yarn.service.monitor.probe.Probe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -55,6 +64,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -66,9 +76,8 @@
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
import static org.apache.hadoop.yarn.service.component.ComponentState.*;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_FAILURE_THRESHOLD;
public class Component implements EventHandler<ComponentEvent> {
@ -407,6 +416,8 @@ private void assignContainerToCompInstance(Container container) {
@SuppressWarnings({ "unchecked" })
public void requestContainers(long count) {
LOG.info("[COMPONENT {}] Requesting for {} container(s)",
componentSpec.getName(), count);
org.apache.hadoop.yarn.service.api.records.Resource componentResource =
componentSpec.getResource();
@ -439,12 +450,98 @@ public void requestContainers(long count) {
}
}
for (int i = 0; i < count; i++) {
//TODO Once YARN-5468 is done, use that for anti-affinity
ContainerRequest request =
ContainerRequest.newBuilder().capability(resource).priority(priority)
.allocationRequestId(allocateId).relaxLocality(true).build();
amrmClient.addContainerRequest(request);
if (!scheduler.hasAtLeastOnePlacementConstraint()) {
for (int i = 0; i < count; i++) {
ContainerRequest request = ContainerRequest.newBuilder()
.capability(resource).priority(priority)
.allocationRequestId(allocateId).relaxLocality(true).build();
LOG.info("[COMPONENT {}] Submitting container request : {}",
componentSpec.getName(), request);
amrmClient.addContainerRequest(request);
}
} else {
// Schedule placement requests. Validation of non-null target tags and
// that they refer to existing component names are already done. So, no
// need to validate here.
PlacementPolicy placementPolicy = componentSpec.getPlacementPolicy();
Collection<SchedulingRequest> schedulingRequests = new HashSet<>();
// We prepare an AND-ed composite constraint to be the final composite
// constraint. If placement expressions are specified to create advanced
// composite constraints then this AND-ed composite constraint is not
// used.
PlacementConstraint finalConstraint = null;
for (org.apache.hadoop.yarn.service.api.records.PlacementConstraint
yarnServiceConstraint : placementPolicy.getConstraints()) {
List<TargetExpression> targetExpressions = new ArrayList<>();
// Currently only intra-application allocation tags are supported.
if (!yarnServiceConstraint.getTargetTags().isEmpty()) {
targetExpressions.add(PlacementTargets.allocationTagToIntraApp(
yarnServiceConstraint.getTargetTags().toArray(new String[0])));
}
// Add all node attributes
for (Map.Entry<String, List<String>> attribute : yarnServiceConstraint
.getNodeAttributes().entrySet()) {
targetExpressions.add(PlacementTargets.nodeAttribute(
attribute.getKey(), attribute.getValue().toArray(new String[0])));
}
// Add all node partitions
if (!yarnServiceConstraint.getNodePartitions().isEmpty()) {
targetExpressions
.add(PlacementTargets.nodePartition(yarnServiceConstraint
.getNodePartitions().toArray(new String[0])));
}
PlacementConstraint constraint = null;
switch (yarnServiceConstraint.getType()) {
case AFFINITY:
constraint = PlacementConstraints
.targetIn(yarnServiceConstraint.getScope().getValue(),
targetExpressions.toArray(new TargetExpression[0]))
.build();
break;
case ANTI_AFFINITY:
constraint = PlacementConstraints
.targetNotIn(yarnServiceConstraint.getScope().getValue(),
targetExpressions.toArray(new TargetExpression[0]))
.build();
break;
case AFFINITY_WITH_CARDINALITY:
constraint = PlacementConstraints.targetCardinality(
yarnServiceConstraint.getScope().name().toLowerCase(),
yarnServiceConstraint.getMinCardinality() == null ? 0
: yarnServiceConstraint.getMinCardinality().intValue(),
yarnServiceConstraint.getMaxCardinality() == null
? Integer.MAX_VALUE
: yarnServiceConstraint.getMaxCardinality().intValue(),
targetExpressions.toArray(new TargetExpression[0])).build();
break;
}
// The default AND-ed final composite constraint
if (finalConstraint != null) {
finalConstraint = PlacementConstraints
.and(constraint.getConstraintExpr(),
finalConstraint.getConstraintExpr())
.build();
} else {
finalConstraint = constraint;
}
LOG.debug("[COMPONENT {}] Placement constraint: {}",
componentSpec.getName(), constraint.getConstraintExpr().toString());
}
ResourceSizing resourceSizing = ResourceSizing.newInstance((int) count,
resource);
LOG.debug("[COMPONENT {}] Resource sizing: {}", componentSpec.getName(),
resourceSizing);
SchedulingRequest request = SchedulingRequest.newBuilder()
.priority(priority).allocationRequestId(allocateId)
.allocationTags(Collections.singleton(componentSpec.getName()))
.executionType(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true))
.placementConstraintExpression(finalConstraint)
.resourceSizing(resourceSizing).build();
LOG.info("[COMPONENT {}] Submitting scheduling request: {}",
componentSpec.getName(), request);
schedulingRequests.add(request);
amrmClient.addSchedulingRequests(schedulingRequests);
}
}

View File

@ -91,4 +91,16 @@ public interface RestApiErrorMessages {
String ERROR_QUICKLINKS_FOR_COMP_INVALID = "Quicklinks specified at"
+ " component level, needs corresponding values set at service level";
String ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME = "Invalid target tag %s "
+ "specified in placement policy of component %s. For now, target tags "
+ "support self reference only. Specifying anything other than its "
+ "component name is not supported. Set target tag of component %s to "
+ "%s.";
String ERROR_PLACEMENT_POLICY_TAG_NAME_INVALID = "Invalid target tag %s "
+ "specified in placement policy of component %s. Target tags should be "
+ "a valid component name in the service.";
String ERROR_PLACEMENT_POLICY_EXPRESSION_ELEMENT_NAME_INVALID = "Invalid "
+ "expression element name %s specified in placement policy of component "
+ "%s. Expression element names should be a valid constraint name or an "
+ "expression name defined for this component only.";
}

View File

@ -268,10 +268,6 @@ private void publishComponents(List<Component> components) {
}
entityInfos.put(ServiceTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER,
component.getRunPrivilegedContainer().toString());
if (component.getPlacementPolicy() != null) {
entityInfos.put(ServiceTimelineMetricsConstants.PLACEMENT_POLICY,
component.getPlacementPolicy().getLabel());
}
entity.addInfo(entityInfos);
putEntity(entity);

View File

@ -27,17 +27,18 @@
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderFactory;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderFactory;
import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -202,6 +203,7 @@ public static void validateAndResolveService(Service service,
}
validateComponent(comp, fs.getFileSystem(), conf);
}
validatePlacementPolicy(service.getComponents(), componentNames);
// validate dependency tree
sortByDependencies(service.getComponents());
@ -263,6 +265,24 @@ public static void validateNameFormat(String name,
namePattern.validate(name);
}
private static void validatePlacementPolicy(List<Component> components,
Set<String> componentNames) {
for (Component comp : components) {
if (comp.getPlacementPolicy() != null) {
for (PlacementConstraint constraint : comp.getPlacementPolicy()
.getConstraints()) {
for (String targetTag : constraint.getTargetTags()) {
if (!comp.getName().equals(targetTag)) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
targetTag, comp.getName(), comp.getName(), comp.getName()));
}
}
}
}
}
}
@VisibleForTesting
public static List<Component> getComponents(SliderFileSystem
fs, String serviceName) throws IOException {

View File

@ -22,6 +22,8 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
@ -490,4 +492,37 @@ private static void testComponent(SliderFileSystem sfs)
Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
}
}
@Test
public void testPlacementPolicy() throws IOException {
SliderFileSystem sfs = ServiceTestUtils.initMockFs();
Service app = createValidApplication("comp-a");
Component comp = app.getComponents().get(0);
PlacementPolicy pp = new PlacementPolicy();
PlacementConstraint pc = new PlacementConstraint();
pc.setName("CA1");
pc.setTargetTags(Collections.singletonList("comp-invalid"));
pp.setConstraints(Collections.singletonList(pc));
comp.setPlacementPolicy(pp);
try {
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
Assert.fail(EXCEPTION_PREFIX + "service with empty placement");
} catch (IllegalArgumentException e) {
assertEquals(
String.format(
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
"comp-invalid", "comp-a", "comp-a", "comp-a"),
e.getMessage());
}
pc.setTargetTags(Collections.singletonList("comp-a"));
// now it should succeed
try {
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
} catch (IllegalArgumentException e) {
Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
}
}
}

View File

@ -24,14 +24,21 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
import org.apache.hadoop.yarn.service.api.records.PlacementScope;
import org.apache.hadoop.yarn.service.api.records.PlacementType;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
@ -355,6 +362,103 @@ public void testUpgradeService() throws Exception {
Assert.assertEquals(service.getVersion(), fromFs.getVersion());
}
// Test to verify ANTI_AFFINITY placement policy
// 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
// 2. Create an example service with 3 containers
// 3. Verify no more than 1 container comes up in each of the 3 NMs
// 4. Flex the component to 4 containers
// 5. Verify that the 4th container does not even get allocated since there
// are only 3 NMs
@Test (timeout = 200000)
public void testCreateServiceWithPlacementPolicy() throws Exception {
// We need to enable scheduler placement-constraint at the cluster level to
// let apps use placement policies.
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
setConf(conf);
setupInternal(3);
ServiceClient client = createClient(getConf());
Service exampleApp = new Service();
exampleApp.setName("example-app");
exampleApp.setVersion("v1");
Component comp = createComponent("compa", 3L, "sleep 1000");
PlacementPolicy pp = new PlacementPolicy();
PlacementConstraint pc = new PlacementConstraint();
pc.setName("CA1");
pc.setTargetTags(Collections.singletonList("compa"));
pc.setScope(PlacementScope.NODE);
pc.setType(PlacementType.ANTI_AFFINITY);
pp.setConstraints(Collections.singletonList(pc));
comp.setPlacementPolicy(pp);
exampleApp.addComponent(comp);
client.actionCreate(exampleApp);
waitForServiceToBeStable(client, exampleApp);
// Check service is stable and all 3 containers are running
Service service = client.getStatus(exampleApp.getName());
Component component = service.getComponent("compa");
Assert.assertEquals("Service state should be STABLE", ServiceState.STABLE,
service.getState());
Assert.assertEquals("3 containers are expected to be running", 3,
component.getContainers().size());
// Prepare a map of non-AM containers for later lookup
Set<String> nonAMContainerIdSet = new HashSet<>();
for (Container cont : component.getContainers()) {
nonAMContainerIdSet.add(cont.getId());
}
// Verify that no more than 1 non-AM container came up on each of the 3 NMs
Set<String> hosts = new HashSet<>();
ApplicationReport report = client.getYarnClient()
.getApplicationReport(ApplicationId.fromString(exampleApp.getId()));
GetContainersRequest req = GetContainersRequest
.newInstance(report.getCurrentApplicationAttemptId());
ResourceManager rm = getYarnCluster().getResourceManager();
for (ContainerReport contReport : rm.getClientRMService().getContainers(req)
.getContainerList()) {
if (!nonAMContainerIdSet
.contains(contReport.getContainerId().toString())) {
continue;
}
if (hosts.contains(contReport.getNodeHttpAddress())) {
Assert.fail("Container " + contReport.getContainerId()
+ " came up in the same host as another container.");
} else {
hosts.add(contReport.getNodeHttpAddress());
}
}
// Flex compa up to 4, which is more containers than the no of NMs
Map<String, Long> compCounts = new HashMap<>();
compCounts.put("compa", 4L);
exampleApp.getComponent("compa").setNumberOfContainers(4L);
client.flexByRestService(exampleApp.getName(), compCounts);
try {
// 10 secs is enough for the container to be started. The down side of
// this test is that it has to wait that long. Setting a higher wait time
// will add to the total time taken by tests to run.
waitForServiceToBeStable(client, exampleApp, 10000);
Assert.fail("Service should not be in a stable state. It should throw "
+ "a timeout exception.");
} catch (Exception e) {
// Check that service state is not STABLE and only 3 containers are
// running and the fourth one should not get allocated.
service = client.getStatus(exampleApp.getName());
component = service.getComponent("compa");
Assert.assertNotEquals("Service state should not be STABLE",
ServiceState.STABLE, service.getState());
Assert.assertEquals("Component state should be FLEXING",
ComponentState.FLEXING, component.getState());
Assert.assertEquals("3 containers are expected to be running", 3,
component.getContainers().size());
}
LOG.info("Stop/destroy service {}", exampleApp);
client.actionStop(exampleApp.getName(), true);
client.actionDestroy(exampleApp.getName());
}
// Check containers launched are in dependency order
// Get all containers into a list and sort based on container launch time e.g.
// compa-c1, compa-c2, compb-c1, compb-c2;
@ -492,7 +596,14 @@ private Multimap<String, String> waitForAllCompToBeReady(ServiceClient client,
*/
private void waitForServiceToBeStable(ServiceClient client,
Service exampleApp) throws TimeoutException, InterruptedException {
waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE);
waitForServiceToBeStable(client, exampleApp, 200000);
}
private void waitForServiceToBeStable(ServiceClient client,
Service exampleApp, int waitForMillis)
throws TimeoutException, InterruptedException {
waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE,
waitForMillis);
}
/**
@ -508,6 +619,12 @@ private void waitForServiceToBeStarted(ServiceClient client,
waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED);
}
private void waitForServiceToBeInState(ServiceClient client,
Service exampleApp, ServiceState desiredState) throws TimeoutException,
InterruptedException {
waitForServiceToBeInState(client, exampleApp, desiredState, 200000);
}
/**
* Wait until service is started. It does not have to reach a stable state.
*
@ -517,8 +634,8 @@ private void waitForServiceToBeStarted(ServiceClient client,
* @throws InterruptedException
*/
private void waitForServiceToBeInState(ServiceClient client,
Service exampleApp, ServiceState desiredState) throws TimeoutException,
InterruptedException {
Service exampleApp, ServiceState desiredState, int waitForMillis) throws
TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
try {
Service retrievedApp = client.getStatus(exampleApp.getName());
@ -528,7 +645,7 @@ private void waitForServiceToBeInState(ServiceClient client,
e.printStackTrace();
return false;
}
}, 2000, 200000);
}, 2000, waitForMillis);
}
private int countTotalContainers(Service service) {

View File

@ -34,7 +34,9 @@
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
import org.apache.hadoop.yarn.service.api.records.PlacementType;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
@ -45,6 +47,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -208,8 +211,6 @@ private void verifyComponentTimelineEntity(TimelineEntity entity) {
info.get(ServiceTimelineMetricsConstants.LAUNCH_COMMAND));
assertEquals("false",
info.get(ServiceTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER));
assertEquals("label",
info.get(ServiceTimelineMetricsConstants.PLACEMENT_POLICY));
}
private static Service createMockApplication() {
@ -234,7 +235,10 @@ private static Service createMockApplication() {
when(component.getResource()).thenReturn(resource);
when(component.getLaunchCommand()).thenReturn("sleep 1");
PlacementPolicy placementPolicy = new PlacementPolicy();
placementPolicy.setLabel("label");
PlacementConstraint placementConstraint = new PlacementConstraint();
placementConstraint.setType(PlacementType.ANTI_AFFINITY);
placementPolicy
.setConstraints(Collections.singletonList(placementConstraint));
when(component.getPlacementPolicy()).thenReturn(placementPolicy);
when(component.getConfiguration()).thenReturn(
new org.apache.hadoop.yarn.service.api.records.Configuration());

View File

@ -291,6 +291,7 @@ public String toString() {
", executionType=" + getExecutionType() +
", allocationTags=" + getAllocationTags() +
", resourceSizing=" + getResourceSizing() +
", placementConstraint=" + getPlacementConstraint() +
'}';
}
}

View File

@ -131,29 +131,6 @@ Update the runtime properties of a service. Currently the following operations a
|default|Unexpected error|ServiceStatus|
### Destroy a service
```
DELETE /app/v1/services/{service_name}
```
#### Description
Destroy a service and release all resources. This API might have to return JSON data providing location of logs (TBD), etc.
#### Parameters
|Type|Name|Description|Required|Schema|Default|
|----|----|----|----|----|----|
|PathParameter|service_name|Service name|true|string||
#### Responses
|HTTP Code|Description|Schema|
|----|----|----|
|204|Destroy was successful|No Content|
|404|Service does not exist|No Content|
|default|Unexpected error|ServiceStatus|
### Get details of a service.
```
GET /app/v1/services/{service_name}
@ -177,6 +154,29 @@ Return the details (including containers) of a running service
|default|Unexpected error|ServiceStatus|
### Destroy a service
```
DELETE /app/v1/services/{service_name}
```
#### Description
Destroy a service and release all resources. This API might have to return JSON data providing location of logs (TBD), etc.
#### Parameters
|Type|Name|Description|Required|Schema|Default|
|----|----|----|----|----|----|
|PathParameter|service_name|Service name|true|string||
#### Responses
|HTTP Code|Description|Schema|
|----|----|----|
|204|Destroy was successful|No Content|
|404|Service does not exist|No Content|
|default|Unexpected error|ServiceStatus|
### Flex a component's number of instances.
```
PUT /app/v1/services/{service_name}/components/{component_name}
@ -201,6 +201,7 @@ Set a component's desired number of instanes
|404|Service does not exist|No Content|
|default|Unexpected error|ServiceStatus|
## Definitions
### Artifact
@ -227,8 +228,9 @@ One or more components of the service. If the service is HBase say, then the com
|launch_command|The custom launch command of this component (optional for DOCKER component, required otherwise). When specified at the component level, it overrides the value specified at the global level (if any).|false|string||
|resource|Resource of this component (optional). If not specified, the service level global resource takes effect.|false|Resource||
|number_of_containers|Number of containers for this component (optional). If not specified, the service level global number_of_containers takes effect.|false|integer (int64)||
|containers|Containers of a started component. Specifying a value for this attribute for the POST payload raises a validation error. This blob is available only in the GET response of a started service.|false|Container array||
|run_privileged_container|Run all containers of this component in privileged mode (YARN-4262).|false|boolean||
|placement_policy|Advanced scheduling and placement policies for all containers of this component (optional). If not specified, the service level placement_policy takes effect. Refer to the description at the global level for more details.|false|PlacementPolicy||
|placement_policy|Advanced scheduling and placement policies for all containers of this component.|false|PlacementPolicy||
|configuration|Config properties for this component.|false|Configuration||
|quicklinks|A list of quicklink keys defined at the service level, and to be resolved by this component.|false|string array||
@ -248,7 +250,7 @@ A config file that needs to be created and made available as a volume in a servi
|Name|Description|Required|Schema|Default|
|----|----|----|----|----|
|type|Config file in the standard format like xml, properties, json, yaml, template.|false|enum (XML, PROPERTIES, JSON, YAML, TEMPLATE, ENV, HADOOP_XML)||
|type|Config file in the standard format like xml, properties, json, yaml, template.|false|enum (XML, PROPERTIES, JSON, YAML, TEMPLATE, HADOOP_XML)||
|dest_file|The path that this configuration file should be created as. If it is an absolute path, it will be mounted into the DOCKER container. Absolute paths are only allowed for DOCKER containers. If it is a relative path, only the file name should be provided, and the file will be created in the container local working directory under a folder named conf.|false|string||
|src_file|This provides the source location of the configuration file, the content of which is dumped to dest_file post property substitutions, in the format as specified in type. Typically the src_file would point to a source controlled network accessible file maintained by tools like puppet, chef, or hdfs etc. Currently, only hdfs is supported.|false|string||
|properties|A blob of key value pairs that will be dumped in the dest_file in the format as specified in type. If src_file is specified, src_file content are dumped in the dest_file and these properties will overwrite, if any, existing properties in src_file or be added as new properties in src_file.|false|object||
@ -291,6 +293,7 @@ The current state of the container of a service.
|----|----|----|----|----|
|state|enum of the state of the container|false|enum (INIT, STARTED, READY)||
### KerberosPrincipal
The kerberos principal info of the user who launches the service.
@ -301,13 +304,47 @@ The kerberos principal info of the user who launches the service.
|keytab|The URI of the kerberos keytab. It supports two modes, URI starts with "hdfs://": A path on hdfs where the keytab is stored. The keytab will be localized by YARN to each host; URI starts with "file://": A path on the local host where the keytab is stored. It is assumed that the keytabs are pre-installed by admins before AM launches.|false|string||
### PlacementPolicy
### PlacementConstraint
Placement policy of an instance of a service. This feature is in the works in YARN-6592.
Placement constraint details.
|Name|Description|Required|Schema|Default|
|----|----|----|----|----|
|label|Assigns a service to a named partition of the cluster where the service desires to run (optional). If not specified all services are submitted to a default label of the service owner. One or more labels can be setup for each service owner account with required constraints like no-preemption, sla-99999, preemption-ok, etc.|false|string||
|name|An optional name associated to this constraint.|false|string||
|type|The type of placement.|true|PlacementType||
|scope|The scope of placement.|true|PlacementScope||
|target_tags|The name of the components that this component's placement policy is depending upon are added as target tags. So for affinity say, this component's containers are requesting to be placed on hosts where containers of the target tag component(s) are running on. Target tags can also contain the name of this component, in which case it implies that for anti-affinity say, no more than one container of this component can be placed on a host. Similarly, for cardinality, it would mean that containers of this component is requesting to be placed on hosts where at least minCardinality but no more than maxCardinality containers of the target tag component(s) are running.|false|string array||
|node_attributes|Node attributes are a set of key:value(s) pairs associated with nodes.|false|object||
|node_partitions|Node partitions where the containers of this component can run.|false|string array||
|min_cardinality|When placement type is cardinality, the minimum number of containers of the depending component that a host should have, where containers of this component can be allocated on.|false|integer (int64)||
|max_cardinality|When placement type is cardinality, the maximum number of containers of the depending component that a host should have, where containers of this component can be allocated on.|false|integer (int64)||
### PlacementPolicy
Advanced placement policy of the components of a service.
|Name|Description|Required|Schema|Default|
|----|----|----|----|----|
|constraints|Placement constraint details.|true|PlacementConstraint array||
### PlacementScope
The scope of placement for the containers of a component.
|Name|Description|Required|Schema|Default|
|----|----|----|----|----|
|type||false|enum (NODE, RACK)||
### PlacementType
The type of placement - affinity/anti-affinity/affinity-with-cardinality with containers of another component or containers of the same component (self).
|Name|Description|Required|Schema|Default|
|----|----|----|----|----|
|type||false|enum (AFFINITY, ANTI_AFFINITY, AFFINITY_WITH_CARDINALITY)||
### ReadinessCheck
@ -333,6 +370,16 @@ Resource determines the amount of resources (vcores, memory, network, etc.) usab
|additional|A map of resource type name to resource type information. Including value (integer), and unit (string). This will be used to specify resource other than cpu and memory. Please refer to example below. | false | object ||
### ResourceInformation
ResourceInformation determines unit/value of resource types in addition to memory and vcores. It will be part of Resource object.
|Name|Description|Required|Schema|Default|
|----|----|----|----|----|
|value|Integer value of the resource.|false|integer (int64)||
|unit|Unit of the resource, acceptable values are - p/n/u/m/k/M/G/T/P/Ki/Mi/Gi/Ti/Pi. By default it is empty means no unit.|false|string||
### Service
a service resource has the following attributes.
@ -348,13 +395,12 @@ a service resource has the following attributes.
|launch_time|The time when the service was created, e.g. 2016-03-16T01:01:49.000Z.|false|string (date)||
|number_of_running_containers|In get response this provides the total number of running containers for this service (across all components) at the time of request. Note, a subsequent request can return a different number as and when more containers get allocated until it reaches the total number of containers or if a flex request has been made between the two requests.|false|integer (int64)||
|lifetime|Life time (in seconds) of the service from the time it reaches the STARTED state (after which it is automatically destroyed by YARN). For unlimited lifetime do not set a lifetime value.|false|integer (int64)||
|placement_policy|(TBD) Advanced scheduling and placement policies. If not specified, it defaults to the default placement policy of the service owner. The design of placement policies are in the works. It is not very clear at this point, how policies in conjunction with labels be exposed to service owners. This is a placeholder for now. The advanced structure of this attribute will be determined by YARN-4902.|false|PlacementPolicy||
|components|Components of a service.|false|Component array||
|configuration|Config properties of a service. Configurations provided at the service/global level are available to all the components. Specific properties can be overridden at the component level.|false|Configuration||
|state|State of the service. Specifying a value for this attribute for the PUT payload means update the service to this desired state.|false|ServiceState||
|quicklinks|A blob of key-value pairs of quicklinks to be exported for a service.|false|object||
|queue|The YARN queue that this service should be submitted to.|false|string||
|kerberos_principal | The principal info of the user who launches the service|false||
|kerberos_principal | The principal info of the user who launches the service|false|KerberosPrincipal||
### ServiceState
@ -362,7 +408,7 @@ The current state of a service.
|Name|Description|Required|Schema|Default|
|----|----|----|----|----|
|state|enum of the state of the service|false|enum (ACCEPTED, STARTED, READY, STOPPED, FAILED)||
|state|enum of the state of the service|false|enum (ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX)||
### ServiceStatus
@ -392,7 +438,7 @@ POST URL - http://localhost:8088/app/v1/services
[
{
"name": "hello",
"number_of_containers": 1,
"number_of_containers": 2,
"artifact": {
"id": "nginx:latest",
"type": "DOCKER"
@ -400,13 +446,7 @@ POST URL - http://localhost:8088/app/v1/services
"launch_command": "./start_nginx.sh",
"resource": {
"cpus": 1,
"memory": "256",
"additional" : {
"yarn.io/gpu" : {
"value" : 4,
"unit" : ""
}
}
"memory": "256"
}
}
]
@ -425,10 +465,12 @@ Note, lifetime value of -1 means unlimited lifetime.
"description": "hello world example",
"id": "application_1503963985568_0002",
"lifetime": -1,
"state": "STABLE",
"components": [
{
"name": "hello",
"dependencies": [],
"state": "STABLE",
"resource": {
"cpus": 1,
"memory": "256"
@ -441,21 +483,21 @@ Note, lifetime value of -1 means unlimited lifetime.
"quicklinks": [],
"containers": [
{
"id": "container_e03_1503963985568_0002_01_000001",
"id": "container_e03_1503963985568_0002_01_000002",
"ip": "10.22.8.143",
"hostname": "myhost.local",
"hostname": "ctr-e03-1503963985568-0002-01-000002.example.site",
"state": "READY",
"launch_time": 1504051512412,
"bare_host": "10.22.8.143",
"bare_host": "host100.cloud.com",
"component_instance_name": "hello-0"
},
{
"id": "container_e03_1503963985568_0002_01_000002",
"ip": "10.22.8.143",
"hostname": "myhost.local",
"id": "container_e03_1503963985568_0002_01_000003",
"ip": "10.22.8.144",
"hostname": "ctr-e03-1503963985568-0002-01-000003.example.site",
"state": "READY",
"launch_time": 1504051536450,
"bare_host": "10.22.8.143",
"bare_host": "host100.cloud.com",
"component_instance_name": "hello-1"
}
],
@ -511,7 +553,6 @@ PUT URL - http://localhost:8088/app/v1/services/hello-world/components/hello
##### PUT Request JSON
```json
{
"name": "hello",
"number_of_containers": 3
}
```
@ -621,4 +662,161 @@ POST URL - http://localhost:8088:/app/v1/services/hbase-app-1
}
```
### Create a service requesting GPUs in addition to CPUs and RAM
POST URL - http://localhost:8088/app/v1/services
##### POST Request JSON
```json
{
"name": "hello-world",
"version": "1.0.0",
"description": "hello world example with GPUs",
"components" :
[
{
"name": "hello",
"number_of_containers": 2,
"artifact": {
"id": "nginx:latest",
"type": "DOCKER"
},
"launch_command": "./start_nginx.sh",
"resource": {
"cpus": 1,
"memory": "256",
"additional" : {
"yarn.io/gpu" : {
"value" : 4,
"unit" : ""
}
}
}
}
]
}
```
### Create a service with a component requesting anti-affinity placement policy
POST URL - http://localhost:8088/app/v1/services
##### POST Request JSON
```json
{
"name": "hello-world",
"version": "1.0.0",
"description": "hello world example with anti-affinity",
"components" :
[
{
"name": "hello",
"number_of_containers": 3,
"artifact": {
"id": "nginx:latest",
"type": "DOCKER"
},
"launch_command": "./start_nginx.sh",
"resource": {
"cpus": 1,
"memory": "256"
},
"placement_policy": {
"constraints": [
{
"type": "ANTI_AFFINITY",
"scope": "NODE",
"target_tags": [
"hello"
]
}
]
}
}
]
}
```
##### GET Response JSON
GET URL - http://localhost:8088/app/v1/services/hello-world
Note, that the 3 containers will come up on 3 different nodes. If there are less
than 3 NMs running in the cluster, then all 3 container requests will not be
fulfilled and the service will be in non-STABLE state.
```json
{
"name": "hello-world",
"version": "1.0.0",
"description": "hello world example with anti-affinity",
"id": "application_1503963985568_0003",
"lifetime": -1,
"state": "STABLE",
"components": [
{
"name": "hello",
"dependencies": [],
"state": "STABLE",
"resource": {
"cpus": 1,
"memory": "256"
},
"placement_policy": {
"constraints": [
{
"type": "ANTI_AFFINITY",
"scope": "NODE",
"node_attributes": {},
"node_partitions": [],
"target_tags": [
"hello"
]
}
]
},
"configuration": {
"properties": {},
"env": {},
"files": []
},
"quicklinks": [],
"containers": [
{
"id": "container_e03_1503963985568_0003_01_000002",
"ip": "10.22.8.143",
"hostname": "ctr-e03-1503963985568-0003-01-000002.example.site",
"state": "READY",
"launch_time": 1504051512412,
"bare_host": "host100.cloud.com",
"component_instance_name": "hello-0"
},
{
"id": "container_e03_1503963985568_0003_01_000003",
"ip": "10.22.8.144",
"hostname": "ctr-e03-1503963985568-0003-01-000003.example.site",
"state": "READY",
"launch_time": 1504051536450,
"bare_host": "host101.cloud.com",
"component_instance_name": "hello-1"
},
{
"id": "container_e03_1503963985568_0003_01_000004",
"ip": "10.22.8.145",
"hostname": "ctr-e03-1503963985568-0003-01-000004.example.site",
"state": "READY",
"launch_time": 1504051536450,
"bare_host": "host102.cloud.com",
"component_instance_name": "hello-2"
}
],
"launch_command": "./start_nginx.sh",
"number_of_containers": 1,
"run_privileged_container": false
}
],
"configuration": {
"properties": {},
"env": {},
"files": []
},
"quicklinks": {}
}
```