YARN-1008. MiniYARNCluster with multiple nodemanagers, all nodes have same key for allocations. (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1517563 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-08-26 15:39:11 +00:00
parent 18e805677d
commit 942e2ebaa5
21 changed files with 156 additions and 37 deletions

View File

@ -96,6 +96,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-1094. Fixed a blocker with RM restart code because of which RM crashes
when try to recover an existing app. (vinodkv)
YARN-1008. MiniYARNCluster with multiple nodemanagers, all nodes have same
key for allocations. (tucu)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -132,6 +132,15 @@ public class YarnConfiguration extends Configuration {
RM_PREFIX + "scheduler.client.thread-count";
public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50;
/** If the port should be included or not in the node name. The node name
* is used by the scheduler for resource requests allocation location
* matching. Typically this is just the hostname, using the port is needed
* when using minicluster and specific NM are required.*/
public static final String RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME =
YARN_PREFIX + "scheduler.include-port-in-node-name";
public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME =
false;
/**
* Enable periodic monitor threads.
* @see #RM_SCHEDULER_MONITOR_POLICIES

View File

@ -281,7 +281,7 @@ synchronized private void allocateNodeLocal(
// Update future requirements
nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
if (nodeLocalRequest.getNumContainers() == 0) {
this.requests.get(priority).remove(node.getHostName());
this.requests.get(priority).remove(node.getNodeName());
}
ResourceRequest rackLocalRequest = requests.get(priority).get(

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* Represents a YARN Cluster Node from the viewpoint of the scheduler.
@ -30,10 +31,17 @@
public abstract class SchedulerNode {
/**
* Get hostname.
* @return hostname
* Get the name of the node for scheduling matching decisions.
* <p/>
* Typically this is the 'hostname' reported by the node, but it could be
* configured to be 'hostname:port' reported by the node via the
* {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} constant.
* The main usecase of this is Yarn minicluster to be able to differentiate
* node manager instances by their port number.
*
* @return name of the node for scheduling matching decisions.
*/
public abstract String getHostName();
public abstract String getNodeName();
/**
* Get rackname.

View File

@ -185,7 +185,8 @@ public Configuration getConf() {
private boolean initialized = false;
private ResourceCalculator calculator;
private boolean usePortForNodeName;
public CapacityScheduler() {}
@Override
@ -256,6 +257,7 @@ public Resource getClusterResources() {
this.minimumAllocation = this.conf.getMinimumAllocation();
this.maximumAllocation = this.conf.getMaximumAllocation();
this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.rmContext = rmContext;
@ -759,7 +761,8 @@ public void handle(SchedulerEvent event) {
}
private synchronized void addNode(RMNode nodeManager) {
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager));
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
usePortForNodeName));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
++numNodeManagers;

View File

@ -338,6 +338,11 @@ public ResourceCalculator getResourceCalculator() {
this);
}
public boolean getUsePortForNodeName() {
return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
}
public void setResourceComparator(
Class<? extends ResourceCalculator> resourceCalculatorClass) {
setClass(

View File

@ -801,7 +801,7 @@ private synchronized FiCaSchedulerApp getApplication(
assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
if(LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getHostName()
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " #applications=" + activeApplications.size());
}
@ -1130,7 +1130,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
// Data-local
ResourceRequest nodeLocalResourceRequest =
application.getResourceRequest(priority, node.getHostName());
application.getResourceRequest(priority, node.getNodeName());
if (nodeLocalResourceRequest != null) {
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
@ -1257,7 +1257,7 @@ boolean canAssign(FiCaSchedulerApp application, Priority priority,
if (type == NodeType.NODE_LOCAL) {
// Now check if we need containers on this host...
ResourceRequest nodeLocalRequest =
application.getResourceRequest(priority, node.getHostName());
application.getResourceRequest(priority, node.getNodeName());
if (nodeLocalRequest != null) {
return nodeLocalRequest.getNumContainers() > 0;
}
@ -1302,7 +1302,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getHostName()
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId().getId()
+ " priority=" + priority.getPriority()
+ " request=" + request + " type=" + type);

View File

@ -59,11 +59,17 @@ public class FiCaSchedulerNode extends SchedulerNode {
new HashMap<ContainerId, RMContainer>();
private final RMNode rmNode;
private final String nodeName;
public FiCaSchedulerNode(RMNode node) {
public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
this.rmNode = node;
this.availableResource.setMemory(node.getTotalCapability().getMemory());
this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
if (usePortForNodeName) {
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
} else {
nodeName = rmNode.getHostName();
}
}
public RMNode getRMNode() {
@ -79,8 +85,8 @@ public String getHttpAddress() {
}
@Override
public String getHostName() {
return this.rmNode.getHostName();
public String getNodeName() {
return nodeName;
}
@Override

View File

@ -24,9 +24,9 @@ public class FiCaSchedulerUtils {
public static boolean isBlacklisted(FiCaSchedulerApp application,
FiCaSchedulerNode node, Log LOG) {
if (application.isBlacklisted(node.getHostName())) {
if (application.isBlacklisted(node.getNodeName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping 'host' " + node.getHostName() +
LOG.debug("Skipping 'host' " + node.getNodeName() +
" for " + application.getApplicationId() +
" since it has been blacklisted");
}

View File

@ -185,7 +185,7 @@ public Container createContainer(
*/
private void reserve(Priority priority, FSSchedulerNode node,
Container container, boolean alreadyReserved) {
LOG.info("Making reservation: node=" + node.getHostName() +
LOG.info("Making reservation: node=" + node.getNodeName() +
" app_id=" + app.getApplicationId());
if (!alreadyReserved) {
getMetrics().reserveResource(app.getUser(), container.getResource());
@ -309,7 +309,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
node.getRackName());
ResourceRequest localRequest = app.getResourceRequest(priority,
node.getHostName());
node.getNodeName());
if (localRequest != null && !localRequest.getRelaxLocality()) {
LOG.warn("Relax locality off is not supported on local request: "
@ -369,7 +369,7 @@ public Resource assignContainer(FSSchedulerNode node) {
public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
ResourceRequest anyRequest = app.getResourceRequest(prio, ResourceRequest.ANY);
ResourceRequest rackRequest = app.getResourceRequest(prio, node.getRackName());
ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getHostName());
ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getNodeName());
return
// There must be outstanding requests at the given priority:

View File

@ -63,10 +63,16 @@ public class FSSchedulerNode extends SchedulerNode {
new HashMap<ContainerId, RMContainer>();
private final RMNode rmNode;
private final String nodeName;
public FSSchedulerNode(RMNode node) {
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
this.rmNode = node;
this.availableResource = Resources.clone(node.getTotalCapability());
if (usePortForNodeName) {
nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
} else {
nodeName = rmNode.getHostName();
}
}
public RMNode getRMNode() {
@ -82,8 +88,8 @@ public String getHttpAddress() {
}
@Override
public String getHostName() {
return rmNode.getHostName();
public String getNodeName() {
return nodeName;
}
@Override

View File

@ -35,7 +35,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -122,6 +121,7 @@ public class FairScheduler implements ResourceScheduler {
private Resource incrAllocation;
private QueueManager queueMgr;
private Clock clock;
private boolean usePortForNodeName;
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
@ -751,7 +751,7 @@ private synchronized void completedContainer(RMContainer rmContainer,
}
private synchronized void addNode(RMNode node) {
nodes.put(node.getNodeID(), new FSSchedulerNode(node));
nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
Resources.addTo(clusterCapacity, node.getTotalCapability());
updateRootQueueMetrics();
@ -1065,7 +1065,8 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
sizeBasedWeight = this.conf.getSizeBasedWeight();
preemptionInterval = this.conf.getPreemptionInterval();
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
usePortForNodeName = this.conf.getUsePortForNodeName();
if (!initialized) {
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;

View File

@ -166,7 +166,12 @@ public int getPreemptionInterval() {
public int getWaitTimeBeforeKill() {
return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
}
public boolean getUsePortForNodeName() {
return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
}
/**
* Parses a resource config value of a form like "1024", "1024 mb",
* or "1024 mb, 3 vcores". If no units are given, megabytes are assumed.

View File

@ -111,6 +111,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
private boolean initialized;
private Resource minimumAllocation;
private Resource maximumAllocation;
private boolean usePortForNodeName;
private Map<ApplicationAttemptId, FiCaSchedulerApp> applications
= new TreeMap<ApplicationAttemptId, FiCaSchedulerApp>();
@ -233,6 +234,9 @@ public Resource getMaximumResourceCapability() {
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
this.usePortForNodeName = conf.getBoolean(
YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
conf);
this.activeUsersManager = new ActiveUsersManager(metrics);
@ -490,7 +494,7 @@ private int assignNodeLocalContainers(FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
application.getResourceRequest(priority, node.getHostName());
application.getResourceRequest(priority, node.getNodeName());
if (request != null) {
// Don't allocate on this node if we don't need containers on this rack
ResourceRequest rackRequest =
@ -801,7 +805,8 @@ public List<QueueUserACLInfo> getQueueUserAclInfo() {
}
private synchronized void addNode(RMNode nodeManager) {
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager));
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
usePortForNodeName));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
}

View File

@ -200,15 +200,14 @@ public long getLastHealthReportTime() {
};
private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {
return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null);
return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123);
}
private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, int hostnum, String hostName) {
NodeState state, String httpAddr, int hostnum, String hostName, int port) {
final String rackName = "rack"+ rack;
final int nid = hostnum;
final String nodeAddr = hostName + ":" + nid;
final int port = 123;
if (hostName == null) {
hostName = "host"+ nid;
}
@ -230,12 +229,17 @@ public static RMNode newNodeInfo(int rack, final Resource perNode) {
}
public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) {
return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null);
return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123);
}
public static RMNode newNodeInfo(int rack, final Resource perNode,
int hostnum, String hostName) {
return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName);
return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, 123);
}
public static RMNode newNodeInfo(int rack, final Resource perNode,
int hostnum, String hostName, int port) {
return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port);
}
}

View File

@ -101,7 +101,7 @@ public NodeManager(String hostName, int containerManagerPort, int httpPort,
request.setNodeId(this.nodeId);
resourceTrackerService.registerNodeManager(request);
this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get(
this.nodeId));
this.nodeId), false);
// Sanity check
Assert.assertEquals(capability.getMemory(),

View File

@ -26,7 +26,6 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.util.HashMap;
@ -126,7 +125,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
throw new Exception();
} catch (Exception e) {
LOG.info("FOOBAR q.assignContainers q=" + queue.getQueueName() +
" alloc=" + allocation + " node=" + node.getHostName());
" alloc=" + allocation + " node=" + node.getNodeName());
}
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {

View File

@ -138,7 +138,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
throw new Exception();
} catch (Exception e) {
LOG.info("FOOBAR q.assignContainers q=" + queue.getQueueName() +
" alloc=" + allocation + " node=" + node.getHostName());
" alloc=" + allocation + " node=" + node.getNodeName());
}
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {

View File

@ -160,7 +160,7 @@ public static FiCaSchedulerNode getMockNode(
when(rmNode.getHostName()).thenReturn(host);
when(rmNode.getRackName()).thenReturn(rack);
FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode));
FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode, false));
LOG.info("node = " + host + " avail=" + node.getAvailableResource());
return node;
}

View File

@ -2146,4 +2146,54 @@ public void testDRFHierarchicalQueues() throws Exception {
Assert.assertEquals(2, app3.getLiveContainers().size());
Assert.assertEquals(2, app4.getLiveContainers().size());
}
@Test(timeout = 30000)
public void testHostPortNodeName() throws Exception {
scheduler.getConf().setBoolean(YarnConfiguration
.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
scheduler.reinitialize(scheduler.getConf(),
resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
1, "127.0.0.1", 1);
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
2, "127.0.0.1", 2);
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
"user1", 0);
ResourceRequest nodeRequest = createResourceRequest(1024,
node1.getNodeID().getHost() + ":" + node1.getNodeID().getPort(), 1,
1, true);
ResourceRequest rackRequest = createResourceRequest(1024,
node1.getRackName(), 1, 1, false);
ResourceRequest anyRequest = createResourceRequest(1024,
ResourceRequest.ANY, 1, 1, false);
createSchedulingRequestExistingApplication(nodeRequest, attId1);
createSchedulingRequestExistingApplication(rackRequest, attId1);
createSchedulingRequestExistingApplication(anyRequest, attId1);
scheduler.update();
NodeUpdateSchedulerEvent node1UpdateEvent = new
NodeUpdateSchedulerEvent(node1);
NodeUpdateSchedulerEvent node2UpdateEvent = new
NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
FSSchedulerApp app = scheduler.applications.get(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
assertEquals(0, app.getReservedContainers().size());
}
// then node1 should get the container
scheduler.handle(node1UpdateEvent);
assertEquals(1, app.getLiveContainers().size());
}
}

View File

@ -53,6 +53,21 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
/**
* Embedded Yarn minicluster for testcases that need to interact with a cluster.
* <p/>
* In a real cluster, resource request matching is done using the hostname, and
* by default Yarn minicluster works in the exact same way as a real cluster.
* <p/>
* If a testcase needs to use multiple nodes and exercise resource request
* matching to a specific node, then the property
* {@YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} should be set
* <code>true</code> in the configuration used to initialize the minicluster.
* <p/>
* With this property set to <code>true</code>, the matching will be done using
* the <code>hostname:port</code> of the namenodes. In such case, the AM must
* do resource request using <code>hostname:port</code> as the location.
*/
public class MiniYARNCluster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MiniYARNCluster.class);