YARN-6595. [API] Add Placement Constraints at the application level. (Arun Suresh via kkaranasos)

This commit is contained in:
Konstantinos Karanasos 2017-11-13 15:25:24 -08:00 committed by Arun Suresh
parent b57e8bc300
commit db928556c8
6 changed files with 313 additions and 9 deletions

View File

@ -18,11 +18,16 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.util.Records;
/**
* The request sent by the {@code ApplicationMaster} to {@code ResourceManager}
* on registration.
@ -132,4 +137,39 @@ public static RegisterApplicationMasterRequest newInstance(String host,
@Public
@Stable
public abstract void setTrackingUrl(String trackingUrl);
/**
* Return all Placement Constraints specified at the Application level. The
* mapping is from a set of allocation tags to a
* <code>PlacementConstraint</code> associated with the tags, i.e., each
* {@link org.apache.hadoop.yarn.api.records.SchedulingRequest} that has those
* tags will be placed taking into account the corresponding constraint.
*
* @return A map of Placement Constraints.
*/
@Public
@Unstable
public Map<Set<String>, PlacementConstraint> getPlacementConstraints() {
return new HashMap<>();
}
/**
* Set Placement Constraints applicable to the
* {@link org.apache.hadoop.yarn.api.records.SchedulingRequest}s
* of this application.
* The mapping is from a set of allocation tags to a
* <code>PlacementConstraint</code> associated with the tags.
* For example:
* Map &lt;
* &lt;hb_regionserver&gt; -&gt; node_anti_affinity,
* &lt;hb_regionserver, hb_master&gt; -&gt; rack_affinity,
* ...
* &gt;
* @param placementConstraints Placement Constraint Mapping.
*/
@Public
@Unstable
public void setPlacementConstraints(
Map<Set<String>, PlacementConstraint> placementConstraints) {
}
}

View File

@ -54,6 +54,26 @@ public AbstractConstraint getConstraintExpr() {
return constraintExpr;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof PlacementConstraint)) {
return false;
}
PlacementConstraint that = (PlacementConstraint) o;
return getConstraintExpr() != null ? getConstraintExpr().equals(that
.getConstraintExpr()) : that.getConstraintExpr() == null;
}
@Override
public int hashCode() {
return getConstraintExpr() != null ? getConstraintExpr().hashCode() : 0;
}
/**
* Interface used to enable the elements of the constraint tree to be visited.
*/
@ -173,6 +193,38 @@ public Set<TargetExpression> getTargetExpressions() {
return targetExpressions;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof SingleConstraint)) {
return false;
}
SingleConstraint that = (SingleConstraint) o;
if (getMinCardinality() != that.getMinCardinality()) {
return false;
}
if (getMaxCardinality() != that.getMaxCardinality()) {
return false;
}
if (!getScope().equals(that.getScope())) {
return false;
}
return getTargetExpressions().equals(that.getTargetExpressions());
}
@Override
public int hashCode() {
int result = getScope().hashCode();
result = 31 * result + getMinCardinality();
result = 31 * result + getMaxCardinality();
result = 31 * result + getTargetExpressions().hashCode();
return result;
}
@Override
public <T> T accept(Visitor<T> visitor) {
return visitor.visit(this);
@ -331,6 +383,34 @@ public Set<TargetExpression> getTargetExpressions() {
return targetExpressions;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TargetConstraint)) {
return false;
}
TargetConstraint that = (TargetConstraint) o;
if (getOp() != that.getOp()) {
return false;
}
if (!getScope().equals(that.getScope())) {
return false;
}
return getTargetExpressions().equals(that.getTargetExpressions());
}
@Override
public int hashCode() {
int result = getOp().hashCode();
result = 31 * result + getScope().hashCode();
result = 31 * result + getTargetExpressions().hashCode();
return result;
}
@Override
public <T> T accept(Visitor<T> visitor) {
return visitor.visit(this);
@ -388,6 +468,34 @@ public int getMaxCardinality() {
public <T> T accept(Visitor<T> visitor) {
return visitor.visit(this);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CardinalityConstraint that = (CardinalityConstraint) o;
if (minCardinality != that.minCardinality) {
return false;
}
if (maxCardinality != that.maxCardinality) {
return false;
}
return scope != null ? scope.equals(that.scope) : that.scope == null;
}
@Override
public int hashCode() {
int result = scope != null ? scope.hashCode() : 0;
result = 31 * result + minCardinality;
result = 31 * result + maxCardinality;
return result;
}
}
/**
@ -406,6 +514,25 @@ public abstract static class CompositeConstraint<R extends Visitable>
* @return the children of the composite constraint
*/
public abstract List<R> getChildren();
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
return getChildren() != null ? getChildren().equals(
((CompositeConstraint)o).getChildren()) :
((CompositeConstraint)o).getChildren() == null;
}
@Override
public int hashCode() {
return getChildren() != null ? getChildren().hashCode() : 0;
}
}
/**
@ -563,5 +690,34 @@ public DelayUnit getDelayUnit() {
public <T> T accept(Visitor<T> visitor) {
return visitor.visit(this);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TimedPlacementConstraint that = (TimedPlacementConstraint) o;
if (schedulingDelay != that.schedulingDelay) {
return false;
}
if (constraint != null ? !constraint.equals(that.constraint) :
that.constraint != null) {
return false;
}
return delayUnit == that.delayUnit;
}
@Override
public int hashCode() {
int result = constraint != null ? constraint.hashCode() : 0;
result = 31 * result + (int) (schedulingDelay ^ (schedulingDelay >>> 32));
result = 31 * result + (delayUnit != null ? delayUnit.hashCode() : 0);
return result;
}
}
}

View File

@ -649,6 +649,12 @@ message CompositePlacementConstraintProto {
repeated TimedPlacementConstraintProto timedChildConstraints = 3;
}
// This associates a set of allocation tags to a Placement Constraint.
message PlacementConstraintMapEntryProto {
repeated string allocation_tags = 1;
optional PlacementConstraintProto placement_constraint = 2;
}
////////////////////////////////////////////////////////////////////////
////// From reservation_protocol /////////////////////////////////////
////////////////////////////////////////////////////////////////////////

View File

@ -38,6 +38,7 @@ message RegisterApplicationMasterRequestProto {
optional string host = 1;
optional int32 rpc_port = 2;
optional string tracking_url = 3;
repeated PlacementConstraintMapEntryProto placement_constraints = 4;
}
message RegisterApplicationMasterResponseProto {

View File

@ -21,24 +21,41 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.pb.PlacementConstraintFromProtoConverter;
import org.apache.hadoop.yarn.api.pb.PlacementConstraintToProtoConverter;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProtoOrBuilder;
import com.google.protobuf.TextFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Private
@Unstable
public class RegisterApplicationMasterRequestPBImpl extends RegisterApplicationMasterRequest {
RegisterApplicationMasterRequestProto proto = RegisterApplicationMasterRequestProto.getDefaultInstance();
RegisterApplicationMasterRequestProto.Builder builder = null;
public class RegisterApplicationMasterRequestPBImpl
extends RegisterApplicationMasterRequest {
private RegisterApplicationMasterRequestProto proto =
RegisterApplicationMasterRequestProto.getDefaultInstance();
private RegisterApplicationMasterRequestProto.Builder builder = null;
private Map<Set<String>, PlacementConstraint> placementConstraints = null;
boolean viaProto = false;
public RegisterApplicationMasterRequestPBImpl() {
builder = RegisterApplicationMasterRequestProto.newBuilder();
}
public RegisterApplicationMasterRequestPBImpl(RegisterApplicationMasterRequestProto proto) {
public RegisterApplicationMasterRequestPBImpl(
RegisterApplicationMasterRequestProto proto) {
this.proto = proto;
viaProto = true;
}
@ -71,6 +88,30 @@ public String toString() {
}
private void mergeLocalToBuilder() {
if (this.placementConstraints != null) {
addPlacementConstraintMap();
}
}
private void addPlacementConstraintMap() {
maybeInitBuilder();
builder.clearPlacementConstraints();
if (this.placementConstraints == null) {
return;
}
List<YarnProtos.PlacementConstraintMapEntryProto> protoList =
new ArrayList<>();
for (Map.Entry<Set<String>, PlacementConstraint> entry :
this.placementConstraints.entrySet()) {
protoList.add(
YarnProtos.PlacementConstraintMapEntryProto.newBuilder()
.addAllAllocationTags(entry.getKey())
.setPlacementConstraint(
new PlacementConstraintToProtoConverter(
entry.getValue()).convert())
.build());
}
builder.addAllPlacementConstraints(protoList);
}
private void mergeLocalToProto() {
@ -90,7 +131,8 @@ private void maybeInitBuilder() {
@Override
public String getHost() {
RegisterApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder;
RegisterApplicationMasterRequestProtoOrBuilder p =
viaProto ? proto : builder;
return p.getHost();
}
@ -106,7 +148,8 @@ public void setHost(String host) {
@Override
public int getRpcPort() {
RegisterApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder;
RegisterApplicationMasterRequestProtoOrBuilder p =
viaProto ? proto : builder;
return p.getRpcPort();
}
@ -118,7 +161,8 @@ public void setRpcPort(int port) {
@Override
public String getTrackingUrl() {
RegisterApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder;
RegisterApplicationMasterRequestProtoOrBuilder p =
viaProto ? proto : builder;
return p.getTrackingUrl();
}
@ -131,4 +175,50 @@ public void setTrackingUrl(String url) {
}
builder.setTrackingUrl(url);
}
private void initPlacementConstraintMap() {
if (this.placementConstraints != null) {
return;
}
RegisterApplicationMasterRequestProtoOrBuilder p =
viaProto ? proto : builder;
List<YarnProtos.PlacementConstraintMapEntryProto> pcmList =
p.getPlacementConstraintsList();
this.placementConstraints = new HashMap<>();
for (YarnProtos.PlacementConstraintMapEntryProto e : pcmList) {
this.placementConstraints.put(
new HashSet<>(e.getAllocationTagsList()),
new PlacementConstraintFromProtoConverter(
e.getPlacementConstraint()).convert());
}
}
@Override
public Map<Set<String>, PlacementConstraint> getPlacementConstraints() {
initPlacementConstraintMap();
return this.placementConstraints;
}
@Override
public void setPlacementConstraints(
Map<Set<String>, PlacementConstraint> constraints) {
maybeInitBuilder();
if (constraints == null) {
builder.clearPlacementConstraints();
} else {
removeEmptyKeys(constraints);
}
this.placementConstraints = constraints;
}
private void removeEmptyKeys(
Map<Set<String>, PlacementConstraint> constraintMap) {
Iterator<Set<String>> iter = constraintMap.keySet().iterator();
while (iter.hasNext()) {
Set<String> aTags = iter.next();
if (aTags.size() == 0) {
iter.remove();
}
}
}
}

View File

@ -22,12 +22,19 @@
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.junit.Assert;
import java.lang.reflect.*;
import java.nio.ByteBuffer;
import java.util.*;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints
.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
/**
* Generic helper class to validate protocol records.
*/
@ -85,6 +92,10 @@ private static Object genTypeValue(Type type) {
ByteBuffer buff = ByteBuffer.allocate(4);
rand.nextBytes(buff.array());
return buff;
} else if (type.equals(PlacementConstraint.class)) {
PlacementConstraint.AbstractConstraint sConstraintExpr =
targetIn(NODE, allocationTag("foo"));
ret = PlacementConstraints.build(sConstraintExpr);
}
} else if (type instanceof ParameterizedType) {
ParameterizedType pt = (ParameterizedType)type;