From d9d93e392508da2e8b88faefe002128e245fc8f3 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Wed, 31 Jan 2018 20:28:41 +0800 Subject: [PATCH] YARN-7842. PB changes to carry node-attributes in NM heartbeat. Contributed by Weiwei Yang. --- .../protocolrecords/NodeHeartbeatRequest.java | 17 ++++++ .../impl/pb/NodeHeartbeatRequestPBImpl.java | 52 +++++++++++++++++++ .../yarn_server_common_service_protos.proto | 5 ++ .../protocolrecords/TestProtocolRecords.java | 12 +++++ 4 files changed, 86 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index f238f79f17..4f9922507b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.api.records.NodeAttribute; public abstract class NodeHeartbeatRequest { @@ -61,6 +62,18 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, return nodeHeartbeatRequest; } + public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, + MasterKey lastKnownContainerTokenMasterKey, + MasterKey lastKnownNMTokenMasterKey, Set nodeLabels, + Set nodeAttributes, + Map registeringCollectors) { + NodeHeartbeatRequest request = NodeHeartbeatRequest + .newInstance(nodeStatus, lastKnownContainerTokenMasterKey, + lastKnownNMTokenMasterKey, nodeLabels, registeringCollectors); + request.setNodeAttributes(nodeAttributes); + return request; + } + public abstract NodeStatus getNodeStatus(); public abstract void setNodeStatus(NodeStatus status); @@ -85,4 +98,8 @@ public abstract void setLogAggregationReportsForApps( public abstract void setRegisteringCollectors(Map appCollectorsMap); + + public abstract Set getNodeAttributes(); + + public abstract void setNodeAttributes(Set nodeAttributes); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 1ffd223f8a..c59127a74b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -27,6 +27,9 @@ import java.util.Set; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeAttributePBImpl; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; @@ -36,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; @@ -60,6 +64,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; private Set labels = null; + private Set attributes = null; private List logAggregationReportsForApps = null; private Map registeringCollectors = null; @@ -115,6 +120,15 @@ private void mergeLocalToBuilder() { } builder.setNodeLabels(newBuilder.build()); } + if (this.attributes != null) { + builder.clearNodeAttributes(); + YarnServerCommonServiceProtos.NodeAttributesProto.Builder attBuilder = + YarnServerCommonServiceProtos.NodeAttributesProto.newBuilder(); + for (NodeAttribute attribute : attributes) { + attBuilder.addNodeAttributes(convertToProtoFormat(attribute)); + } + builder.setNodeAttributes(attBuilder.build()); + } if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); } @@ -371,6 +385,44 @@ private NodeLabelProto convertToProtoFormat(NodeLabel t) { return ((NodeLabelPBImpl)t).getProto(); } + @Override + public Set getNodeAttributes() { + initNodeAttributes(); + return this.attributes; + } + + private void initNodeAttributes() { + if (this.attributes != null) { + return; + } + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeAttributes()) { + return; + } + YarnServerCommonServiceProtos.NodeAttributesProto nodeAttributes = + p.getNodeAttributes(); + attributes = new HashSet<>(); + for (NodeAttributeProto attributeProto : + nodeAttributes.getNodeAttributesList()) { + attributes.add(convertFromProtoFormat(attributeProto)); + } + } + + @Override + public void setNodeAttributes(Set nodeAttributes) { + maybeInitBuilder(); + builder.clearNodeAttributes(); + this.attributes = nodeAttributes; + } + + private NodeAttributePBImpl convertFromProtoFormat(NodeAttributeProto p) { + return new NodeAttributePBImpl(p); + } + + private NodeAttributeProto convertToProtoFormat(NodeAttribute attribute) { + return ((NodeAttributePBImpl) attribute).getProto(); + } + @Override public List getLogAggregationReportsForApps() { if (this.logAggregationReportsForApps != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 387ddb4321..0b8c4a384d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -58,6 +58,10 @@ message NodeLabelsProto { repeated NodeLabelProto nodeLabels = 1; } +message NodeAttributesProto { + repeated NodeAttributeProto nodeAttributes = 1; +} + message RegisterNodeManagerRequestProto { optional NodeIdProto node_id = 1; optional int32 http_port = 3; @@ -95,6 +99,7 @@ message NodeHeartbeatRequestProto { optional NodeLabelsProto nodeLabels = 4; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; repeated AppCollectorDataProto registering_collectors = 6; + optional NodeAttributesProto nodeAttributes = 7; } message LogAggregationReportProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index 74f19e5a4b..e6e79d3f5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -24,7 +24,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import com.google.common.collect.Sets; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; @@ -39,6 +41,8 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; @@ -173,6 +177,13 @@ public void testNodeHeartBeatRequest() throws IOException { nodeStatus.setOpportunisticContainersStatus(opportunisticContainersStatus); record.setNodeStatus(nodeStatus); + Set attributeSet = + Sets.newHashSet(NodeAttribute.newInstance("attributeA", + NodeAttributeType.STRING, "valueA"), + NodeAttribute.newInstance("attributeB", + NodeAttributeType.STRING, "valueB")); + record.setNodeAttributes(attributeSet); + NodeHeartbeatRequestPBImpl pb = new NodeHeartbeatRequestPBImpl( ((NodeHeartbeatRequestPBImpl) record).getProto()); @@ -183,6 +194,7 @@ public void testNodeHeartBeatRequest() throws IOException { Assert.assertEquals(321, pb.getNodeStatus().getOpportunisticContainersStatus() .getWaitQueueLength()); + Assert.assertEquals(2, pb.getNodeAttributes().size()); } @Test