YARN-3076. Add API/Implementation to YarnClient to retrieve label-to-node mapping (Varun Saxena via wangda)

This commit is contained in:
Wangda Tan 2015-02-19 11:00:57 -08:00
parent f0f2992686
commit d49ae725d5
19 changed files with 708 additions and 1 deletions

View File

@ -439,6 +439,18 @@ public Map<NodeId, Set<String>> getNodeToLabels() throws YarnException,
return client.getNodeToLabels();
}
@Override
public Map<String, Set<NodeId>> getLabelsToNodes() throws YarnException,
IOException {
return client.getLabelsToNodes();
}
@Override
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels)
throws YarnException, IOException {
return client.getLabelsToNodes(labels);
}
@Override
public Set<String> getClusterNodeLabels()
throws YarnException, IOException {

View File

@ -90,6 +90,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
@ -436,6 +438,12 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels(
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
return null;
}
@Override
public GetLabelsToNodesResponse getLabelsToNodes(
GetLabelsToNodesRequest request) throws YarnException, IOException {
return null;
}
}
class HistoryService extends AMService implements HSClientProtocol {

View File

@ -304,6 +304,9 @@ Release 2.7.0 - UNRELEASED
YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA.
(Tsuyoshi OZAWA via jianhe)
YARN-3076. Add API/Implementation to YarnClient to retrieve label-to-node
mapping. (Varun Saxena via wangda)
OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and

View File

@ -47,6 +47,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
@ -676,6 +678,22 @@ public ReservationDeleteResponse deleteReservation(
public GetNodesToLabelsResponse getNodeToLabels(
GetNodesToLabelsRequest request) throws YarnException, IOException;
/**
* <p>
* The interface used by client to get labels to nodes mappings
* in existing cluster
* </p>
*
* @param request
* @return labels to nodes mappings
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
public GetLabelsToNodesResponse getLabelsToNodes(
GetLabelsToNodesRequest request) throws YarnException, IOException;
/**
* <p>
* The interface used by client to get node labels in the cluster

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.Set;
import org.apache.hadoop.yarn.util.Records;
public abstract class GetLabelsToNodesRequest {
public static GetLabelsToNodesRequest newInstance() {
return Records.newRecord(GetLabelsToNodesRequest.class);
}
public static GetLabelsToNodesRequest newInstance(Set<String> nodeLabels) {
GetLabelsToNodesRequest request =
Records.newRecord(GetLabelsToNodesRequest.class);
request.setNodeLabels(nodeLabels);
return request;
}
public abstract void setNodeLabels(Set<String> nodeLabels);
public abstract Set<String> getNodeLabels();
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
public abstract class GetLabelsToNodesResponse {
public static GetLabelsToNodesResponse newInstance(
Map<String, Set<NodeId>> map) {
GetLabelsToNodesResponse response =
Records.newRecord(GetLabelsToNodesResponse.class);
response.setLabelsToNodes(map);
return response;
}
@Public
@Evolving
public abstract void setLabelsToNodes(Map<String, Set<NodeId>> map);
@Public
@Evolving
public abstract Map<String, Set<NodeId>> getLabelsToNodes();
}

View File

@ -53,5 +53,6 @@ service ApplicationClientProtocolService {
rpc updateReservation (ReservationUpdateRequestProto) returns (ReservationUpdateResponseProto);
rpc deleteReservation (ReservationDeleteRequestProto) returns (ReservationDeleteResponseProto);
rpc getNodeToLabels (GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto);
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
}

View File

@ -238,6 +238,11 @@ message NodeIdToLabelsProto {
repeated string nodeLabels = 2;
}
message LabelsToNodeIdsProto {
optional string nodeLabels = 1;
repeated NodeIdProto nodeId = 2;
}
////////////////////////////////////////////////////////////////////////
////// From AM_RM_Protocol /////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////

View File

@ -201,6 +201,14 @@ message GetNodesToLabelsResponseProto {
repeated NodeIdToLabelsProto nodeToLabels = 1;
}
message GetLabelsToNodesRequestProto {
repeated string nodeLabels = 1;
}
message GetLabelsToNodesResponseProto {
repeated LabelsToNodeIdsProto labelsToNodes = 1;
}
message GetClusterNodeLabelsRequestProto {
}

View File

@ -596,6 +596,37 @@ public abstract ReservationDeleteResponse deleteReservation(
public abstract Map<NodeId, Set<String>> getNodeToLabels()
throws YarnException, IOException;
/**
* <p>
* The interface used by client to get labels to nodes mapping
* in existing cluster
* </p>
*
* @return node to labels mappings
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
public abstract Map<String, Set<NodeId>> getLabelsToNodes()
throws YarnException, IOException;
/**
* <p>
* The interface used by client to get labels to nodes mapping
* for specified labels in existing cluster
* </p>
*
* @param labels labels for which labels to nodes mapping has to be retrieved
* @return labels to nodes mappings for specific labels
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
public abstract Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels)
throws YarnException, IOException;
/**
* <p>
* The interface used by client to get node labels in the cluster

View File

@ -61,6 +61,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
@ -777,6 +778,20 @@ public Map<NodeId, Set<String>> getNodeToLabels() throws YarnException,
.getNodeToLabels();
}
@Override
public Map<String, Set<NodeId>> getLabelsToNodes() throws YarnException,
IOException {
return rmClient.getLabelsToNodes(GetLabelsToNodesRequest.newInstance())
.getLabelsToNodes();
}
@Override
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels)
throws YarnException, IOException {
return rmClient.getLabelsToNodes(
GetLabelsToNodesRequest.newInstance(labels)).getLabelsToNodes();
}
@Override
public Set<String> getClusterNodeLabels() throws YarnException, IOException {
return rmClient.getClusterNodeLabels(

View File

@ -30,6 +30,7 @@
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@ -37,6 +38,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
@ -63,6 +65,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
@ -402,6 +406,32 @@ public void testGetContainerReport() throws YarnException, IOException {
client.stop();
}
@Test (timeout = 10000)
public void testGetLabelsToNodes() throws YarnException, IOException {
Configuration conf = new Configuration();
final YarnClient client = new MockYarnClient();
client.init(conf);
client.start();
// Get labels to nodes mapping
Map<String, Set<NodeId>> expectedLabelsToNodes =
((MockYarnClient)client).getLabelsToNodesMap();
Map<String, Set<NodeId>> labelsToNodes = client.getLabelsToNodes();
Assert.assertEquals(labelsToNodes, expectedLabelsToNodes);
Assert.assertEquals(labelsToNodes.size(), 3);
// Get labels to nodes for selected labels
Set<String> setLabels = new HashSet<String>(Arrays.asList("x", "z"));
expectedLabelsToNodes =
((MockYarnClient)client).getLabelsToNodesMap(setLabels);
labelsToNodes = client.getLabelsToNodes(setLabels);
Assert.assertEquals(labelsToNodes, expectedLabelsToNodes);
Assert.assertEquals(labelsToNodes.size(), 2);
client.stop();
client.close();
}
private static class MockYarnClient extends YarnClientImpl {
private ApplicationReport mockReport;
private List<ApplicationReport> reports;
@ -422,6 +452,8 @@ private static class MockYarnClient extends YarnClientImpl {
mock(GetContainersResponse.class);
GetContainerReportResponse mockContainerResponse =
mock(GetContainerReportResponse.class);
GetLabelsToNodesResponse mockLabelsToNodesResponse =
mock(GetLabelsToNodesResponse.class);
public MockYarnClient() {
super();
@ -457,6 +489,9 @@ public void start() {
when(rmClient.getContainerReport(any(GetContainerReportRequest.class)))
.thenReturn(mockContainerResponse);
when(rmClient.getLabelsToNodes(any(GetLabelsToNodesRequest.class)))
.thenReturn(mockLabelsToNodesResponse);
historyClient = mock(AHSClient.class);
@ -617,7 +652,45 @@ private List<ApplicationReport> getApplicationReports(
}
return appReports;
}
@Override
public Map<String, Set<NodeId>> getLabelsToNodes()
throws YarnException, IOException {
when(mockLabelsToNodesResponse.getLabelsToNodes()).thenReturn(
getLabelsToNodesMap());
return super.getLabelsToNodes();
}
@Override
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels)
throws YarnException, IOException {
when(mockLabelsToNodesResponse.getLabelsToNodes()).thenReturn(
getLabelsToNodesMap(labels));
return super.getLabelsToNodes(labels);
}
public Map<String, Set<NodeId>> getLabelsToNodesMap() {
Map<String, Set<NodeId>> map = new HashMap<String, Set<NodeId>>();
Set<NodeId> setNodeIds =
new HashSet<NodeId>(Arrays.asList(
NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0)));
map.put("x", setNodeIds);
map.put("y", setNodeIds);
map.put("z", setNodeIds);
return map;
}
public Map<String, Set<NodeId>> getLabelsToNodesMap(Set<String> labels) {
Map<String, Set<NodeId>> map = new HashMap<String, Set<NodeId>>();
Set<NodeId> setNodeIds =
new HashSet<NodeId>(Arrays.asList(
NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0)));
for(String label : labels) {
map.put(label, setNodeIds);
}
return map;
}
@Override
public List<ApplicationAttemptReport> getApplicationAttempts(
ApplicationId appId) throws YarnException, IOException {

View File

@ -53,6 +53,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
@ -97,6 +99,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
@ -474,6 +478,21 @@ public GetNodesToLabelsResponse getNodeToLabels(
}
}
@Override
public GetLabelsToNodesResponse getLabelsToNodes(
GetLabelsToNodesRequest request)
throws YarnException, IOException {
YarnServiceProtos.GetLabelsToNodesRequestProto requestProto =
((GetLabelsToNodesRequestPBImpl) request).getProto();
try {
return new GetLabelsToNodesResponsePBImpl(proxy.getLabelsToNodes(
null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
@Override
public GetClusterNodeLabelsResponse getClusterNodeLabels(
GetClusterNodeLabelsRequest request) throws YarnException, IOException {

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
@ -73,6 +74,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
@ -114,6 +117,8 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerReportResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNodesToLabelsRequestProto;
@ -470,6 +475,22 @@ public GetNodesToLabelsResponseProto getNodeToLabels(
}
}
@Override
public GetLabelsToNodesResponseProto getLabelsToNodes(
RpcController controller, GetLabelsToNodesRequestProto proto)
throws ServiceException {
GetLabelsToNodesRequestPBImpl request =
new GetLabelsToNodesRequestPBImpl(proto);
try {
GetLabelsToNodesResponse response = real.getLabelsToNodes(request);
return ((GetLabelsToNodesResponsePBImpl) response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetClusterNodeLabelsResponseProto getClusterNodeLabels(
RpcController controller, GetClusterNodeLabelsRequestProto proto)

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesRequestProtoOrBuilder;
import com.google.protobuf.TextFormat;
public class GetLabelsToNodesRequestPBImpl extends GetLabelsToNodesRequest {
Set<String> nodeLabels = null;
GetLabelsToNodesRequestProto proto =
GetLabelsToNodesRequestProto.getDefaultInstance();
GetLabelsToNodesRequestProto.Builder builder = null;
boolean viaProto = false;
public GetLabelsToNodesRequestPBImpl() {
builder = GetLabelsToNodesRequestProto.newBuilder();
}
public GetLabelsToNodesRequestPBImpl(GetLabelsToNodesRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public GetLabelsToNodesRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (nodeLabels != null && !nodeLabels.isEmpty()) {
builder.clearNodeLabels();
builder.addAllNodeLabels(nodeLabels);
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetLabelsToNodesRequestProto.newBuilder(proto);
}
viaProto = false;
}
private void initNodeLabels() {
if (this.nodeLabels != null) {
return;
}
GetLabelsToNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
List<String> nodeLabelsList = p.getNodeLabelsList();
this.nodeLabels = new HashSet<String>();
this.nodeLabels.addAll(nodeLabelsList);
}
@Override
public Set<String> getNodeLabels() {
initNodeLabels();
return this.nodeLabels;
}
@Override
public void setNodeLabels(Set<String> nodeLabels) {
maybeInitBuilder();
if (nodeLabels == null)
builder.clearNodeLabels();
this.nodeLabels = nodeLabels;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.proto.YarnProtos.LabelsToNodeIdsProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesResponseProtoOrBuilder;
public class GetLabelsToNodesResponsePBImpl extends
GetLabelsToNodesResponse {
GetLabelsToNodesResponseProto proto = GetLabelsToNodesResponseProto
.getDefaultInstance();
GetLabelsToNodesResponseProto.Builder builder = null;
boolean viaProto = false;
private Map<String, Set<NodeId>> labelsToNodes;
public GetLabelsToNodesResponsePBImpl() {
this.builder = GetLabelsToNodesResponseProto.newBuilder();
}
public GetLabelsToNodesResponsePBImpl(GetLabelsToNodesResponseProto proto) {
this.proto = proto;
this.viaProto = true;
}
private void initLabelsToNodes() {
if (this.labelsToNodes != null) {
return;
}
GetLabelsToNodesResponseProtoOrBuilder p = viaProto ? proto : builder;
List<LabelsToNodeIdsProto> list = p.getLabelsToNodesList();
this.labelsToNodes = new HashMap<String, Set<NodeId>>();
for (LabelsToNodeIdsProto c : list) {
Set<NodeId> setNodes = new HashSet<NodeId>();
for(NodeIdProto n : c.getNodeIdList()) {
NodeId node = new NodeIdPBImpl(n);
setNodes.add(node);
}
if(!setNodes.isEmpty()) {
this.labelsToNodes.put(c.getNodeLabels(), setNodes);
}
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetLabelsToNodesResponseProto.newBuilder(proto);
}
viaProto = false;
}
private void addLabelsToNodesToProto() {
maybeInitBuilder();
builder.clearLabelsToNodes();
if (labelsToNodes == null) {
return;
}
Iterable<LabelsToNodeIdsProto> iterable =
new Iterable<LabelsToNodeIdsProto>() {
@Override
public Iterator<LabelsToNodeIdsProto> iterator() {
return new Iterator<LabelsToNodeIdsProto>() {
Iterator<Entry<String, Set<NodeId>>> iter =
labelsToNodes.entrySet().iterator();
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public LabelsToNodeIdsProto next() {
Entry<String, Set<NodeId>> now = iter.next();
Set<NodeIdProto> nodeProtoSet = new HashSet<NodeIdProto>();
for(NodeId n : now.getValue()) {
nodeProtoSet.add(convertToProtoFormat(n));
}
return LabelsToNodeIdsProto.newBuilder()
.setNodeLabels(now.getKey()).addAllNodeId(nodeProtoSet)
.build();
}
@Override
public boolean hasNext() {
return iter.hasNext();
}
};
}
};
builder.addAllLabelsToNodes(iterable);
}
private void mergeLocalToBuilder() {
if (this.labelsToNodes != null) {
addLabelsToNodesToProto();
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
public GetLabelsToNodesResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private NodeIdProto convertToProtoFormat(NodeId t) {
return ((NodeIdPBImpl)t).getProto();
}
@Override
public int hashCode() {
assert false : "hashCode not designed";
return 0;
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
@Public
@Evolving
public void setLabelsToNodes(Map<String, Set<NodeId>> map) {
initLabelsToNodes();
labelsToNodes.clear();
labelsToNodes.putAll(map);
}
@Override
@Public
@Evolving
public Map<String, Set<NodeId>> getLabelsToNodes() {
initLabelsToNodes();
return this.labelsToNodes;
}
}

View File

@ -1003,4 +1003,16 @@ public void testGetNodeToLabelsResponsePBImpl() throws Exception {
validatePBImplRecord(GetNodesToLabelsResponsePBImpl.class,
GetNodesToLabelsResponseProto.class);
}
@Test
public void testGetLabelsToNodesRequestPBImpl() throws Exception {
validatePBImplRecord(GetLabelsToNodesRequestPBImpl.class,
GetLabelsToNodesRequestProto.class);
}
@Test
public void testGetLabelsToNodesResponsePBImpl() throws Exception {
validatePBImplRecord(GetLabelsToNodesResponsePBImpl.class,
GetLabelsToNodesResponseProto.class);
}
}

View File

@ -71,6 +71,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
@ -1222,6 +1224,19 @@ public GetNodesToLabelsResponse getNodeToLabels(
return response;
}
@Override
public GetLabelsToNodesResponse getLabelsToNodes(
GetLabelsToNodesRequest request) throws YarnException, IOException {
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
if (request.getNodeLabels() == null || request.getNodeLabels().isEmpty()) {
return GetLabelsToNodesResponse.newInstance(
labelsMgr.getLabelsToNodes());
} else {
return GetLabelsToNodesResponse.newInstance(
labelsMgr.getLabelsToNodes(request.getNodeLabels()));
}
}
@Override
public GetClusterNodeLabelsResponse getClusterNodeLabels(
GetClusterNodeLabelsRequest request) throws YarnException, IOException {

View File

@ -70,6 +70,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
@ -1437,4 +1439,77 @@ protected ClientRMService createClientRMService() {
rpc.stopProxy(client, conf);
rm.close();
}
@Test
public void testGetLabelsToNodes() throws Exception {
MockRM rm = new MockRM() {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager, this.getRMContext()
.getRMDelegationTokenSecretManager());
};
};
rm.start();
RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager();
labelsMgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>();
map.put(NodeId.newInstance("host1", 0), ImmutableSet.of("x"));
map.put(NodeId.newInstance("host1", 1), ImmutableSet.of("z"));
map.put(NodeId.newInstance("host2", 0), ImmutableSet.of("y"));
map.put(NodeId.newInstance("host3", 0), ImmutableSet.of("y"));
map.put(NodeId.newInstance("host3", 1), ImmutableSet.of("z"));
labelsMgr.replaceLabelsOnNode(map);
// Create a client.
Configuration conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client =
(ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
// Get node labels collection
GetClusterNodeLabelsResponse response =
client.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance());
Assert.assertTrue(response.getNodeLabels().containsAll(
Arrays.asList("x", "y", "z")));
// Get labels to nodes mapping
GetLabelsToNodesResponse response1 =
client.getLabelsToNodes(GetLabelsToNodesRequest.newInstance());
Map<String, Set<NodeId>> labelsToNodes = response1.getLabelsToNodes();
Assert.assertTrue(
labelsToNodes.keySet().containsAll(Arrays.asList("x", "y", "z")));
Assert.assertTrue(
labelsToNodes.get("x").containsAll(Arrays.asList(
NodeId.newInstance("host1", 0))));
Assert.assertTrue(
labelsToNodes.get("y").containsAll(Arrays.asList(
NodeId.newInstance("host2", 0), NodeId.newInstance("host3", 0))));
Assert.assertTrue(
labelsToNodes.get("z").containsAll(Arrays.asList(
NodeId.newInstance("host1", 1), NodeId.newInstance("host3", 1))));
// Get labels to nodes mapping for specific labels
Set<String> setlabels =
new HashSet<String>(Arrays.asList(new String[]{"x", "z"}));
GetLabelsToNodesResponse response2 =
client.getLabelsToNodes(GetLabelsToNodesRequest.newInstance(setlabels));
labelsToNodes = response2.getLabelsToNodes();
Assert.assertTrue(
labelsToNodes.keySet().containsAll(Arrays.asList("x", "z")));
Assert.assertTrue(
labelsToNodes.get("x").containsAll(Arrays.asList(
NodeId.newInstance("host1", 0))));
Assert.assertTrue(
labelsToNodes.get("z").containsAll(Arrays.asList(
NodeId.newInstance("host1", 1), NodeId.newInstance("host3", 1))));
Assert.assertEquals(labelsToNodes.get("y"), null);
rpc.stopProxy(client, conf);
rm.close();
}
}