YARN-7173. Container update RM-NM communication fix for backward compatibility. (Arun Suresh via wangda)
Change-Id: I1c39ed5c59dee739ba5044b61b3ef5ed203b79c1
This commit is contained in:
parent
fa531788fd
commit
e74d1be04b
@ -113,4 +113,9 @@ public abstract void addAllContainersToUpdate(
|
|||||||
|
|
||||||
public abstract void setContainerQueuingLimit(
|
public abstract void setContainerQueuingLimit(
|
||||||
ContainerQueuingLimit containerQueuingLimit);
|
ContainerQueuingLimit containerQueuingLimit);
|
||||||
|
|
||||||
|
public abstract List<Container> getContainersToDecrease();
|
||||||
|
|
||||||
|
public abstract void addAllContainersToDecrease(
|
||||||
|
Collection<Container> containersToDecrease);
|
||||||
}
|
}
|
||||||
|
@ -80,6 +80,8 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
|
|||||||
private MasterKey nmTokenMasterKey = null;
|
private MasterKey nmTokenMasterKey = null;
|
||||||
private ContainerQueuingLimit containerQueuingLimit = null;
|
private ContainerQueuingLimit containerQueuingLimit = null;
|
||||||
private List<Container> containersToUpdate = null;
|
private List<Container> containersToUpdate = null;
|
||||||
|
// NOTE: This is required for backward compatibility.
|
||||||
|
private List<Container> containersToDecrease = null;
|
||||||
private List<SignalContainerRequest> containersToSignal = null;
|
private List<SignalContainerRequest> containersToSignal = null;
|
||||||
|
|
||||||
public NodeHeartbeatResponsePBImpl() {
|
public NodeHeartbeatResponsePBImpl() {
|
||||||
@ -126,6 +128,9 @@ private void mergeLocalToBuilder() {
|
|||||||
if (this.containersToUpdate != null) {
|
if (this.containersToUpdate != null) {
|
||||||
addContainersToUpdateToProto();
|
addContainersToUpdateToProto();
|
||||||
}
|
}
|
||||||
|
if (this.containersToDecrease != null) {
|
||||||
|
addContainersToDecreaseToProto();
|
||||||
|
}
|
||||||
if (this.containersToSignal != null) {
|
if (this.containersToSignal != null) {
|
||||||
addContainersToSignalToProto();
|
addContainersToSignalToProto();
|
||||||
}
|
}
|
||||||
@ -572,6 +577,66 @@ public void remove() {
|
|||||||
builder.addAllContainersToUpdate(iterable);
|
builder.addAllContainersToUpdate(iterable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void initContainersToDecrease() {
|
||||||
|
if (this.containersToDecrease != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
List<ContainerProto> list = p.getContainersToDecreaseList();
|
||||||
|
this.containersToDecrease = new ArrayList<>();
|
||||||
|
|
||||||
|
for (ContainerProto c : list) {
|
||||||
|
this.containersToDecrease.add(convertFromProtoFormat(c));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Container> getContainersToDecrease() {
|
||||||
|
initContainersToDecrease();
|
||||||
|
return this.containersToDecrease;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addAllContainersToDecrease(
|
||||||
|
final Collection<Container> containersToDecrease) {
|
||||||
|
if (containersToDecrease == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
initContainersToDecrease();
|
||||||
|
this.containersToDecrease.addAll(containersToDecrease);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addContainersToDecreaseToProto() {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.clearContainersToDecrease();
|
||||||
|
if (this.containersToDecrease == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Iterable<ContainerProto> iterable = new
|
||||||
|
Iterable<ContainerProto>() {
|
||||||
|
@Override
|
||||||
|
public Iterator<ContainerProto> iterator() {
|
||||||
|
return new Iterator<ContainerProto>() {
|
||||||
|
private Iterator<Container> iter = containersToDecrease.iterator();
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return iter.hasNext();
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public ContainerProto next() {
|
||||||
|
return convertToProtoFormat(iter.next());
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
builder.addAllContainersToDecrease(iterable);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
|
public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
|
||||||
if (this.systemCredentials != null) {
|
if (this.systemCredentials != null) {
|
||||||
|
@ -35,6 +35,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
@ -641,7 +642,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
|
|||||||
new AllocationExpirationInfo(event.getContainerId()));
|
new AllocationExpirationInfo(event.getContainerId()));
|
||||||
container.eventHandler.handle(new RMNodeUpdateContainerEvent(
|
container.eventHandler.handle(new RMNodeUpdateContainerEvent(
|
||||||
container.nodeId,
|
container.nodeId,
|
||||||
Collections.singletonList(container.getContainer())));
|
Collections.singletonMap(container.getContainer(),
|
||||||
|
ContainerUpdateType.DECREASE_RESOURCE)));
|
||||||
} else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) {
|
} else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) {
|
||||||
// If nmContainerResource < rmContainerResource, this is caused by the
|
// If nmContainerResource < rmContainerResource, this is caused by the
|
||||||
// following sequence:
|
// following sequence:
|
||||||
|
@ -48,6 +48,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
@ -174,6 +175,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||||||
private final Map<ContainerId, Container> toBeUpdatedContainers =
|
private final Map<ContainerId, Container> toBeUpdatedContainers =
|
||||||
new HashMap<>();
|
new HashMap<>();
|
||||||
|
|
||||||
|
// NOTE: This is required for backward compatibility.
|
||||||
|
private final Map<ContainerId, Container> toBeDecreasedContainers =
|
||||||
|
new HashMap<>();
|
||||||
|
|
||||||
private final Map<ContainerId, Container> nmReportedIncreasedContainers =
|
private final Map<ContainerId, Container> nmReportedIncreasedContainers =
|
||||||
new HashMap<>();
|
new HashMap<>();
|
||||||
|
|
||||||
@ -626,6 +631,10 @@ public void updateNodeHeartbeatResponseForUpdatedContainers(
|
|||||||
try {
|
try {
|
||||||
response.addAllContainersToUpdate(toBeUpdatedContainers.values());
|
response.addAllContainersToUpdate(toBeUpdatedContainers.values());
|
||||||
toBeUpdatedContainers.clear();
|
toBeUpdatedContainers.clear();
|
||||||
|
|
||||||
|
// NOTE: This is required for backward compatibility.
|
||||||
|
response.addAllContainersToDecrease(toBeDecreasedContainers.values());
|
||||||
|
toBeDecreasedContainers.clear();
|
||||||
} finally {
|
} finally {
|
||||||
this.writeLock.unlock();
|
this.writeLock.unlock();
|
||||||
}
|
}
|
||||||
@ -1043,8 +1052,13 @@ public static class UpdateContainersTransition
|
|||||||
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||||
RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event;
|
RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event;
|
||||||
|
|
||||||
for (Container c : de.getToBeUpdatedContainers()) {
|
for (Map.Entry<Container, ContainerUpdateType> e :
|
||||||
rmNode.toBeUpdatedContainers.put(c.getId(), c);
|
de.getToBeUpdatedContainers().entrySet()) {
|
||||||
|
// NOTE: This is required for backward compatibility.
|
||||||
|
if (ContainerUpdateType.DECREASE_RESOURCE == e.getValue()) {
|
||||||
|
rmNode.toBeDecreasedContainers.put(e.getKey().getId(), e.getKey());
|
||||||
|
}
|
||||||
|
rmNode.toBeUpdatedContainers.put(e.getKey().getId(), e.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,8 +19,10 @@
|
|||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -29,16 +31,15 @@
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class RMNodeUpdateContainerEvent extends RMNodeEvent {
|
public class RMNodeUpdateContainerEvent extends RMNodeEvent {
|
||||||
private List<Container> toBeUpdatedContainers;
|
private Map<Container, ContainerUpdateType> toBeUpdatedContainers;
|
||||||
|
|
||||||
public RMNodeUpdateContainerEvent(NodeId nodeId,
|
public RMNodeUpdateContainerEvent(NodeId nodeId,
|
||||||
List<Container> toBeUpdatedContainers) {
|
Map<Container, ContainerUpdateType> toBeUpdatedContainers) {
|
||||||
super(nodeId, RMNodeEventType.UPDATE_CONTAINER);
|
super(nodeId, RMNodeEventType.UPDATE_CONTAINER);
|
||||||
|
|
||||||
this.toBeUpdatedContainers = toBeUpdatedContainers;
|
this.toBeUpdatedContainers = toBeUpdatedContainers;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Container> getToBeUpdatedContainers() {
|
public Map<Container, ContainerUpdateType> getToBeUpdatedContainers() {
|
||||||
return toBeUpdatedContainers;
|
return toBeUpdatedContainers;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -690,7 +690,8 @@ private Container updateContainerAndNMToken(RMContainer rmContainer,
|
|||||||
if (autoUpdate) {
|
if (autoUpdate) {
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
new RMNodeUpdateContainerEvent(rmContainer.getNodeId(),
|
new RMNodeUpdateContainerEvent(rmContainer.getNodeId(),
|
||||||
Collections.singletonList(rmContainer.getContainer())));
|
Collections.singletonMap(
|
||||||
|
rmContainer.getContainer(), updateType)));
|
||||||
} else {
|
} else {
|
||||||
rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
|
rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
|
||||||
rmContainer.getContainerId(),
|
rmContainer.getContainerId(),
|
||||||
|
Loading…
Reference in New Issue
Block a user