YARN-5483. Optimize RMAppAttempt#pullJustFinishedContainers. Contributed by sandflee
This commit is contained in:
parent
4d4fe07c3c
commit
e0b570dffb
@ -28,6 +28,7 @@
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
@ -144,14 +145,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||
private SecretKey clientTokenMasterKey = null;
|
||||
|
||||
private ConcurrentMap<NodeId, List<ContainerStatus>>
|
||||
justFinishedContainers =
|
||||
new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
|
||||
justFinishedContainers = new ConcurrentHashMap<>();
|
||||
// Tracks the previous finished containers that are waiting to be
|
||||
// verified as received by the AM. If the AM sends the next allocate
|
||||
// request it implicitly acks this list.
|
||||
private ConcurrentMap<NodeId, List<ContainerStatus>>
|
||||
finishedContainersSentToAM =
|
||||
new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
|
||||
finishedContainersSentToAM = new ConcurrentHashMap<>();
|
||||
private volatile Container masterContainer;
|
||||
|
||||
private float progress = 0;
|
||||
@ -759,7 +758,7 @@ public float getProgress() {
|
||||
public List<ContainerStatus> getJustFinishedContainers() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
List<ContainerStatus> returnList = new ArrayList<ContainerStatus>();
|
||||
List<ContainerStatus> returnList = new ArrayList<>();
|
||||
for (Collection<ContainerStatus> containerStatusList :
|
||||
justFinishedContainers.values()) {
|
||||
returnList.addAll(containerStatusList);
|
||||
@ -798,7 +797,7 @@ public List<ContainerStatus> pullJustFinishedContainers() {
|
||||
this.writeLock.lock();
|
||||
|
||||
try {
|
||||
List<ContainerStatus> returnList = new ArrayList<ContainerStatus>();
|
||||
List<ContainerStatus> returnList = new ArrayList<>();
|
||||
|
||||
// A new allocate means the AM received the previously sent
|
||||
// finishedContainers. We can ack this to NM now
|
||||
@ -806,15 +805,17 @@ public List<ContainerStatus> pullJustFinishedContainers() {
|
||||
|
||||
// Mark every containerStatus as being sent to AM though we may return
|
||||
// only the ones that belong to the current attempt
|
||||
boolean keepContainersAcressAttempts = this.submissionContext
|
||||
boolean keepContainersAcrossAppAttempts = this.submissionContext
|
||||
.getKeepContainersAcrossApplicationAttempts();
|
||||
for (NodeId nodeId:justFinishedContainers.keySet()) {
|
||||
for (Map.Entry<NodeId, List<ContainerStatus>> entry :
|
||||
justFinishedContainers.entrySet()) {
|
||||
NodeId nodeId = entry.getKey();
|
||||
List<ContainerStatus> finishedContainers = entry.getValue();
|
||||
if (finishedContainers.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Clear and get current values
|
||||
List<ContainerStatus> finishedContainers = justFinishedContainers.put
|
||||
(nodeId, new ArrayList<ContainerStatus>());
|
||||
|
||||
if (keepContainersAcressAttempts) {
|
||||
if (keepContainersAcrossAppAttempts) {
|
||||
returnList.addAll(finishedContainers);
|
||||
} else {
|
||||
// Filter out containers from previous attempt
|
||||
@ -826,12 +827,10 @@ public List<ContainerStatus> pullJustFinishedContainers() {
|
||||
}
|
||||
}
|
||||
|
||||
if (!finishedContainers.isEmpty()) {
|
||||
finishedContainersSentToAM.putIfAbsent(nodeId,
|
||||
new ArrayList<ContainerStatus>());
|
||||
finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList<>());
|
||||
finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
|
||||
}
|
||||
}
|
||||
justFinishedContainers.clear();
|
||||
|
||||
return returnList;
|
||||
} finally {
|
||||
@ -936,8 +935,7 @@ public void transferStateFromAttempt(RMAppAttempt attempt) {
|
||||
for (NodeId nodeId : this.finishedContainersSentToAM.keySet()) {
|
||||
List<ContainerStatus> containerStatuses =
|
||||
this.finishedContainersSentToAM.get(nodeId);
|
||||
this.justFinishedContainers.putIfAbsent(nodeId,
|
||||
new ArrayList<ContainerStatus>());
|
||||
this.justFinishedContainers.putIfAbsent(nodeId, new ArrayList<>());
|
||||
this.justFinishedContainers.get(nodeId).addAll(containerStatuses);
|
||||
}
|
||||
this.finishedContainersSentToAM.clear();
|
||||
@ -1863,10 +1861,9 @@ private void sendFinishedContainersToNM() {
|
||||
|
||||
// Clear and get current values
|
||||
List<ContainerStatus> currentSentContainers =
|
||||
finishedContainersSentToAM.put(nodeId,
|
||||
new ArrayList<ContainerStatus>());
|
||||
finishedContainersSentToAM.put(nodeId, new ArrayList<>());
|
||||
List<ContainerId> containerIdList =
|
||||
new ArrayList<ContainerId>(currentSentContainers.size());
|
||||
new ArrayList<>(currentSentContainers.size());
|
||||
for (ContainerStatus containerStatus : currentSentContainers) {
|
||||
containerIdList.add(containerStatus.getContainerId());
|
||||
}
|
||||
@ -1897,7 +1894,7 @@ private static void amContainerFinished(RMAppAttemptImpl appAttempt,
|
||||
if (!appAttempt.getSubmissionContext()
|
||||
.getKeepContainersAcrossApplicationAttempts()) {
|
||||
appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
|
||||
new ArrayList<ContainerStatus>());
|
||||
new ArrayList<>());
|
||||
appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
|
||||
appAttempt.sendFinishedContainersToNM();
|
||||
} else {
|
||||
@ -1924,7 +1921,7 @@ public BlacklistManager getAMBlacklistManager() {
|
||||
private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
|
||||
appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
|
||||
.getNodeId(), new ArrayList<ContainerStatus>());
|
||||
.getNodeId(), new ArrayList<>());
|
||||
appAttempt.justFinishedContainers.get(containerFinishedEvent
|
||||
.getNodeId()).add(containerFinishedEvent.getContainerStatus());
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user