diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index eaca930fd4..a35fc51cd3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1426,6 +1426,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3090. Fix MR AM to use ApplicationAttemptId rather than (ApplicationId, startCount) consistently. (acmurthy) + MAPREDUCE-2646. Fixed AMRMProtocol to return containers based on + priority. (Sharad Agarwal and Arun C Murthy via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index ff232104bd..7b75cd1fbd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -586,37 +586,21 @@ private void assign(List allocatedContainers) { private ContainerRequest assign(Container allocated) { ContainerRequest assigned = null; - if (mapResourceReqt != reduceResourceReqt) { - //assign based on size - LOG.info("Assigning based on container size"); - if (allocated.getResource().getMemory() == mapResourceReqt) { - assigned = assignToFailedMap(allocated); - if (assigned == null) { - assigned = assignToMap(allocated); - } - } else if (allocated.getResource().getMemory() == reduceResourceReqt) { - assigned = assignToReduce(allocated); - } - - return assigned; - } - - //container can be given to either map or reduce - //assign based on priority - - //try to assign to earlierFailedMaps if present - assigned = assignToFailedMap(allocated); - - //Assign to reduces before assigning to maps ? - if (assigned == null) { + Priority priority = allocated.getPriority(); + if (PRIORITY_FAST_FAIL_MAP.equals(priority)) { + LOG.info("Assigning container " + allocated + " to fast fail map"); + assigned = assignToFailedMap(allocated); + } else if (PRIORITY_REDUCE.equals(priority)) { + LOG.info("Assigning container " + allocated + " to reduce"); assigned = assignToReduce(allocated); - } - - //try to assign to maps if present - if (assigned == null) { + } else if (PRIORITY_MAP.equals(priority)) { + LOG.info("Assigning container " + allocated + " to map"); assigned = assignToMap(allocated); + } else { + LOG.warn("Container allocated at unwanted priority: " + priority + + ". Returning to RM..."); } - + return assigned; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index 97c84e4d10..ff054b22ac 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -43,6 +43,7 @@ * *
  • HTTP uri of the node.
  • *
  • {@link Resource} allocated to the container.
  • + *
  • {@link Priority} at which the container was allocated.
  • *
  • {@link ContainerState} of the container.
  • *
  • * {@link ContainerToken} of the container, used to securely verify @@ -111,6 +112,18 @@ public interface Container extends Comparable { @Private @Unstable void setResource(Resource resource); + + /** + * Get the Priority at which the Container was + * allocated. + * @return Priority at which the Container was + * allocated + */ + Priority getPriority(); + + @Private + @Unstable + void setPriority(Priority priority); /** * Get the current ContainerState of the container. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java index 388cad0f4d..39b15e0cef 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTokenProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.util.ProtoUtils; @@ -48,6 +50,7 @@ public class ContainerPBImpl extends ProtoBase implements Contai private ContainerId containerId = null; private NodeId nodeId = null; private Resource resource = null; + private Priority priority = null; private ContainerToken containerToken = null; private ContainerStatus containerStatus = null; @@ -84,6 +87,11 @@ private void mergeLocalToBuilder() { builder.getResource())) { builder.setResource(convertToProtoFormat(this.resource)); } + if (this.priority != null && + !((PriorityPBImpl) this.priority).getProto().equals( + builder.getPriority())) { + builder.setPriority(convertToProtoFormat(this.priority)); + } if (this.containerToken != null && !((ContainerTokenPBImpl) this.containerToken).getProto().equals( builder.getContainerToken())) { @@ -211,6 +219,29 @@ public void setResource(Resource resource) { builder.clearResource(); this.resource = resource; } + + @Override + public Priority getPriority() { + ContainerProtoOrBuilder p = viaProto ? proto : builder; + if (this.priority != null) { + return this.priority; + } + if (!p.hasPriority()) { + return null; + } + this.priority = convertFromProtoFormat(p.getPriority()); + return this.priority; + } + + @Override + public void setPriority(Priority priority) { + maybeInitBuilder(); + if (priority == null) { + builder.clearPriority(); + } + this.priority = priority; + } + @Override public ContainerToken getContainerToken() { ContainerProtoOrBuilder p = viaProto ? proto : builder; @@ -285,6 +316,14 @@ private ResourceProto convertToProtoFormat(Resource t) { return ((ResourcePBImpl)t).getProto(); } + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { + return new PriorityPBImpl(p); + } + + private PriorityProto convertToProtoFormat(Priority p) { + return ((PriorityPBImpl)p).getProto(); + } + private ContainerTokenPBImpl convertFromProtoFormat(ContainerTokenProto p) { return new ContainerTokenPBImpl(p); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index cd29a9431c..704c710996 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -48,6 +48,10 @@ message ResourceProto { optional int32 memory = 1; } +message PriorityProto { + optional int32 priority = 1; +} + enum ContainerStateProto { C_NEW = 1; C_RUNNING = 2; @@ -66,9 +70,10 @@ message ContainerProto { optional NodeIdProto nodeId = 2; optional string node_http_address = 3; optional ResourceProto resource = 4; - optional ContainerStateProto state = 5; - optional ContainerTokenProto container_token = 6; - optional ContainerStatusProto container_status = 7; + optional PriorityProto priority = 5; + optional ContainerStateProto state = 6; + optional ContainerTokenProto container_token = 7; + optional ContainerStatusProto container_status = 8; } enum ApplicationStateProto { @@ -253,10 +258,6 @@ message ContainerStatusProto { //////////////////////////////////////////////////////////////////////// ////// From common////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// -message PriorityProto { - optional int32 priority = 1; -} - message StringURLMapProto { optional string key = 1; optional URLProto value = 2; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java index 2caafdc19a..9df37ee03a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java @@ -184,32 +184,24 @@ public static ContainerId newContainerId(RecordFactory recordFactory, return id; } - public static Container clone(Container c) { - Container container = recordFactory.newRecordInstance(Container.class); - container.setId(c.getId()); - container.setContainerToken(c.getContainerToken()); - container.setNodeId(c.getNodeId()); - container.setNodeHttpAddress(c.getNodeHttpAddress()); - container.setResource(c.getResource()); - container.setState(c.getState()); - return container; - } - public static Container newContainer(RecordFactory recordFactory, ApplicationAttemptId appAttemptId, int containerId, NodeId nodeId, - String nodeHttpAddress, Resource resource) { + String nodeHttpAddress, Resource resource, Priority priority) { ContainerId containerID = newContainerId(recordFactory, appAttemptId, containerId); - return newContainer(containerID, nodeId, nodeHttpAddress, resource); + return newContainer(containerID, nodeId, nodeHttpAddress, + resource, priority); } public static Container newContainer(ContainerId containerId, - NodeId nodeId, String nodeHttpAddress, Resource resource) { + NodeId nodeId, String nodeHttpAddress, + Resource resource, Priority priority) { Container container = recordFactory.newRecordInstance(Container.class); container.setId(containerId); container.setNodeId(nodeId); container.setNodeHttpAddress(nodeHttpAddress); container.setResource(resource); + container.setPriority(priority); container.setState(ContainerState.NEW); ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class); containerStatus.setContainerId(containerId); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 0753e3795c..e67d371ee6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1046,19 +1046,20 @@ boolean canAssign(SchedulerApp application, Priority priority, } private Container getContainer(RMContainer rmContainer, - SchedulerApp application, SchedulerNode node, Resource capability) { + SchedulerApp application, SchedulerNode node, + Resource capability, Priority priority) { return (rmContainer != null) ? rmContainer.getContainer() : - createContainer(application, node, capability); + createContainer(application, node, capability, priority); } public Container createContainer(SchedulerApp application, SchedulerNode node, - Resource capability) { + Resource capability, Priority priority) { Container container = BuilderUtils.newContainer(this.recordFactory, application.getApplicationAttemptId(), application.getNewContainerId(), - node.getNodeID(), - node.getHttpAddress(), capability); + node.getNodeID(), node.getHttpAddress(), + capability, priority); // If security is enabled, send the container-tokens too. if (UserGroupInformation.isSecurityEnabled()) { @@ -1099,7 +1100,7 @@ private Resource assignContainer(Resource clusterResource, SchedulerNode node, // Create the container if necessary Container container = - getContainer(rmContainer, application, node, capability); + getContainer(rmContainer, application, node, capability, priority); // Can we allocate a container on this node? int availableContainers = diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 752b81ce5d..dfa4965d5d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -528,7 +528,8 @@ private int assignContainer(SchedulerNode node, SchedulerApp application, application.getApplicationAttemptId(), application.getNewContainerId(), node.getRMNode().getNodeID(), - node.getRMNode().getHttpAddress(), capability); + node.getRMNode().getHttpAddress(), + capability, priority); // If security is enabled, send the container-tokens too. if (UserGroupInformation.isSecurityEnabled()) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index a7b5d02c91..72ade5c1da 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -184,7 +185,9 @@ synchronized public StartContainerResponse startContainer( Container container = BuilderUtils.newContainer(containerLaunchContext.getContainerId(), this.nodeId, nodeHttpAddress, - containerLaunchContext.getResource()); + containerLaunchContext.getResource(), + null // DKDC - Doesn't matter + ); applicationContainers.add(container); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 70c4d1a1f4..639daf9e5a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -135,7 +135,8 @@ public Container answer(InvocationOnMock invocation) Container container = TestUtils.getMockContainer( containerId, ((SchedulerNode)(invocation.getArguments()[1])).getNodeID(), - (Resource)(invocation.getArguments()[2])); + (Resource)(invocation.getArguments()[2]), + ((Priority)invocation.getArguments()[3])); return container; } } @@ -143,7 +144,9 @@ public Container answer(InvocationOnMock invocation) when(queue).createContainer( any(SchedulerApp.class), any(SchedulerNode.class), - any(Resource.class)); + any(Resource.class), + any(Priority.class) + ); // 2. Stub out LeafQueue.parent.completedContainer CSQueue parent = queue.getParent(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 84dbbac867..8459e51d5c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -161,11 +161,13 @@ public static ContainerId getMockContainerId(SchedulerApp application) { } public static Container getMockContainer( - ContainerId containerId, NodeId nodeId, Resource resource) { + ContainerId containerId, NodeId nodeId, + Resource resource, Priority priority) { Container container = mock(Container.class); when(container.getId()).thenReturn(containerId); when(container.getNodeId()).thenReturn(nodeId); when(container.getResource()).thenReturn(resource); + when(container.getPriority()).thenReturn(priority); return container; } }