YARN-879. Fixed tests w.r.t o.a.h.y.server.resourcemanager.Application. Contributed by Junping Du.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1530902 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Devarajulu K 2013-10-10 09:47:11 +00:00
parent 7429debd86
commit 22b332ff46
6 changed files with 127 additions and 73 deletions

View File

@ -95,6 +95,9 @@ Release 2.2.1 - UNRELEASED
so that clients don't need to do scheme-mangling. (Omkar Vinit Joshi via so that clients don't need to do scheme-mangling. (Omkar Vinit Joshi via
vinodkv) vinodkv)
YARN-879. Fixed tests w.r.t o.a.h.y.server.resourcemanager.Application.
(Junping Du via devaraj)
Release 2.2.0 - 2013-10-13 Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -34,6 +34,8 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@ -47,11 +49,16 @@
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Task.State; import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@Private @Private
@ -89,16 +96,23 @@ public class Application {
Resource used = recordFactory.newRecordInstance(Resource.class); Resource used = recordFactory.newRecordInstance(Resource.class);
public Application(String user, ResourceManager resourceManager) { public Application(String user, ResourceManager resourceManager)
throws YarnException {
this(user, "default", resourceManager); this(user, "default", resourceManager);
} }
public Application(String user, String queue, ResourceManager resourceManager) { public Application(String user, String queue, ResourceManager resourceManager)
throws YarnException {
this.user = user; this.user = user;
this.queue = queue; this.queue = queue;
this.resourceManager = resourceManager; this.resourceManager = resourceManager;
this.applicationId = // register an application
this.resourceManager.getClientRMService().getNewApplicationId(); GetNewApplicationRequest request =
Records.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse newApp =
this.resourceManager.getClientRMService().getNewApplication(request);
this.applicationId = newApp.getApplicationId();
this.applicationAttemptId = this.applicationAttemptId =
ApplicationAttemptId.newInstance(this.applicationId, ApplicationAttemptId.newInstance(this.applicationId,
this.numAttempts.getAndIncrement()); this.numAttempts.getAndIncrement());
@ -115,6 +129,10 @@ public String getQueue() {
public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
return applicationId; return applicationId;
} }
public ApplicationAttemptId getApplicationAttemptId() {
return applicationAttemptId;
}
public static String resolve(String hostName) { public static String resolve(String hostName) {
return NetworkTopology.DEFAULT_RACK; return NetworkTopology.DEFAULT_RACK;
@ -132,10 +150,25 @@ public synchronized void submit() throws IOException, YarnException {
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
context.setApplicationId(this.applicationId); context.setApplicationId(this.applicationId);
context.setQueue(this.queue); context.setQueue(this.queue);
// Set up the container launch context for the application master
ContainerLaunchContext amContainer
= Records.newRecord(ContainerLaunchContext.class);
context.setAMContainerSpec(amContainer);
context.setResource(Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
SubmitApplicationRequest request = recordFactory SubmitApplicationRequest request = recordFactory
.newRecordInstance(SubmitApplicationRequest.class); .newRecordInstance(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(context); request.setApplicationSubmissionContext(context);
final ResourceScheduler scheduler = resourceManager.getResourceScheduler();
resourceManager.getClientRMService().submitApplication(request); resourceManager.getClientRMService().submitApplication(request);
// Notify scheduler
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
this.applicationAttemptId, this.queue, this.user);
scheduler.handle(appAddedEvent1);
} }
public synchronized void addResourceRequestSpec( public synchronized void addResourceRequestSpec(
@ -267,17 +300,13 @@ public synchronized List<Container> getResources() throws IOException {
} }
// Get resources from the ResourceManager // Get resources from the ResourceManager
resourceManager.getResourceScheduler().allocate(applicationAttemptId, Allocation allocation = resourceManager.getResourceScheduler().allocate(
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(), null, null); applicationAttemptId, new ArrayList<ResourceRequest>(ask),
new ArrayList<ContainerId>(), null, null);
System.out.println("-=======" + applicationAttemptId); System.out.println("-=======" + applicationAttemptId);
System.out.println("----------" + resourceManager.getRMContext().getRMApps() System.out.println("----------" + resourceManager.getRMContext().getRMApps()
.get(applicationId).getRMAppAttempt(applicationAttemptId)); .get(applicationId).getRMAppAttempt(applicationAttemptId));
List<Container> containers = allocation.getContainers();
List<Container> containers = null;
// TODO: Fix
// resourceManager.getRMContext().getRMApps()
// .get(applicationId).getRMAppAttempt(applicationAttemptId)
// .pullNewlyAllocatedContainers();
// Clear state for next interaction with ResourceManager // Clear state for next interaction with ResourceManager
ask.clear(); ask.clear();

View File

@ -56,7 +56,6 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -71,28 +70,27 @@ public class NodeManager implements ContainerManagementProtocol {
final private String rackName; final private String rackName;
final private NodeId nodeId; final private NodeId nodeId;
final private Resource capability; final private Resource capability;
final private ResourceManager resourceManager;
Resource available = recordFactory.newRecordInstance(Resource.class); Resource available = recordFactory.newRecordInstance(Resource.class);
Resource used = recordFactory.newRecordInstance(Resource.class); Resource used = recordFactory.newRecordInstance(Resource.class);
final ResourceTrackerService resourceTrackerService; final ResourceTrackerService resourceTrackerService;
final FiCaSchedulerNode schedulerNode;
final Map<ApplicationId, List<Container>> containers = final Map<ApplicationId, List<Container>> containers =
new HashMap<ApplicationId, List<Container>>(); new HashMap<ApplicationId, List<Container>>();
final Map<Container, ContainerStatus> containerStatusMap = final Map<Container, ContainerStatus> containerStatusMap =
new HashMap<Container, ContainerStatus>(); new HashMap<Container, ContainerStatus>();
public NodeManager(String hostName, int containerManagerPort, int httpPort, public NodeManager(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability, String rackName, Resource capability,
ResourceTrackerService resourceTrackerService, RMContext rmContext) ResourceManager resourceManager)
throws IOException, YarnException { throws IOException, YarnException {
this.containerManagerAddress = hostName + ":" + containerManagerPort; this.containerManagerAddress = hostName + ":" + containerManagerPort;
this.nodeHttpAddress = hostName + ":" + httpPort; this.nodeHttpAddress = hostName + ":" + httpPort;
this.rackName = rackName; this.rackName = rackName;
this.resourceTrackerService = resourceTrackerService; this.resourceTrackerService = resourceManager.getResourceTrackerService();
this.capability = capability; this.capability = capability;
Resources.addTo(available, capability); Resources.addTo(available, capability);
this.nodeId = NodeId.newInstance(hostName, containerManagerPort); this.nodeId = NodeId.newInstance(hostName, containerManagerPort);
RegisterNodeManagerRequest request = recordFactory RegisterNodeManagerRequest request = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class); .newRecordInstance(RegisterNodeManagerRequest.class);
@ -101,14 +99,8 @@ public NodeManager(String hostName, int containerManagerPort, int httpPort,
request.setNodeId(this.nodeId); request.setNodeId(this.nodeId);
request.setNMVersion(YarnVersionInfo.getVersion()); request.setNMVersion(YarnVersionInfo.getVersion());
resourceTrackerService.registerNodeManager(request); resourceTrackerService.registerNodeManager(request);
this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get( this.resourceManager = resourceManager;
this.nodeId), false); resourceManager.getResourceScheduler().getNodeReport(this.nodeId);
// Sanity check
Assert.assertEquals(capability.getMemory(),
schedulerNode.getAvailableResource().getMemory());
Assert.assertEquals(capability.getVirtualCores(),
schedulerNode.getAvailableResource().getVirtualCores());
} }
public String getHostName() { public String getHostName() {
@ -220,9 +212,11 @@ synchronized public StartContainersResponse startContainers(
synchronized public void checkResourceUsage() { synchronized public void checkResourceUsage() {
LOG.info("Checking resource usage for " + containerManagerAddress); LOG.info("Checking resource usage for " + containerManagerAddress);
Assert.assertEquals(available.getMemory(), Assert.assertEquals(available.getMemory(),
schedulerNode.getAvailableResource().getMemory()); resourceManager.getResourceScheduler().getNodeReport(
this.nodeId).getAvailableResource().getMemory());
Assert.assertEquals(used.getMemory(), Assert.assertEquals(used.getMemory(),
schedulerNode.getUsedResource().getMemory()); resourceManager.getResourceScheduler().getNodeReport(
this.nodeId).getUsedResource().getMemory());
} }
@Override @Override
@ -232,9 +226,9 @@ synchronized public StopContainersResponse stopContainers(StopContainersRequest
String applicationId = String applicationId =
String.valueOf(containerID.getApplicationAttemptId() String.valueOf(containerID.getApplicationAttemptId()
.getApplicationId().getId()); .getApplicationId().getId());
// Mark the container as COMPLETE // Mark the container as COMPLETE
List<Container> applicationContainers = containers.get(applicationId); List<Container> applicationContainers = containers.get(containerID.getApplicationAttemptId()
.getApplicationId());
for (Container c : applicationContainers) { for (Container c : applicationContainers) {
if (c.getId().compareTo(containerID) == 0) { if (c.getId().compareTo(containerID) == 0) {
ContainerStatus containerStatus = containerStatusMap.get(c); ContainerStatus containerStatus = containerStatusMap.get(c);

View File

@ -34,7 +34,11 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -62,13 +66,18 @@ public void tearDown() throws Exception {
registerNode(String hostName, int containerManagerPort, int httpPort, registerNode(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability) throws IOException, String rackName, Resource capability) throws IOException,
YarnException { YarnException {
return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
hostName, containerManagerPort, httpPort, rackName, capability, new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
resourceManager.getResourceTrackerService(), resourceManager hostName, containerManagerPort, httpPort, rackName, capability,
.getRMContext()); resourceManager);
NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext()
.getRMNodes().get(nm.getNodeId()));
resourceManager.getResourceScheduler().handle(nodeAddEvent1);
return nm;
} }
// @Test @Test
public void testResourceAllocation() throws IOException, public void testResourceAllocation() throws IOException,
YarnException { YarnException {
LOG.info("--- START: testResourceAllocation ---"); LOG.info("--- START: testResourceAllocation ---");
@ -80,14 +89,12 @@ public void testResourceAllocation() throws IOException,
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 =
registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory, 1)); Resources.createResource(memory, 1));
nm1.heartbeat();
// Register node2 // Register node2
String host2 = "host2"; String host2 = "host2";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 =
registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory/2, 1)); Resources.createResource(memory/2, 1));
nm2.heartbeat();
// Submit an application // Submit an application
Application application = new Application("user1", resourceManager); Application application = new Application("user1", resourceManager);
@ -105,23 +112,22 @@ public void testResourceAllocation() throws IOException,
Task t1 = new Task(application, priority1, new String[] {host1, host2}); Task t1 = new Task(application, priority1, new String[] {host1, host2});
application.addTask(t1); application.addTask(t1);
final int memory2 = 2048; final int memory2 = 2048;
Resource capability2 = Resources.createResource(memory2, 1); Resource capability2 = Resources.createResource(memory2, 1);
Priority priority0 = Priority priority0 =
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); // higher org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); // higher
application.addResourceRequestSpec(priority0, capability2); application.addResourceRequestSpec(priority0, capability2);
// Send resource requests to the scheduler // Send resource requests to the scheduler
application.schedule(); application.schedule();
// Send a heartbeat to kick the tires on the Scheduler // Send a heartbeat to kick the tires on the Scheduler
nm1.heartbeat(); nodeUpdate(nm1);
// Get allocations from the scheduler // Get allocations from the scheduler
application.schedule(); application.schedule();
nm1.heartbeat();
checkResourceUsage(nm1, nm2); checkResourceUsage(nm1, nm2);
LOG.info("Adding new tasks..."); LOG.info("Adding new tasks...");
@ -137,18 +143,13 @@ public void testResourceAllocation() throws IOException,
checkResourceUsage(nm1, nm2); checkResourceUsage(nm1, nm2);
// Send a heartbeat to kick the tires on the Scheduler // Send a heartbeat to kick the tires on the Scheduler
LOG.info("Sending hb from host2"); nodeUpdate(nm2);
nm2.heartbeat(); nodeUpdate(nm1);
LOG.info("Sending hb from host1");
nm1.heartbeat();
// Get allocations from the scheduler // Get allocations from the scheduler
LOG.info("Trying to allocate..."); LOG.info("Trying to allocate...");
application.schedule(); application.schedule();
nm1.heartbeat();
nm2.heartbeat();
checkResourceUsage(nm1, nm2); checkResourceUsage(nm1, nm2);
// Complete tasks // Complete tasks
@ -157,13 +158,23 @@ public void testResourceAllocation() throws IOException,
application.finishTask(t2); application.finishTask(t2);
application.finishTask(t3); application.finishTask(t3);
// Send heartbeat // Notify scheduler application is finished.
nm1.heartbeat(); AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
nm2.heartbeat(); application.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
resourceManager.getResourceScheduler().handle(appRemovedEvent1);
checkResourceUsage(nm1, nm2); checkResourceUsage(nm1, nm2);
LOG.info("--- END: testResourceAllocation ---"); LOG.info("--- END: testResourceAllocation ---");
} }
private void nodeUpdate(
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1) {
RMNode node = resourceManager.getRMContext().getRMNodes().get(nm1.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
resourceManager.getResourceScheduler().handle(nodeUpdate);
}
@Test @Test
public void testNodeHealthReportIsNotNull() throws Exception{ public void testNodeHealthReportIsNotNull() throws Exception{

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@ -111,6 +112,8 @@ public void setUp() throws Exception {
conf.setClass(YarnConfiguration.RM_SCHEDULER, conf.setClass(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class, ResourceScheduler.class); CapacityScheduler.class, ResourceScheduler.class);
resourceManager.init(conf); resourceManager.init(conf);
resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
resourceManager.getRMNMTokenSecretManager().rollMasterKey();
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
} }
@ -156,13 +159,18 @@ public void testConfValidation() throws Exception {
registerNode(String hostName, int containerManagerPort, int httpPort, registerNode(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability) String rackName, Resource capability)
throws IOException, YarnException { throws IOException, YarnException {
return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
hostName, containerManagerPort, httpPort, rackName, capability, new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
resourceManager.getResourceTrackerService(), resourceManager hostName, containerManagerPort, httpPort, rackName, capability,
.getRMContext()); resourceManager);
} NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext()
.getRMNodes().get(nm.getNodeId()));
resourceManager.getResourceScheduler().handle(nodeAddEvent1);
return nm;
}
// @Test @Test
public void testCapacityScheduler() throws Exception { public void testCapacityScheduler() throws Exception {
LOG.info("--- START: testCapacityScheduler ---"); LOG.info("--- START: testCapacityScheduler ---");
@ -172,14 +180,12 @@ public void testCapacityScheduler() throws Exception {
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(4 * GB, 1)); Resources.createResource(4 * GB, 1));
nm_0.heartbeat();
// Register node2 // Register node2
String host_1 = "host_1"; String host_1 = "host_1";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 =
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(2 * GB, 1)); Resources.createResource(2 * GB, 1));
nm_1.heartbeat();
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority priority_0 =
@ -227,9 +233,13 @@ public void testCapacityScheduler() throws Exception {
// Send a heartbeat to kick the tires on the Scheduler // Send a heartbeat to kick the tires on the Scheduler
LOG.info("Kick!"); LOG.info("Kick!");
nm_0.heartbeat(); // task_0_0 and task_1_0 allocated, used=4G
nm_1.heartbeat(); // nothing allocated // task_0_0 and task_1_0 allocated, used=4G
nodeUpdate(nm_0);
// nothing allocated
nodeUpdate(nm_1);
// Get allocations from the scheduler // Get allocations from the scheduler
application_0.schedule(); // task_0_0 application_0.schedule(); // task_0_0
checkApplicationResourceUsage(1 * GB, application_0); checkApplicationResourceUsage(1 * GB, application_0);
@ -237,9 +247,6 @@ public void testCapacityScheduler() throws Exception {
application_1.schedule(); // task_1_0 application_1.schedule(); // task_1_0
checkApplicationResourceUsage(3 * GB, application_1); checkApplicationResourceUsage(3 * GB, application_1);
nm_0.heartbeat();
nm_1.heartbeat();
checkNodeResourceUsage(4*GB, nm_0); // task_0_0 (1G) and task_1_0 (3G) checkNodeResourceUsage(4*GB, nm_0); // task_0_0 (1G) and task_1_0 (3G)
checkNodeResourceUsage(0*GB, nm_1); // no tasks, 2G available checkNodeResourceUsage(0*GB, nm_1); // no tasks, 2G available
@ -259,10 +266,12 @@ public void testCapacityScheduler() throws Exception {
// Send a heartbeat to kick the tires on the Scheduler // Send a heartbeat to kick the tires on the Scheduler
LOG.info("Sending hb from " + nm_0.getHostName()); LOG.info("Sending hb from " + nm_0.getHostName());
nm_0.heartbeat(); // nothing new, used=4G // nothing new, used=4G
nodeUpdate(nm_0);
LOG.info("Sending hb from " + nm_1.getHostName()); LOG.info("Sending hb from " + nm_1.getHostName());
nm_1.heartbeat(); // task_0_3, used=2G // task_0_1 is prefer as locality, used=2G
nodeUpdate(nm_1);
// Get allocations from the scheduler // Get allocations from the scheduler
LOG.info("Trying to allocate..."); LOG.info("Trying to allocate...");
@ -272,13 +281,22 @@ public void testCapacityScheduler() throws Exception {
application_1.schedule(); application_1.schedule();
checkApplicationResourceUsage(5 * GB, application_1); checkApplicationResourceUsage(5 * GB, application_1);
nm_0.heartbeat(); nodeUpdate(nm_0);
nm_1.heartbeat(); nodeUpdate(nm_1);
checkNodeResourceUsage(4*GB, nm_0); checkNodeResourceUsage(4*GB, nm_0);
checkNodeResourceUsage(2*GB, nm_1); checkNodeResourceUsage(2*GB, nm_1);
LOG.info("--- END: testCapacityScheduler ---"); LOG.info("--- END: testCapacityScheduler ---");
} }
private void nodeUpdate(
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm) {
RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
resourceManager.getResourceScheduler().handle(nodeUpdate);
}
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {

View File

@ -96,8 +96,7 @@ public void tearDown() throws Exception {
YarnException { YarnException {
return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
hostName, containerManagerPort, nmHttpPort, rackName, capability, hostName, containerManagerPort, nmHttpPort, rackName, capability,
resourceManager.getResourceTrackerService(), resourceManager resourceManager);
.getRMContext());
} }
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {