diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 44f6fec31b..79a0feb3fa 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -108,6 +108,9 @@ Release 2.2.1 - UNRELEASED YARN-1315. TestQueueACLs should also test FairScheduler (Sandy Ryza) + YARN-1335. Move duplicate code from FSSchedulerApp and FiCaSchedulerApp + into SchedulerApplication (Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index cc9b872724..ade40c16c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -17,44 +17,385 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; /** - * Represents an Application from the viewpoint of the scheduler. - * Each running Application in the RM corresponds to one instance + * Represents an application attempt from the viewpoint of the scheduler. + * Each running app attempt in the RM corresponds to one instance * of this class. */ @Private @Unstable public abstract class SchedulerApplication { + + private static final Log LOG = LogFactory.getLog(SchedulerApplication.class); + + protected final AppSchedulingInfo appSchedulingInfo; + + protected final Map liveContainers = + new HashMap(); + protected final Map> reservedContainers = + new HashMap>(); + + private final Multiset reReservations = HashMultiset.create(); + + protected final Resource currentReservation = Resource.newInstance(0, 0); + private Resource resourceLimit = Resource.newInstance(0, 0); + protected final Resource currentConsumption = Resource.newInstance(0, 0); + + protected List newlyAllocatedContainers = + new ArrayList(); /** - * Get {@link ApplicationAttemptId} of the application master. - * @return ApplicationAttemptId of the application master + * Count how many times the application has been given an opportunity + * to schedule a task at each priority. Each time the scheduler + * asks the application for a task at this priority, it is incremented, + * and each time the application successfully schedules a task, it + * is reset to 0. */ - public abstract ApplicationAttemptId getApplicationAttemptId(); + Multiset schedulingOpportunities = HashMultiset.create(); + + // Time of the last container scheduled at the current allowed level + protected Map lastScheduledContainer = + new HashMap(); + + protected final Queue queue; + protected boolean isStopped = false; + + protected final RMContext rmContext; + + public SchedulerApplication(ApplicationAttemptId applicationAttemptId, + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext) { + this.rmContext = rmContext; + this.appSchedulingInfo = + new AppSchedulingInfo(applicationAttemptId, user, queue, + activeUsersManager); + this.queue = queue; + } /** * Get the live containers of the application. * @return live containers of the application */ - public abstract Collection getLiveContainers(); - - /** - * Get the reserved containers of the application. - * @return the reserved containers of the application - */ - public abstract Collection getReservedContainers(); + public synchronized Collection getLiveContainers() { + return new ArrayList(liveContainers.values()); + } /** * Is this application pending? * @return true if it is else false. */ - public abstract boolean isPending(); + public boolean isPending() { + return appSchedulingInfo.isPending(); + } + + /** + * Get {@link ApplicationAttemptId} of the application master. + * @return ApplicationAttemptId of the application master + */ + public ApplicationAttemptId getApplicationAttemptId() { + return appSchedulingInfo.getApplicationAttemptId(); + } + + public ApplicationId getApplicationId() { + return appSchedulingInfo.getApplicationId(); + } + + public String getUser() { + return appSchedulingInfo.getUser(); + } + + public Map getResourceRequests(Priority priority) { + return appSchedulingInfo.getResourceRequests(priority); + } + + public int getNewContainerId() { + return appSchedulingInfo.getNewContainerId(); + } + + public Collection getPriorities() { + return appSchedulingInfo.getPriorities(); + } + + public ResourceRequest getResourceRequest(Priority priority, String resourceName) { + return this.appSchedulingInfo.getResourceRequest(priority, resourceName); + } + + public synchronized int getTotalRequiredResources(Priority priority) { + return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers(); + } + + public Resource getResource(Priority priority) { + return appSchedulingInfo.getResource(priority); + } + + public String getQueueName() { + return appSchedulingInfo.getQueueName(); + } + + public synchronized RMContainer getRMContainer(ContainerId id) { + return liveContainers.get(id); + } + + protected synchronized void resetReReservations(Priority priority) { + reReservations.setCount(priority, 0); + } + + protected synchronized void addReReservation(Priority priority) { + reReservations.add(priority); + } + + public synchronized int getReReservations(Priority priority) { + return reReservations.count(priority); + } + + /** + * Get total current reservations. + * Used only by unit tests + * @return total current reservations + */ + @Stable + @Private + public synchronized Resource getCurrentReservation() { + return currentReservation; + } + + public Queue getQueue() { + return queue; + } + + public synchronized void updateResourceRequests( + List requests) { + if (!isStopped) { + appSchedulingInfo.updateResourceRequests(requests); + } + } + + public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { + // Cleanup all scheduling information + isStopped = true; + appSchedulingInfo.stop(rmAppAttemptFinalState); + } + + public synchronized boolean isStopped() { + return isStopped; + } + + /** + * Get the list of reserved containers + * @return All of the reserved containers. + */ + public synchronized List getReservedContainers() { + List reservedContainers = new ArrayList(); + for (Map.Entry> e : + this.reservedContainers.entrySet()) { + reservedContainers.addAll(e.getValue().values()); + } + return reservedContainers; + } + + public synchronized RMContainer reserve(SchedulerNode node, Priority priority, + RMContainer rmContainer, Container container) { + // Create RMContainer if necessary + if (rmContainer == null) { + rmContainer = + new RMContainerImpl(container, getApplicationAttemptId(), + node.getNodeID(), rmContext.getDispatcher().getEventHandler(), + rmContext.getContainerAllocationExpirer()); + + Resources.addTo(currentReservation, container.getResource()); + + // Reset the re-reservation count + resetReReservations(priority); + } else { + // Note down the re-reservation + addReReservation(priority); + } + rmContainer.handle(new RMContainerReservedEvent(container.getId(), + container.getResource(), node.getNodeID(), priority)); + + Map reservedContainers = + this.reservedContainers.get(priority); + if (reservedContainers == null) { + reservedContainers = new HashMap(); + this.reservedContainers.put(priority, reservedContainers); + } + reservedContainers.put(node.getNodeID(), rmContainer); + + LOG.info("Application " + getApplicationId() + + " reserved container " + rmContainer + + " on node " + node + ", currently has " + reservedContainers.size() + + " at priority " + priority + + "; currentReservation " + currentReservation.getMemory()); + + return rmContainer; + } + + /** + * Has the application reserved the given node at the + * given priority? + * @param node node to be checked + * @param priority priority of reserved container + * @return true is reserved, false if not + */ + public synchronized boolean isReserved(SchedulerNode node, Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + if (reservedContainers != null) { + return reservedContainers.containsKey(node.getNodeID()); + } + return false; + } + + public synchronized void setHeadroom(Resource globalLimit) { + this.resourceLimit = globalLimit; + } + + /** + * Get available headroom in terms of resources for the application's user. + * @return available resource headroom + */ + public synchronized Resource getHeadroom() { + // Corner case to deal with applications being slightly over-limit + if (resourceLimit.getMemory() < 0) { + resourceLimit.setMemory(0); + } + + return resourceLimit; + } + + public synchronized int getNumReservedContainers(Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + return (reservedContainers == null) ? 0 : reservedContainers.size(); + } + + @SuppressWarnings("unchecked") + public synchronized void containerLaunchedOnNode(ContainerId containerId, + NodeId nodeId) { + // Inform the container + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + // Some unknown container sneaked into the system. Kill it. + rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); + return; + } + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.LAUNCHED)); + } + + public synchronized void showRequests() { + if (LOG.isDebugEnabled()) { + for (Priority priority : getPriorities()) { + Map requests = getResourceRequests(priority); + if (requests != null) { + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " headRoom=" + getHeadroom() + + " currentConsumption=" + currentConsumption.getMemory()); + for (ResourceRequest request : requests.values()) { + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " request=" + request); + } + } + } + } + } + + public Resource getCurrentConsumption() { + return currentConsumption; + } + + public synchronized List pullNewlyAllocatedContainers() { + List returnContainerList = new ArrayList( + newlyAllocatedContainers.size()); + for (RMContainer rmContainer : newlyAllocatedContainers) { + rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), + RMContainerEventType.ACQUIRED)); + returnContainerList.add(rmContainer.getContainer()); + } + newlyAllocatedContainers.clear(); + return returnContainerList; + } + + public synchronized void updateBlacklist( + List blacklistAdditions, List blacklistRemovals) { + if (!isStopped) { + this.appSchedulingInfo.updateBlacklist( + blacklistAdditions, blacklistRemovals); + } + } + + public boolean isBlacklisted(String resourceName) { + return this.appSchedulingInfo.isBlacklisted(resourceName); + } + + public synchronized void addSchedulingOpportunity(Priority priority) { + schedulingOpportunities.setCount(priority, + schedulingOpportunities.count(priority) + 1); + } + + public synchronized void subtractSchedulingOpportunity(Priority priority) { + int count = schedulingOpportunities.count(priority) - 1; + this.schedulingOpportunities.setCount(priority, Math.max(count, 0)); + } + + /** + * Return the number of times the application has been given an opportunity + * to schedule a task at the given priority since the last time it + * successfully did so. + */ + public synchronized int getSchedulingOpportunities(Priority priority) { + return schedulingOpportunities.count(priority); + } + + /** + * Should be called when an application has successfully scheduled a container, + * or when the scheduling locality threshold is relaxed. + * Reset various internal counters which affect delay scheduling + * + * @param priority The priority of the container scheduled. + */ + public synchronized void resetSchedulingOpportunities(Priority priority) { + resetSchedulingOpportunities(priority, System.currentTimeMillis()); + } + // used for continuous scheduling + public synchronized void resetSchedulingOpportunities(Priority priority, + long currentTimeMs) { + lastScheduledContainer.put(priority, currentTimeMs); + schedulingOpportunities.setCount(priority, 0); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 2974b9dc05..05872f9a31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -72,4 +73,11 @@ public abstract class SchedulerNode { * @return total resources on the node. */ public abstract Resource getTotalResource(); + + /** + * Get the ID of the node which contains both its hostname and port. + * @return the ID of the node + */ + public abstract NodeId getNodeID(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index b93965cdc3..7f51126fec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -18,22 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -41,194 +35,39 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import com.google.common.collect.HashMultiset; -import com.google.common.collect.Multiset; - /** - * Represents an Application from the viewpoint of the scheduler. - * Each running Application in the RM corresponds to one instance - * of this class. + * Represents an application attempt from the viewpoint of the FIFO or Capacity + * scheduler. */ -@SuppressWarnings("unchecked") @Private @Unstable public class FiCaSchedulerApp extends SchedulerApplication { private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class); - private final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private final AppSchedulingInfo appSchedulingInfo; - private final Queue queue; - - private final Resource currentConsumption = recordFactory - .newRecordInstance(Resource.class); - private Resource resourceLimit = recordFactory - .newRecordInstance(Resource.class); - - private Map liveContainers = - new HashMap(); - private List newlyAllocatedContainers = - new ArrayList(); - - final Map> reservedContainers = - new HashMap>(); - - private boolean isStopped = false; - private final Set containersToPreempt = new HashSet(); - /** - * Count how many times the application has been given an opportunity - * to schedule a task at each priority. Each time the scheduler - * asks the application for a task at this priority, it is incremented, - * and each time the application successfully schedules a task, it - * is reset to 0. - */ - Multiset schedulingOpportunities = HashMultiset.create(); - - Multiset reReservations = HashMultiset.create(); - - Resource currentReservation = recordFactory - .newRecordInstance(Resource.class); - - private final RMContext rmContext; public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { - this.rmContext = rmContext; - this.appSchedulingInfo = - new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager); - this.queue = queue; - } - - public ApplicationId getApplicationId() { - return this.appSchedulingInfo.getApplicationId(); - } - - @Override - public ApplicationAttemptId getApplicationAttemptId() { - return this.appSchedulingInfo.getApplicationAttemptId(); - } - - public String getUser() { - return this.appSchedulingInfo.getUser(); - } - - public synchronized void updateResourceRequests( - List requests) { - if (!isStopped) { - this.appSchedulingInfo.updateResourceRequests(requests); - } - } - - public synchronized void updateBlacklist( - List blacklistAdditions, List blacklistRemovals) { - if (!isStopped) { - this.appSchedulingInfo.updateBlacklist( - blacklistAdditions, blacklistRemovals); - } - } - - public Map getResourceRequests(Priority priority) { - return this.appSchedulingInfo.getResourceRequests(priority); - } - - public int getNewContainerId() { - return this.appSchedulingInfo.getNewContainerId(); - } - - public Collection getPriorities() { - return this.appSchedulingInfo.getPriorities(); - } - - public ResourceRequest getResourceRequest(Priority priority, String resourceName) { - return this.appSchedulingInfo.getResourceRequest(priority, resourceName); - } - - public synchronized int getTotalRequiredResources(Priority priority) { - return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers(); - } - - public Resource getResource(Priority priority) { - return this.appSchedulingInfo.getResource(priority); - } - - public boolean isBlacklisted(String resourceName) { - return this.appSchedulingInfo.isBlacklisted(resourceName); - } - - /** - * Is this application pending? - * @return true if it is else false. - */ - @Override - public boolean isPending() { - return this.appSchedulingInfo.isPending(); - } - - public synchronized boolean isStopped() { - return this.isStopped; - } - - public String getQueueName() { - return this.appSchedulingInfo.getQueueName(); - } - - /** - * Get the list of live containers - * @return All of the live containers - */ - @Override - public synchronized Collection getLiveContainers() { - return new ArrayList(liveContainers.values()); - } - - public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { - // Cleanup all scheduling information - this.isStopped = true; - this.appSchedulingInfo.stop(rmAppAttemptFinalState); - } - - public synchronized void containerLaunchedOnNode(ContainerId containerId, - NodeId nodeId) { - // Inform the container - RMContainer rmContainer = - getRMContainer(containerId); - if (rmContainer == null) { - // Some unknown container sneaked into the system. Kill it. - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); - return; - } - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.LAUNCHED)); + super(applicationAttemptId, user, queue, activeUsersManager, rmContext); } synchronized public boolean containerCompleted(RMContainer rmContainer, @@ -310,133 +149,6 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, return rmContainer; } - - synchronized public List pullNewlyAllocatedContainers() { - List returnContainerList = new ArrayList( - newlyAllocatedContainers.size()); - for (RMContainer rmContainer : newlyAllocatedContainers) { - rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), - RMContainerEventType.ACQUIRED)); - returnContainerList.add(rmContainer.getContainer()); - } - newlyAllocatedContainers.clear(); - return returnContainerList; - } - - public Resource getCurrentConsumption() { - return this.currentConsumption; - } - - synchronized public void showRequests() { - if (LOG.isDebugEnabled()) { - for (Priority priority : getPriorities()) { - Map requests = getResourceRequests(priority); - if (requests != null) { - LOG.debug("showRequests:" + " application=" + getApplicationId() + - " headRoom=" + getHeadroom() + - " currentConsumption=" + currentConsumption.getMemory()); - for (ResourceRequest request : requests.values()) { - LOG.debug("showRequests:" + " application=" + getApplicationId() - + " request=" + request); - } - } - } - } - } - - public synchronized RMContainer getRMContainer(ContainerId id) { - return liveContainers.get(id); - } - - synchronized public void resetSchedulingOpportunities(Priority priority) { - this.schedulingOpportunities.setCount(priority, 0); - } - - synchronized public void addSchedulingOpportunity(Priority priority) { - this.schedulingOpportunities.setCount(priority, - schedulingOpportunities.count(priority) + 1); - } - - synchronized public void subtractSchedulingOpportunity(Priority priority) { - int count = schedulingOpportunities.count(priority) - 1; - this.schedulingOpportunities.setCount(priority, Math.max(count, 0)); - } - - /** - * @param priority Target priority - * @return the number of times the application has been given an opportunity - * to schedule a task at the given priority since the last time it - * successfully did so. - */ - synchronized public int getSchedulingOpportunities(Priority priority) { - return this.schedulingOpportunities.count(priority); - } - - synchronized void resetReReservations(Priority priority) { - this.reReservations.setCount(priority, 0); - } - - synchronized void addReReservation(Priority priority) { - this.reReservations.add(priority); - } - - synchronized public int getReReservations(Priority priority) { - return this.reReservations.count(priority); - } - - public synchronized int getNumReservedContainers(Priority priority) { - Map reservedContainers = - this.reservedContainers.get(priority); - return (reservedContainers == null) ? 0 : reservedContainers.size(); - } - - /** - * Get total current reservations. - * Used only by unit tests - * @return total current reservations - */ - @Stable - @Private - public synchronized Resource getCurrentReservation() { - return currentReservation; - } - - public synchronized RMContainer reserve(FiCaSchedulerNode node, Priority priority, - RMContainer rmContainer, Container container) { - // Create RMContainer if necessary - if (rmContainer == null) { - rmContainer = - new RMContainerImpl(container, getApplicationAttemptId(), - node.getNodeID(), rmContext.getDispatcher().getEventHandler(), - rmContext.getContainerAllocationExpirer()); - - Resources.addTo(currentReservation, container.getResource()); - - // Reset the re-reservation count - resetReReservations(priority); - } else { - // Note down the re-reservation - addReReservation(priority); - } - rmContainer.handle(new RMContainerReservedEvent(container.getId(), - container.getResource(), node.getNodeID(), priority)); - - Map reservedContainers = - this.reservedContainers.get(priority); - if (reservedContainers == null) { - reservedContainers = new HashMap(); - this.reservedContainers.put(priority, reservedContainers); - } - reservedContainers.put(node.getNodeID(), rmContainer); - - LOG.info("Application " + getApplicationId() - + " reserved container " + rmContainer - + " on node " + node + ", currently has " + reservedContainers.size() - + " at priority " + priority - + "; currentReservation " + currentReservation.getMemory()); - - return rmContainer; - } public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) { Map reservedContainers = @@ -470,22 +182,6 @@ public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) return false; } - /** - * Has the application reserved the given node at the - * given priority? - * @param node node to be checked - * @param priority priority of reserved container - * @return true is reserved, false if not - */ - public synchronized boolean isReserved(FiCaSchedulerNode node, Priority priority) { - Map reservedContainers = - this.reservedContainers.get(priority); - if (reservedContainers != null) { - return reservedContainers.containsKey(node.getNodeID()); - } - return false; - } - public synchronized float getLocalityWaitFactor( Priority priority, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) @@ -497,41 +193,6 @@ public synchronized float getLocalityWaitFactor( return Math.min(((float)requiredResources / clusterNodes), 1.0f); } - /** - * Get the list of reserved containers - * @return All of the reserved containers. - */ - @Override - public synchronized List getReservedContainers() { - List reservedContainers = new ArrayList(); - for (Map.Entry> e : - this.reservedContainers.entrySet()) { - reservedContainers.addAll(e.getValue().values()); - } - return reservedContainers; - } - - public synchronized void setHeadroom(Resource globalLimit) { - this.resourceLimit = globalLimit; - } - - /** - * Get available headroom in terms of resources for the application's user. - * @return available resource headroom - */ - public synchronized Resource getHeadroom() { - // Corner case to deal with applications being slightly over-limit - if (resourceLimit.getMemory() < 0) { - resourceLimit.setMemory(0); - } - - return resourceLimit; - } - - public Queue getQueue() { - return queue; - } - public Resource getTotalPendingRequests() { Resource ret = Resource.newInstance(0, 0); for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 8b5d454305..caf2a97d71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -18,10 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; @@ -30,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -38,92 +34,39 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.HashMultiset; -import com.google.common.collect.Multiset; - +/** + * Represents an application attempt from the viewpoint of the Fair Scheduler. + */ @Private @Unstable public class FSSchedulerApp extends SchedulerApplication { private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class); - private final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private final AppSchedulingInfo appSchedulingInfo; private AppSchedulable appSchedulable; - private final Queue queue; - - private final Resource currentConsumption = recordFactory - .newRecordInstance(Resource.class); - private Resource resourceLimit = recordFactory - .newRecordInstance(Resource.class); - - private Map liveContainers - = new HashMap(); - private List newlyAllocatedContainers = - new ArrayList(); - - final Map> reservedContainers = - new HashMap>(); final Map preemptionMap = new HashMap(); - - /** - * Count how many times the application has been given an opportunity - * to schedule a task at each priority. Each time the scheduler - * asks the application for a task at this priority, it is incremented, - * and each time the application successfully schedules a task, it - * is reset to 0. - */ - Multiset schedulingOpportunities = HashMultiset.create(); - Multiset reReservations = HashMultiset.create(); - - Resource currentReservation = recordFactory - .newRecordInstance(Resource.class); - - private final RMContext rmContext; public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { - this.rmContext = rmContext; - this.appSchedulingInfo = - new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager); - this.queue = queue; + super(applicationAttemptId, user, queue, activeUsersManager, rmContext); } - public ApplicationId getApplicationId() { - return appSchedulingInfo.getApplicationId(); - } - - @Override - public ApplicationAttemptId getApplicationAttemptId() { - return appSchedulingInfo.getApplicationAttemptId(); - } - public void setAppSchedulable(AppSchedulable appSchedulable) { this.appSchedulable = appSchedulable; } @@ -132,83 +75,6 @@ public AppSchedulable getAppSchedulable() { return appSchedulable; } - public String getUser() { - return appSchedulingInfo.getUser(); - } - - public synchronized void updateResourceRequests( - List requests) { - this.appSchedulingInfo.updateResourceRequests(requests); - } - - public Map getResourceRequests(Priority priority) { - return appSchedulingInfo.getResourceRequests(priority); - } - - public int getNewContainerId() { - return appSchedulingInfo.getNewContainerId(); - } - - public Collection getPriorities() { - return appSchedulingInfo.getPriorities(); - } - - public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) { - return appSchedulingInfo.getResourceRequest(priority, nodeAddress); - } - - public synchronized int getTotalRequiredResources(Priority priority) { - return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers(); - } - - public Resource getResource(Priority priority) { - return appSchedulingInfo.getResource(priority); - } - - /** - * Is this application pending? - * @return true if it is else false. - */ - @Override - public boolean isPending() { - return appSchedulingInfo.isPending(); - } - - public String getQueueName() { - return appSchedulingInfo.getQueueName(); - } - - /** - * Get the list of live containers - * @return All of the live containers - */ - @Override - public synchronized Collection getLiveContainers() { - return new ArrayList(liveContainers.values()); - } - - public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { - // Cleanup all scheduling information - appSchedulingInfo.stop(rmAppAttemptFinalState); - } - - @SuppressWarnings("unchecked") - public synchronized void containerLaunchedOnNode(ContainerId containerId, - NodeId nodeId) { - // Inform the container - RMContainer rmContainer = - getRMContainer(containerId); - if (rmContainer == null) { - // Some unknown container sneaked into the system. Kill it. - rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); - return; - } - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.LAUNCHED)); - } - synchronized public void containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { @@ -241,122 +107,6 @@ synchronized public void containerCompleted(RMContainer rmContainer, preemptionMap.remove(rmContainer); } - synchronized public List pullNewlyAllocatedContainers() { - List returnContainerList = new ArrayList( - newlyAllocatedContainers.size()); - for (RMContainer rmContainer : newlyAllocatedContainers) { - rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), - RMContainerEventType.ACQUIRED)); - returnContainerList.add(rmContainer.getContainer()); - } - newlyAllocatedContainers.clear(); - return returnContainerList; - } - - public Resource getCurrentConsumption() { - return this.currentConsumption; - } - - synchronized public void showRequests() { - if (LOG.isDebugEnabled()) { - for (Priority priority : getPriorities()) { - Map requests = getResourceRequests(priority); - if (requests != null) { - LOG.debug("showRequests:" + " application=" + getApplicationId() + - " headRoom=" + getHeadroom() + - " currentConsumption=" + currentConsumption.getMemory()); - for (ResourceRequest request : requests.values()) { - LOG.debug("showRequests:" + " application=" + getApplicationId() - + " request=" + request); - } - } - } - } - } - - public synchronized RMContainer getRMContainer(ContainerId id) { - return liveContainers.get(id); - } - - synchronized public void addSchedulingOpportunity(Priority priority) { - schedulingOpportunities.setCount(priority, - schedulingOpportunities.count(priority) + 1); - } - - /** - * Return the number of times the application has been given an opportunity - * to schedule a task at the given priority since the last time it - * successfully did so. - */ - synchronized public int getSchedulingOpportunities(Priority priority) { - return schedulingOpportunities.count(priority); - } - - synchronized void resetReReservations(Priority priority) { - reReservations.setCount(priority, 0); - } - - synchronized void addReReservation(Priority priority) { - reReservations.add(priority); - } - - synchronized public int getReReservations(Priority priority) { - return reReservations.count(priority); - } - - public synchronized int getNumReservedContainers(Priority priority) { - Map reservedContainers = - this.reservedContainers.get(priority); - return (reservedContainers == null) ? 0 : reservedContainers.size(); - } - - /** - * Get total current reservations. - * Used only by unit tests - * @return total current reservations - */ - @VisibleForTesting - public synchronized Resource getCurrentReservation() { - return currentReservation; - } - - public synchronized RMContainer reserve(FSSchedulerNode node, Priority priority, - RMContainer rmContainer, Container container) { - // Create RMContainer if necessary - if (rmContainer == null) { - rmContainer = - new RMContainerImpl(container, getApplicationAttemptId(), - node.getNodeID(), rmContext.getDispatcher().getEventHandler(), - rmContext.getContainerAllocationExpirer()); - - Resources.addTo(currentReservation, container.getResource()); - - // Reset the re-reservation count - resetReReservations(priority); - } else { - // Note down the re-reservation - addReReservation(priority); - } - rmContainer.handle(new RMContainerReservedEvent(container.getId(), - container.getResource(), node.getNodeID(), priority)); - - Map reservedContainers = - this.reservedContainers.get(priority); - if (reservedContainers == null) { - reservedContainers = new HashMap(); - this.reservedContainers.put(priority, reservedContainers); - } - reservedContainers.put(node.getNodeID(), rmContainer); - - LOG.info("Application " + getApplicationId() - + " reserved container " + rmContainer - + " on node " + node + ", currently has " + reservedContainers.size() - + " at priority " + priority - + "; currentReservation " + currentReservation.getMemory()); - - return rmContainer; - } - public synchronized void unreserve(FSSchedulerNode node, Priority priority) { Map reservedContainers = this.reservedContainers.get(priority); @@ -376,22 +126,6 @@ public synchronized void unreserve(FSSchedulerNode node, Priority priority) { + priority + "; currentReservation " + currentReservation); } - /** - * Has the application reserved the given node at the - * given priority? - * @param node node to be checked - * @param priority priority of reserved container - * @return true is reserved, false if not - */ - public synchronized boolean isReserved(FSSchedulerNode node, Priority priority) { - Map reservedContainers = - this.reservedContainers.get(priority); - if (reservedContainers != null) { - return reservedContainers.containsKey(node.getNodeID()); - } - return false; - } - public synchronized float getLocalityWaitFactor( Priority priority, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) @@ -402,42 +136,7 @@ public synchronized float getLocalityWaitFactor( // i.e. no point skipping more than clustersize opportunities return Math.min(((float)requiredResources / clusterNodes), 1.0f); } - - /** - * Get the list of reserved containers - * @return All of the reserved containers. - */ - @Override - public synchronized List getReservedContainers() { - List reservedContainers = new ArrayList(); - for (Map.Entry> e : - this.reservedContainers.entrySet()) { - reservedContainers.addAll(e.getValue().values()); - } - return reservedContainers; - } - public synchronized void setHeadroom(Resource globalLimit) { - this.resourceLimit = globalLimit; - } - - /** - * Get available headroom in terms of resources for the application's user. - * @return available resource headroom - */ - public synchronized Resource getHeadroom() { - // Corner case to deal with applications being slightly over-limit - if (resourceLimit.getMemory() < 0) { - resourceLimit.setMemory(0); - } - - return resourceLimit; - } - - public Queue getQueue() { - return queue; - } - /** * Delay scheduling: We often want to prioritize scheduling of node-local * containers over rack-local or off-switch containers. To acheive this @@ -453,26 +152,6 @@ public Queue getQueue() { final Map allowedLocalityLevel = new HashMap< Priority, NodeType>(); - // Time of the last container scheduled at the current allowed level - Map lastScheduledContainer = new HashMap(); - - /** - * Should be called when an application has successfully scheduled a container, - * or when the scheduling locality threshold is relaxed. - * Reset various internal counters which affect delay scheduling - * - * @param priority The priority of the container scheduled. - */ - synchronized public void resetSchedulingOpportunities(Priority priority) { - resetSchedulingOpportunities(priority, System.currentTimeMillis()); - } - // used for continuous scheduling - synchronized public void resetSchedulingOpportunities(Priority priority, - long currentTimeMs) { - lastScheduledContainer.put(priority, currentTimeMs); - schedulingOpportunities.setCount(priority, 0); - } - /** * Return the level at which we are allowed to schedule containers, given the * current size of the cluster and thresholds indicating how many nodes to