From e74d1be04be47969943b0501a4f335b0b5188287 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Mon, 11 Sep 2017 20:46:41 -0700 Subject: [PATCH] YARN-7173. Container update RM-NM communication fix for backward compatibility. (Arun Suresh via wangda) Change-Id: I1c39ed5c59dee739ba5044b61b3ef5ed203b79c1 --- .../NodeHeartbeatResponse.java | 5 ++ .../impl/pb/NodeHeartbeatResponsePBImpl.java | 65 +++++++++++++++++++ .../rmcontainer/RMContainerImpl.java | 4 +- .../resourcemanager/rmnode/RMNodeImpl.java | 20 +++++- .../rmnode/RMNodeUpdateContainerEvent.java | 9 +-- .../SchedulerApplicationAttempt.java | 3 +- 6 files changed, 97 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 2ebca57085..05a9c721e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -113,4 +113,9 @@ public abstract void addAllContainersToUpdate( public abstract void setContainerQueuingLimit( ContainerQueuingLimit containerQueuingLimit); + + public abstract List getContainersToDecrease(); + + public abstract void addAllContainersToDecrease( + Collection containersToDecrease); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 11f5f61416..bbd1294219 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -80,6 +80,8 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private MasterKey nmTokenMasterKey = null; private ContainerQueuingLimit containerQueuingLimit = null; private List containersToUpdate = null; + // NOTE: This is required for backward compatibility. + private List containersToDecrease = null; private List containersToSignal = null; public NodeHeartbeatResponsePBImpl() { @@ -126,6 +128,9 @@ private void mergeLocalToBuilder() { if (this.containersToUpdate != null) { addContainersToUpdateToProto(); } + if (this.containersToDecrease != null) { + addContainersToDecreaseToProto(); + } if (this.containersToSignal != null) { addContainersToSignalToProto(); } @@ -572,6 +577,66 @@ public void remove() { builder.addAllContainersToUpdate(iterable); } + private void initContainersToDecrease() { + if (this.containersToDecrease != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainersToDecreaseList(); + this.containersToDecrease = new ArrayList<>(); + + for (ContainerProto c : list) { + this.containersToDecrease.add(convertFromProtoFormat(c)); + } + } + + @Override + public List getContainersToDecrease() { + initContainersToDecrease(); + return this.containersToDecrease; + } + + @Override + public void addAllContainersToDecrease( + final Collection containersToDecrease) { + if (containersToDecrease == null) { + return; + } + initContainersToDecrease(); + this.containersToDecrease.addAll(containersToDecrease); + } + + private void addContainersToDecreaseToProto() { + maybeInitBuilder(); + builder.clearContainersToDecrease(); + if (this.containersToDecrease == null) { + return; + } + + Iterable iterable = new + Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iter = containersToDecrease.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + @Override + public ContainerProto next() { + return convertToProtoFormat(iter.next()); + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainersToDecrease(iterable); + } + @Override public Map getSystemCredentialsForApps() { if (this.systemCredentials != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index f49db7e761..8aa57884d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -641,7 +642,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { new AllocationExpirationInfo(event.getContainerId())); container.eventHandler.handle(new RMNodeUpdateContainerEvent( container.nodeId, - Collections.singletonList(container.getContainer()))); + Collections.singletonMap(container.getContainer(), + ContainerUpdateType.DECREASE_RESOURCE))); } else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) { // If nmContainerResource < rmContainerResource, this is caused by the // following sequence: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index d270aa3084..c547128f18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -173,7 +174,11 @@ public class RMNodeImpl implements RMNode, EventHandler { private final Map toBeUpdatedContainers = new HashMap<>(); - + + // NOTE: This is required for backward compatibility. + private final Map toBeDecreasedContainers = + new HashMap<>(); + private final Map nmReportedIncreasedContainers = new HashMap<>(); @@ -626,6 +631,10 @@ public void updateNodeHeartbeatResponseForUpdatedContainers( try { response.addAllContainersToUpdate(toBeUpdatedContainers.values()); toBeUpdatedContainers.clear(); + + // NOTE: This is required for backward compatibility. + response.addAllContainersToDecrease(toBeDecreasedContainers.values()); + toBeDecreasedContainers.clear(); } finally { this.writeLock.unlock(); } @@ -1043,8 +1052,13 @@ public static class UpdateContainersTransition public void transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event; - for (Container c : de.getToBeUpdatedContainers()) { - rmNode.toBeUpdatedContainers.put(c.getId(), c); + for (Map.Entry e : + de.getToBeUpdatedContainers().entrySet()) { + // NOTE: This is required for backward compatibility. + if (ContainerUpdateType.DECREASE_RESOURCE == e.getValue()) { + rmNode.toBeDecreasedContainers.put(e.getKey().getId(), e.getKey()); + } + rmNode.toBeUpdatedContainers.put(e.getKey().getId(), e.getKey()); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java index 73af563dba..b8f8e734f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; import java.util.List; +import java.util.Map; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.NodeId; /** @@ -29,16 +31,15 @@ * */ public class RMNodeUpdateContainerEvent extends RMNodeEvent { - private List toBeUpdatedContainers; + private Map toBeUpdatedContainers; public RMNodeUpdateContainerEvent(NodeId nodeId, - List toBeUpdatedContainers) { + Map toBeUpdatedContainers) { super(nodeId, RMNodeEventType.UPDATE_CONTAINER); - this.toBeUpdatedContainers = toBeUpdatedContainers; } - public List getToBeUpdatedContainers() { + public Map getToBeUpdatedContainers() { return toBeUpdatedContainers; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 6a44cae6c3..c807590c22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -690,7 +690,8 @@ private Container updateContainerAndNMToken(RMContainer rmContainer, if (autoUpdate) { this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeUpdateContainerEvent(rmContainer.getNodeId(), - Collections.singletonList(rmContainer.getContainer()))); + Collections.singletonMap( + rmContainer.getContainer(), updateType))); } else { rmContainer.handle(new RMContainerUpdatesAcquiredEvent( rmContainer.getContainerId(),