YARN-3583. Support of NodeLabel object instead of plain String in YarnClient side. (Sunil G via wangda)

This commit is contained in:
Wangda Tan 2015-05-19 16:54:38 -07:00
parent b37da52a1c
commit 563eb1ad2a
15 changed files with 225 additions and 119 deletions

View File

@ -444,19 +444,19 @@ public ReservationDeleteResponse deleteReservation(
} }
@Override @Override
public Map<NodeId, Set<String>> getNodeToLabels() throws YarnException, public Map<NodeId, Set<NodeLabel>> getNodeToLabels() throws YarnException,
IOException { IOException {
return client.getNodeToLabels(); return client.getNodeToLabels();
} }
@Override @Override
public Map<String, Set<NodeId>> getLabelsToNodes() throws YarnException, public Map<NodeLabel, Set<NodeId>> getLabelsToNodes() throws YarnException,
IOException { IOException {
return client.getLabelsToNodes(); return client.getLabelsToNodes();
} }
@Override @Override
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels) public Map<NodeLabel, Set<NodeId>> getLabelsToNodes(Set<String> labels)
throws YarnException, IOException { throws YarnException, IOException {
return client.getLabelsToNodes(labels); return client.getLabelsToNodes(labels);
} }

View File

@ -250,6 +250,9 @@ Release 2.8.0 - UNRELEASED
YARN-3565. NodeHeartbeatRequest/RegisterNodeManagerRequest should use YARN-3565. NodeHeartbeatRequest/RegisterNodeManagerRequest should use
NodeLabel object instead of String. (Naganarasimha G R via wangda) NodeLabel object instead of String. (Naganarasimha G R via wangda)
YARN-3583. Support of NodeLabel object instead of plain String
in YarnClient side. (Sunil G via wangda)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -24,11 +24,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
public abstract class GetLabelsToNodesResponse { public abstract class GetLabelsToNodesResponse {
public static GetLabelsToNodesResponse newInstance( public static GetLabelsToNodesResponse newInstance(
Map<String, Set<NodeId>> map) { Map<NodeLabel, Set<NodeId>> map) {
GetLabelsToNodesResponse response = GetLabelsToNodesResponse response =
Records.newRecord(GetLabelsToNodesResponse.class); Records.newRecord(GetLabelsToNodesResponse.class);
response.setLabelsToNodes(map); response.setLabelsToNodes(map);
@ -37,9 +38,9 @@ public static GetLabelsToNodesResponse newInstance(
@Public @Public
@Evolving @Evolving
public abstract void setLabelsToNodes(Map<String, Set<NodeId>> map); public abstract void setLabelsToNodes(Map<NodeLabel, Set<NodeId>> map);
@Public @Public
@Evolving @Evolving
public abstract Map<String, Set<NodeId>> getLabelsToNodes(); public abstract Map<NodeLabel, Set<NodeId>> getLabelsToNodes();
} }

View File

@ -24,11 +24,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
public abstract class GetNodesToLabelsResponse { public abstract class GetNodesToLabelsResponse {
public static GetNodesToLabelsResponse newInstance( public static GetNodesToLabelsResponse newInstance(
Map<NodeId, Set<String>> map) { Map<NodeId, Set<NodeLabel>> map) {
GetNodesToLabelsResponse response = GetNodesToLabelsResponse response =
Records.newRecord(GetNodesToLabelsResponse.class); Records.newRecord(GetNodesToLabelsResponse.class);
response.setNodeToLabels(map); response.setNodeToLabels(map);
@ -37,9 +38,9 @@ public static GetNodesToLabelsResponse newInstance(
@Public @Public
@Evolving @Evolving
public abstract void setNodeToLabels(Map<NodeId, Set<String>> map); public abstract void setNodeToLabels(Map<NodeId, Set<NodeLabel>> map);
@Public @Public
@Evolving @Evolving
public abstract Map<NodeId, Set<String>> getNodeToLabels(); public abstract Map<NodeId, Set<NodeLabel>> getNodeToLabels();
} }

View File

@ -91,7 +91,7 @@ message RemoveFromClusterNodeLabelsResponseProto {
} }
message ReplaceLabelsOnNodeRequestProto { message ReplaceLabelsOnNodeRequestProto {
repeated NodeIdToLabelsProto nodeToLabels = 1; repeated NodeIdToLabelsNameProto nodeToLabels = 1;
} }
message ReplaceLabelsOnNodeResponseProto { message ReplaceLabelsOnNodeResponseProto {
@ -107,6 +107,11 @@ message CheckForDecommissioningNodesResponseProto {
repeated NodeIdProto decommissioningNodes = 1; repeated NodeIdProto decommissioningNodes = 1;
} }
message NodeIdToLabelsNameProto {
optional NodeIdProto nodeId = 1;
repeated string nodeLabels = 2;
}
enum DecommissionTypeProto { enum DecommissionTypeProto {
NORMAL = 1; NORMAL = 1;
GRACEFUL = 2; GRACEFUL = 2;

View File

@ -248,13 +248,13 @@ message NodeReportProto {
repeated string node_labels = 10; repeated string node_labels = 10;
} }
message NodeIdToLabelsProto { message NodeIdToLabelsInfoProto {
optional NodeIdProto nodeId = 1; optional NodeIdProto nodeId = 1;
repeated string nodeLabels = 2; repeated NodeLabelProto nodeLabels = 2;
} }
message LabelsToNodeIdsProto { message LabelsToNodeIdsProto {
optional string nodeLabels = 1; optional NodeLabelProto nodeLabels = 1;
repeated NodeIdProto nodeId = 2; repeated NodeIdProto nodeId = 2;
} }

View File

@ -198,7 +198,7 @@ message GetNodesToLabelsRequestProto {
} }
message GetNodesToLabelsResponseProto { message GetNodesToLabelsResponseProto {
repeated NodeIdToLabelsProto nodeToLabels = 1; repeated NodeIdToLabelsInfoProto nodeToLabels = 1;
} }
message GetLabelsToNodesRequestProto { message GetLabelsToNodesRequestProto {

View File

@ -619,7 +619,7 @@ public abstract ReservationDeleteResponse deleteReservation(
*/ */
@Public @Public
@Unstable @Unstable
public abstract Map<NodeId, Set<String>> getNodeToLabels() public abstract Map<NodeId, Set<NodeLabel>> getNodeToLabels()
throws YarnException, IOException; throws YarnException, IOException;
/** /**
@ -634,7 +634,7 @@ public abstract Map<NodeId, Set<String>> getNodeToLabels()
*/ */
@Public @Public
@Unstable @Unstable
public abstract Map<String, Set<NodeId>> getLabelsToNodes() public abstract Map<NodeLabel, Set<NodeId>> getLabelsToNodes()
throws YarnException, IOException; throws YarnException, IOException;
/** /**
@ -650,8 +650,8 @@ public abstract Map<String, Set<NodeId>> getLabelsToNodes()
*/ */
@Public @Public
@Unstable @Unstable
public abstract Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels) public abstract Map<NodeLabel, Set<NodeId>> getLabelsToNodes(
throws YarnException, IOException; Set<String> labels) throws YarnException, IOException;
/** /**
* <p> * <p>

View File

@ -795,21 +795,21 @@ public ReservationDeleteResponse deleteReservation(
} }
@Override @Override
public Map<NodeId, Set<String>> getNodeToLabels() throws YarnException, public Map<NodeId, Set<NodeLabel>> getNodeToLabels() throws YarnException,
IOException { IOException {
return rmClient.getNodeToLabels(GetNodesToLabelsRequest.newInstance()) return rmClient.getNodeToLabels(GetNodesToLabelsRequest.newInstance())
.getNodeToLabels(); .getNodeToLabels();
} }
@Override @Override
public Map<String, Set<NodeId>> getLabelsToNodes() throws YarnException, public Map<NodeLabel, Set<NodeId>> getLabelsToNodes() throws YarnException,
IOException { IOException {
return rmClient.getLabelsToNodes(GetLabelsToNodesRequest.newInstance()) return rmClient.getLabelsToNodes(GetLabelsToNodesRequest.newInstance())
.getLabelsToNodes(); .getLabelsToNodes();
} }
@Override @Override
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels) public Map<NodeLabel, Set<NodeId>> getLabelsToNodes(Set<String> labels)
throws YarnException, IOException { throws YarnException, IOException {
return rmClient.getLabelsToNodes( return rmClient.getLabelsToNodes(
GetLabelsToNodesRequest.newInstance(labels)).getLabelsToNodes(); GetLabelsToNodesRequest.newInstance(labels)).getLabelsToNodes();

View File

@ -67,6 +67,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
@ -87,6 +89,7 @@
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
@ -458,9 +461,9 @@ public void testGetLabelsToNodes() throws YarnException, IOException {
client.start(); client.start();
// Get labels to nodes mapping // Get labels to nodes mapping
Map<String, Set<NodeId>> expectedLabelsToNodes = Map<NodeLabel, Set<NodeId>> expectedLabelsToNodes =
((MockYarnClient)client).getLabelsToNodesMap(); ((MockYarnClient)client).getLabelsToNodesMap();
Map<String, Set<NodeId>> labelsToNodes = client.getLabelsToNodes(); Map<NodeLabel, Set<NodeId>> labelsToNodes = client.getLabelsToNodes();
Assert.assertEquals(labelsToNodes, expectedLabelsToNodes); Assert.assertEquals(labelsToNodes, expectedLabelsToNodes);
Assert.assertEquals(labelsToNodes.size(), 3); Assert.assertEquals(labelsToNodes.size(), 3);
@ -476,7 +479,32 @@ public void testGetLabelsToNodes() throws YarnException, IOException {
client.close(); client.close();
} }
@Test (timeout = 10000)
public void testGetNodesToLabels() throws YarnException, IOException {
Configuration conf = new Configuration();
final YarnClient client = new MockYarnClient();
client.init(conf);
client.start();
// Get labels to nodes mapping
Map<NodeId, Set<NodeLabel>> expectedNodesToLabels = ((MockYarnClient) client)
.getNodeToLabelsMap();
Map<NodeId, Set<NodeLabel>> nodesToLabels = client.getNodeToLabels();
Assert.assertEquals(nodesToLabels, expectedNodesToLabels);
Assert.assertEquals(nodesToLabels.size(), 1);
// Verify exclusivity
Set<NodeLabel> labels = nodesToLabels.get(NodeId.newInstance("host", 0));
for (NodeLabel label : labels) {
Assert.assertFalse(label.isExclusive());
}
client.stop();
client.close();
}
private static class MockYarnClient extends YarnClientImpl { private static class MockYarnClient extends YarnClientImpl {
private ApplicationReport mockReport; private ApplicationReport mockReport;
private List<ApplicationReport> reports; private List<ApplicationReport> reports;
private HashMap<ApplicationId, List<ApplicationAttemptReport>> attempts = private HashMap<ApplicationId, List<ApplicationAttemptReport>> attempts =
@ -498,6 +526,8 @@ private static class MockYarnClient extends YarnClientImpl {
mock(GetContainerReportResponse.class); mock(GetContainerReportResponse.class);
GetLabelsToNodesResponse mockLabelsToNodesResponse = GetLabelsToNodesResponse mockLabelsToNodesResponse =
mock(GetLabelsToNodesResponse.class); mock(GetLabelsToNodesResponse.class);
GetNodesToLabelsResponse mockNodeToLabelsResponse =
mock(GetNodesToLabelsResponse.class);
public MockYarnClient() { public MockYarnClient() {
super(); super();
@ -537,6 +567,9 @@ public void start() {
when(rmClient.getLabelsToNodes(any(GetLabelsToNodesRequest.class))) when(rmClient.getLabelsToNodes(any(GetLabelsToNodesRequest.class)))
.thenReturn(mockLabelsToNodesResponse); .thenReturn(mockLabelsToNodesResponse);
when(rmClient.getNodeToLabels(any(GetNodesToLabelsRequest.class)))
.thenReturn(mockNodeToLabelsResponse);
historyClient = mock(AHSClient.class); historyClient = mock(AHSClient.class);
} catch (YarnException e) { } catch (YarnException e) {
@ -704,7 +737,7 @@ private List<ApplicationReport> getApplicationReports(
} }
@Override @Override
public Map<String, Set<NodeId>> getLabelsToNodes() public Map<NodeLabel, Set<NodeId>> getLabelsToNodes()
throws YarnException, IOException { throws YarnException, IOException {
when(mockLabelsToNodesResponse.getLabelsToNodes()).thenReturn( when(mockLabelsToNodesResponse.getLabelsToNodes()).thenReturn(
getLabelsToNodesMap()); getLabelsToNodesMap());
@ -712,35 +745,52 @@ public Map<String, Set<NodeId>> getLabelsToNodes()
} }
@Override @Override
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels) public Map<NodeLabel, Set<NodeId>> getLabelsToNodes(Set<String> labels)
throws YarnException, IOException { throws YarnException, IOException {
when(mockLabelsToNodesResponse.getLabelsToNodes()).thenReturn( when(mockLabelsToNodesResponse.getLabelsToNodes()).thenReturn(
getLabelsToNodesMap(labels)); getLabelsToNodesMap(labels));
return super.getLabelsToNodes(labels); return super.getLabelsToNodes(labels);
} }
public Map<String, Set<NodeId>> getLabelsToNodesMap() { public Map<NodeLabel, Set<NodeId>> getLabelsToNodesMap() {
Map<String, Set<NodeId>> map = new HashMap<String, Set<NodeId>>(); Map<NodeLabel, Set<NodeId>> map = new HashMap<NodeLabel, Set<NodeId>>();
Set<NodeId> setNodeIds = Set<NodeId> setNodeIds =
new HashSet<NodeId>(Arrays.asList( new HashSet<NodeId>(Arrays.asList(
NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0))); NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0)));
map.put("x", setNodeIds); map.put(NodeLabel.newInstance("x"), setNodeIds);
map.put("y", setNodeIds); map.put(NodeLabel.newInstance("y"), setNodeIds);
map.put("z", setNodeIds); map.put(NodeLabel.newInstance("z"), setNodeIds);
return map; return map;
} }
public Map<String, Set<NodeId>> getLabelsToNodesMap(Set<String> labels) { public Map<NodeLabel, Set<NodeId>> getLabelsToNodesMap(Set<String> labels) {
Map<String, Set<NodeId>> map = new HashMap<String, Set<NodeId>>(); Map<NodeLabel, Set<NodeId>> map = new HashMap<NodeLabel, Set<NodeId>>();
Set<NodeId> setNodeIds = Set<NodeId> setNodeIds =
new HashSet<NodeId>(Arrays.asList( new HashSet<NodeId>(Arrays.asList(
NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0))); NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0)));
for(String label : labels) { for(String label : labels) {
map.put(label, setNodeIds); map.put(NodeLabel.newInstance(label), setNodeIds);
} }
return map; return map;
} }
@Override
public Map<NodeId, Set<NodeLabel>> getNodeToLabels() throws YarnException,
IOException {
when(mockNodeToLabelsResponse.getNodeToLabels()).thenReturn(
getNodeToLabelsMap());
return super.getNodeToLabels();
}
public Map<NodeId, Set<NodeLabel>> getNodeToLabelsMap() {
Map<NodeId, Set<NodeLabel>> map = new HashMap<NodeId, Set<NodeLabel>>();
Set<NodeLabel> setNodeLabels = new HashSet<NodeLabel>(Arrays.asList(
NodeLabel.newInstance("x", false),
NodeLabel.newInstance("y", false)));
map.put(NodeId.newInstance("host", 0), setNodeLabels);
return map;
}
@Override @Override
public List<ApplicationAttemptReport> getApplicationAttempts( public List<ApplicationAttemptReport> getApplicationAttempts(
ApplicationId appId) throws YarnException, IOException { ApplicationId appId) throws YarnException, IOException {

View File

@ -29,11 +29,13 @@
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.proto.YarnProtos.LabelsToNodeIdsProto; import org.apache.hadoop.yarn.proto.YarnProtos.LabelsToNodeIdsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesResponseProtoOrBuilder;
@ -44,7 +46,7 @@ public class GetLabelsToNodesResponsePBImpl extends
GetLabelsToNodesResponseProto.Builder builder = null; GetLabelsToNodesResponseProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private Map<String, Set<NodeId>> labelsToNodes; private Map<NodeLabel, Set<NodeId>> labelsToNodes;
public GetLabelsToNodesResponsePBImpl() { public GetLabelsToNodesResponsePBImpl() {
this.builder = GetLabelsToNodesResponseProto.newBuilder(); this.builder = GetLabelsToNodesResponseProto.newBuilder();
@ -61,7 +63,7 @@ private void initLabelsToNodes() {
} }
GetLabelsToNodesResponseProtoOrBuilder p = viaProto ? proto : builder; GetLabelsToNodesResponseProtoOrBuilder p = viaProto ? proto : builder;
List<LabelsToNodeIdsProto> list = p.getLabelsToNodesList(); List<LabelsToNodeIdsProto> list = p.getLabelsToNodesList();
this.labelsToNodes = new HashMap<String, Set<NodeId>>(); this.labelsToNodes = new HashMap<NodeLabel, Set<NodeId>>();
for (LabelsToNodeIdsProto c : list) { for (LabelsToNodeIdsProto c : list) {
Set<NodeId> setNodes = new HashSet<NodeId>(); Set<NodeId> setNodes = new HashSet<NodeId>();
@ -69,8 +71,9 @@ private void initLabelsToNodes() {
NodeId node = new NodeIdPBImpl(n); NodeId node = new NodeIdPBImpl(n);
setNodes.add(node); setNodes.add(node);
} }
if(!setNodes.isEmpty()) { if (!setNodes.isEmpty()) {
this.labelsToNodes.put(c.getNodeLabels(), setNodes); this.labelsToNodes
.put(new NodeLabelPBImpl(c.getNodeLabels()), setNodes);
} }
} }
} }
@ -94,7 +97,7 @@ private void addLabelsToNodesToProto() {
public Iterator<LabelsToNodeIdsProto> iterator() { public Iterator<LabelsToNodeIdsProto> iterator() {
return new Iterator<LabelsToNodeIdsProto>() { return new Iterator<LabelsToNodeIdsProto>() {
Iterator<Entry<String, Set<NodeId>>> iter = Iterator<Entry<NodeLabel, Set<NodeId>>> iter =
labelsToNodes.entrySet().iterator(); labelsToNodes.entrySet().iterator();
@Override @Override
@ -104,13 +107,14 @@ public void remove() {
@Override @Override
public LabelsToNodeIdsProto next() { public LabelsToNodeIdsProto next() {
Entry<String, Set<NodeId>> now = iter.next(); Entry<NodeLabel, Set<NodeId>> now = iter.next();
Set<NodeIdProto> nodeProtoSet = new HashSet<NodeIdProto>(); Set<NodeIdProto> nodeProtoSet = new HashSet<NodeIdProto>();
for(NodeId n : now.getValue()) { for(NodeId n : now.getValue()) {
nodeProtoSet.add(convertToProtoFormat(n)); nodeProtoSet.add(convertToProtoFormat(n));
} }
return LabelsToNodeIdsProto.newBuilder() return LabelsToNodeIdsProto.newBuilder()
.setNodeLabels(now.getKey()).addAllNodeId(nodeProtoSet) .setNodeLabels(convertToProtoFormat(now.getKey()))
.addAllNodeId(nodeProtoSet)
.build(); .build();
} }
@ -149,6 +153,10 @@ private NodeIdProto convertToProtoFormat(NodeId t) {
return ((NodeIdPBImpl)t).getProto(); return ((NodeIdPBImpl)t).getProto();
} }
private NodeLabelProto convertToProtoFormat(NodeLabel l) {
return ((NodeLabelPBImpl)l).getProto();
}
@Override @Override
public int hashCode() { public int hashCode() {
assert false : "hashCode not designed"; assert false : "hashCode not designed";
@ -168,7 +176,7 @@ public boolean equals(Object other) {
@Override @Override
@Public @Public
@Evolving @Evolving
public void setLabelsToNodes(Map<String, Set<NodeId>> map) { public void setLabelsToNodes(Map<NodeLabel, Set<NodeId>> map) {
initLabelsToNodes(); initLabelsToNodes();
labelsToNodes.clear(); labelsToNodes.clear();
labelsToNodes.putAll(map); labelsToNodes.putAll(map);
@ -177,7 +185,7 @@ public void setLabelsToNodes(Map<String, Set<NodeId>> map) {
@Override @Override
@Public @Public
@Evolving @Evolving
public Map<String, Set<NodeId>> getLabelsToNodes() { public Map<NodeLabel, Set<NodeId>> getLabelsToNodes() {
initLabelsToNodes(); initLabelsToNodes();
return this.labelsToNodes; return this.labelsToNodes;
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -26,12 +27,13 @@
import java.util.Set; import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsInfoProto;
import com.google.common.collect.Sets; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNodesToLabelsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNodesToLabelsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNodesToLabelsResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNodesToLabelsResponseProtoOrBuilder;
@ -42,7 +44,7 @@ public class GetNodesToLabelsResponsePBImpl extends
GetNodesToLabelsResponseProto.Builder builder = null; GetNodesToLabelsResponseProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private Map<NodeId, Set<String>> nodeToLabels; private Map<NodeId, Set<NodeLabel>> nodeToLabels;
public GetNodesToLabelsResponsePBImpl() { public GetNodesToLabelsResponsePBImpl() {
this.builder = GetNodesToLabelsResponseProto.newBuilder(); this.builder = GetNodesToLabelsResponseProto.newBuilder();
@ -58,12 +60,15 @@ private void initNodeToLabels() {
return; return;
} }
GetNodesToLabelsResponseProtoOrBuilder p = viaProto ? proto : builder; GetNodesToLabelsResponseProtoOrBuilder p = viaProto ? proto : builder;
List<NodeIdToLabelsProto> list = p.getNodeToLabelsList(); List<NodeIdToLabelsInfoProto> list = p.getNodeToLabelsList();
this.nodeToLabels = new HashMap<NodeId, Set<String>>(); this.nodeToLabels = new HashMap<NodeId, Set<NodeLabel>>();
for (NodeIdToLabelsProto c : list) { for (NodeIdToLabelsInfoProto c : list) {
this.nodeToLabels.put(new NodeIdPBImpl(c.getNodeId()), Set<NodeLabel> labels = new HashSet<NodeLabel>();
Sets.newHashSet(c.getNodeLabelsList())); for (NodeLabelProto l : c.getNodeLabelsList()) {
labels.add(new NodeLabelPBImpl(l));
}
this.nodeToLabels.put(new NodeIdPBImpl(c.getNodeId()), labels);
} }
} }
@ -80,13 +85,13 @@ private void addNodeToLabelsToProto() {
if (nodeToLabels == null) { if (nodeToLabels == null) {
return; return;
} }
Iterable<NodeIdToLabelsProto> iterable = Iterable<NodeIdToLabelsInfoProto> iterable =
new Iterable<NodeIdToLabelsProto>() { new Iterable<NodeIdToLabelsInfoProto>() {
@Override @Override
public Iterator<NodeIdToLabelsProto> iterator() { public Iterator<NodeIdToLabelsInfoProto> iterator() {
return new Iterator<NodeIdToLabelsProto>() { return new Iterator<NodeIdToLabelsInfoProto>() {
Iterator<Entry<NodeId, Set<String>>> iter = nodeToLabels Iterator<Entry<NodeId, Set<NodeLabel>>> iter = nodeToLabels
.entrySet().iterator(); .entrySet().iterator();
@Override @Override
@ -95,11 +100,16 @@ public void remove() {
} }
@Override @Override
public NodeIdToLabelsProto next() { public NodeIdToLabelsInfoProto next() {
Entry<NodeId, Set<String>> now = iter.next(); Entry<NodeId, Set<NodeLabel>> now = iter.next();
return NodeIdToLabelsProto.newBuilder() Set<NodeLabelProto> labelProtoList =
new HashSet<NodeLabelProto>();
for (NodeLabel l : now.getValue()) {
labelProtoList.add(convertToProtoFormat(l));
}
return NodeIdToLabelsInfoProto.newBuilder()
.setNodeId(convertToProtoFormat(now.getKey())) .setNodeId(convertToProtoFormat(now.getKey()))
.addAllNodeLabels(now.getValue()).build(); .addAllNodeLabels(labelProtoList).build();
} }
@Override @Override
@ -134,13 +144,13 @@ public GetNodesToLabelsResponseProto getProto() {
} }
@Override @Override
public Map<NodeId, Set<String>> getNodeToLabels() { public Map<NodeId, Set<NodeLabel>> getNodeToLabels() {
initNodeToLabels(); initNodeToLabels();
return this.nodeToLabels; return this.nodeToLabels;
} }
@Override @Override
public void setNodeToLabels(Map<NodeId, Set<String>> map) { public void setNodeToLabels(Map<NodeId, Set<NodeLabel>> map) {
initNodeToLabels(); initNodeToLabels();
nodeToLabels.clear(); nodeToLabels.clear();
nodeToLabels.putAll(map); nodeToLabels.putAll(map);
@ -150,6 +160,10 @@ private NodeIdProto convertToProtoFormat(NodeId t) {
return ((NodeIdPBImpl)t).getProto(); return ((NodeIdPBImpl)t).getProto();
} }
private NodeLabelProto convertToProtoFormat(NodeLabel t) {
return ((NodeLabelPBImpl)t).getProto();
}
@Override @Override
public int hashCode() { public int hashCode() {
assert false : "hashCode not designed"; assert false : "hashCode not designed";

View File

@ -28,7 +28,7 @@
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.NodeIdToLabelsNameProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
@ -58,10 +58,10 @@ private void initNodeToLabels() {
return; return;
} }
ReplaceLabelsOnNodeRequestProtoOrBuilder p = viaProto ? proto : builder; ReplaceLabelsOnNodeRequestProtoOrBuilder p = viaProto ? proto : builder;
List<NodeIdToLabelsProto> list = p.getNodeToLabelsList(); List<NodeIdToLabelsNameProto> list = p.getNodeToLabelsList();
this.nodeIdToLabels = new HashMap<NodeId, Set<String>>(); this.nodeIdToLabels = new HashMap<NodeId, Set<String>>();
for (NodeIdToLabelsProto c : list) { for (NodeIdToLabelsNameProto c : list) {
this.nodeIdToLabels.put(new NodeIdPBImpl(c.getNodeId()), this.nodeIdToLabels.put(new NodeIdPBImpl(c.getNodeId()),
Sets.newHashSet(c.getNodeLabelsList())); Sets.newHashSet(c.getNodeLabelsList()));
} }
@ -80,11 +80,11 @@ private void addNodeToLabelsToProto() {
if (nodeIdToLabels == null) { if (nodeIdToLabels == null) {
return; return;
} }
Iterable<NodeIdToLabelsProto> iterable = Iterable<NodeIdToLabelsNameProto> iterable =
new Iterable<NodeIdToLabelsProto>() { new Iterable<NodeIdToLabelsNameProto>() {
@Override @Override
public Iterator<NodeIdToLabelsProto> iterator() { public Iterator<NodeIdToLabelsNameProto> iterator() {
return new Iterator<NodeIdToLabelsProto>() { return new Iterator<NodeIdToLabelsNameProto>() {
Iterator<Entry<NodeId, Set<String>>> iter = nodeIdToLabels Iterator<Entry<NodeId, Set<String>>> iter = nodeIdToLabels
.entrySet().iterator(); .entrySet().iterator();
@ -95,9 +95,9 @@ public void remove() {
} }
@Override @Override
public NodeIdToLabelsProto next() { public NodeIdToLabelsNameProto next() {
Entry<NodeId, Set<String>> now = iter.next(); Entry<NodeId, Set<String>> now = iter.next();
return NodeIdToLabelsProto.newBuilder() return NodeIdToLabelsNameProto.newBuilder()
.setNodeId(convertToProtoFormat(now.getKey())).clearNodeLabels() .setNodeId(convertToProtoFormat(now.getKey())).clearNodeLabels()
.addAllNodeLabels(now.getValue()).build(); .addAllNodeLabels(now.getValue()).build();
} }

View File

@ -1227,7 +1227,7 @@ public GetNodesToLabelsResponse getNodeToLabels(
GetNodesToLabelsRequest request) throws YarnException, IOException { GetNodesToLabelsRequest request) throws YarnException, IOException {
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager(); RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
GetNodesToLabelsResponse response = GetNodesToLabelsResponse response =
GetNodesToLabelsResponse.newInstance(labelsMgr.getNodeLabels()); GetNodesToLabelsResponse.newInstance(labelsMgr.getNodeLabelsInfo());
return response; return response;
} }
@ -1237,10 +1237,10 @@ public GetLabelsToNodesResponse getLabelsToNodes(
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager(); RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
if (request.getNodeLabels() == null || request.getNodeLabels().isEmpty()) { if (request.getNodeLabels() == null || request.getNodeLabels().isEmpty()) {
return GetLabelsToNodesResponse.newInstance( return GetLabelsToNodesResponse.newInstance(
labelsMgr.getLabelsToNodes()); labelsMgr.getLabelsInfoToNodes());
} else { } else {
return GetLabelsToNodesResponse.newInstance( return GetLabelsToNodesResponse.newInstance(
labelsMgr.getLabelsToNodes(request.getNodeLabels())); labelsMgr.getLabelsInfoToNodes(request.getNodeLabels()));
} }
} }

View File

@ -1407,8 +1407,10 @@ protected ClientRMService createClientRMService() {
}; };
}; };
rm.start(); rm.start();
NodeLabel labelX = NodeLabel.newInstance("x", false);
NodeLabel labelY = NodeLabel.newInstance("y");
RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager(); RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager();
labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); labelsMgr.addToCluserNodeLabels(ImmutableSet.of(labelX, labelY));
NodeId node1 = NodeId.newInstance("host1", 1234); NodeId node1 = NodeId.newInstance("host1", 1234);
NodeId node2 = NodeId.newInstance("host2", 1234); NodeId node2 = NodeId.newInstance("host2", 1234);
@ -1422,24 +1424,36 @@ protected ClientRMService createClientRMService() {
YarnRPC rpc = YarnRPC.create(conf); YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
(ApplicationClientProtocol) rpc.getProxy( .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
ApplicationClientProtocol.class, rmAddress, conf);
// Get node labels collection // Get node labels collection
GetClusterNodeLabelsResponse response = GetClusterNodeLabelsResponse response = client
client.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance()); .getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance());
Assert.assertTrue(response.getNodeLabels().containsAll( Assert.assertTrue(response.getNodeLabels().containsAll(
Arrays.asList(NodeLabel.newInstance("x"), NodeLabel.newInstance("y")))); Arrays.asList(labelX, labelY)));
// Get node labels mapping // Get node labels mapping
GetNodesToLabelsResponse response1 = GetNodesToLabelsResponse response1 = client
client.getNodeToLabels(GetNodesToLabelsRequest.newInstance()); .getNodeToLabels(GetNodesToLabelsRequest.newInstance());
Map<NodeId, Set<String>> nodeToLabels = response1.getNodeToLabels(); Map<NodeId, Set<NodeLabel>> nodeToLabels = response1.getNodeToLabels();
Assert.assertTrue(nodeToLabels.keySet().containsAll( Assert.assertTrue(nodeToLabels.keySet().containsAll(
Arrays.asList(node1, node2))); Arrays.asList(node1, node2)));
Assert.assertTrue(nodeToLabels.get(node1).containsAll(Arrays.asList("x"))); Assert.assertTrue(nodeToLabels.get(node1)
Assert.assertTrue(nodeToLabels.get(node2).containsAll(Arrays.asList("y"))); .containsAll(Arrays.asList(labelX)));
Assert.assertTrue(nodeToLabels.get(node2)
.containsAll(Arrays.asList(labelY)));
// Verify whether labelX's exclusivity is false
for (NodeLabel x : nodeToLabels.get(node1)) {
Assert.assertFalse(x.isExclusive());
}
// Verify whether labelY's exclusivity is true
for (NodeLabel y : nodeToLabels.get(node2)) {
Assert.assertTrue(y.isExclusive());
}
// Below label "x" is not present in the response as exclusivity is true
Assert.assertFalse(nodeToLabels.get(node1).containsAll(
Arrays.asList(NodeLabel.newInstance("x"))));
rpc.stopProxy(client, conf); rpc.stopProxy(client, conf);
rm.close(); rm.close();
@ -1456,8 +1470,12 @@ protected ClientRMService createClientRMService() {
}; };
}; };
rm.start(); rm.start();
NodeLabel labelX = NodeLabel.newInstance("x", false);
NodeLabel labelY = NodeLabel.newInstance("y", false);
NodeLabel labelZ = NodeLabel.newInstance("z", false);
RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager(); RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager();
labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z")); labelsMgr.addToCluserNodeLabels(ImmutableSet.of(labelX, labelY, labelZ));
NodeId node1A = NodeId.newInstance("host1", 1234); NodeId node1A = NodeId.newInstance("host1", 1234);
NodeId node1B = NodeId.newInstance("host1", 5678); NodeId node1B = NodeId.newInstance("host1", 5678);
@ -1477,43 +1495,49 @@ protected ClientRMService createClientRMService() {
YarnRPC rpc = YarnRPC.create(conf); YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
(ApplicationClientProtocol) rpc.getProxy( .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
ApplicationClientProtocol.class, rmAddress, conf);
// Get node labels collection // Get node labels collection
GetClusterNodeLabelsResponse response = GetClusterNodeLabelsResponse response = client
client.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance()); .getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance());
Assert.assertTrue(response.getNodeLabels().containsAll( Assert.assertTrue(response.getNodeLabels().containsAll(
Arrays.asList(NodeLabel.newInstance("x"), NodeLabel.newInstance("y"), Arrays.asList(labelX, labelY, labelZ)));
NodeLabel.newInstance("z"))));
// Get labels to nodes mapping // Get labels to nodes mapping
GetLabelsToNodesResponse response1 = GetLabelsToNodesResponse response1 = client
client.getLabelsToNodes(GetLabelsToNodesRequest.newInstance()); .getLabelsToNodes(GetLabelsToNodesRequest.newInstance());
Map<String, Set<NodeId>> labelsToNodes = response1.getLabelsToNodes(); Map<NodeLabel, Set<NodeId>> labelsToNodes = response1.getLabelsToNodes();
Assert.assertTrue( // Verify whether all NodeLabel's exclusivity are false
labelsToNodes.keySet().containsAll(Arrays.asList("x", "y", "z"))); for (Map.Entry<NodeLabel, Set<NodeId>> nltn : labelsToNodes.entrySet()) {
Assert.assertTrue( Assert.assertFalse(nltn.getKey().isExclusive());
labelsToNodes.get("x").containsAll(Arrays.asList(node1A))); }
Assert.assertTrue( Assert.assertTrue(labelsToNodes.keySet().containsAll(
labelsToNodes.get("y").containsAll(Arrays.asList(node2A, node3A))); Arrays.asList(labelX, labelY, labelZ)));
Assert.assertTrue( Assert.assertTrue(labelsToNodes.get(labelX).containsAll(
labelsToNodes.get("z").containsAll(Arrays.asList(node1B, node3B))); Arrays.asList(node1A)));
Assert.assertTrue(labelsToNodes.get(labelY).containsAll(
Arrays.asList(node2A, node3A)));
Assert.assertTrue(labelsToNodes.get(labelZ).containsAll(
Arrays.asList(node1B, node3B)));
// Get labels to nodes mapping for specific labels // Get labels to nodes mapping for specific labels
Set<String> setlabels = Set<String> setlabels = new HashSet<String>(Arrays.asList(new String[]{"x",
new HashSet<String>(Arrays.asList(new String[]{"x", "z"})); "z"}));
GetLabelsToNodesResponse response2 = GetLabelsToNodesResponse response2 = client
client.getLabelsToNodes(GetLabelsToNodesRequest.newInstance(setlabels)); .getLabelsToNodes(GetLabelsToNodesRequest.newInstance(setlabels));
labelsToNodes = response2.getLabelsToNodes(); labelsToNodes = response2.getLabelsToNodes();
Assert.assertTrue( // Verify whether all NodeLabel's exclusivity are false
labelsToNodes.keySet().containsAll(Arrays.asList("x", "z"))); for (Map.Entry<NodeLabel, Set<NodeId>> nltn : labelsToNodes.entrySet()) {
Assert.assertTrue( Assert.assertFalse(nltn.getKey().isExclusive());
labelsToNodes.get("x").containsAll(Arrays.asList(node1A))); }
Assert.assertTrue( Assert.assertTrue(labelsToNodes.keySet().containsAll(
labelsToNodes.get("z").containsAll(Arrays.asList(node1B, node3B))); Arrays.asList(labelX, labelZ)));
Assert.assertEquals(labelsToNodes.get("y"), null); Assert.assertTrue(labelsToNodes.get(labelX).containsAll(
Arrays.asList(node1A)));
Assert.assertTrue(labelsToNodes.get(labelZ).containsAll(
Arrays.asList(node1B, node3B)));
Assert.assertEquals(labelsToNodes.get(labelY), null);
rpc.stopProxy(client, conf); rpc.stopProxy(client, conf);
rm.close(); rm.close();