MAPREDUCE-2794. [MR-279] Incorrect metrics value for AvailableGB per queue per user. (John George via mahadev)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1179936 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c1c0e8c9ea
commit
b8102dbdf8
@ -1531,6 +1531,9 @@ Release 0.23.0 - Unreleased
|
|||||||
MAPREDUCE-2913. Fixed TestMRJobs.testFailingMapper to assert the correct
|
MAPREDUCE-2913. Fixed TestMRJobs.testFailingMapper to assert the correct
|
||||||
TaskCompletionEventStatus. (Jonathan Eagles via vinodkv)
|
TaskCompletionEventStatus. (Jonathan Eagles via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-2794. [MR-279] Incorrect metrics value for AvailableGB per
|
||||||
|
queue per user. (John George via mahadev)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -740,7 +740,7 @@ private synchronized SchedulerApp getApplication(
|
|||||||
|
|
||||||
// Book-keeping
|
// Book-keeping
|
||||||
allocateResource(clusterResource,
|
allocateResource(clusterResource,
|
||||||
application.getUser(), assignedResource);
|
application, assignedResource);
|
||||||
|
|
||||||
// Reset scheduling opportunities
|
// Reset scheduling opportunities
|
||||||
application.resetSchedulingOpportunities(priority);
|
application.resetSchedulingOpportunities(priority);
|
||||||
@ -810,7 +810,7 @@ private synchronized boolean assignToQueue(Resource clusterResource,
|
|||||||
private void setUserResourceLimit(SchedulerApp application,
|
private void setUserResourceLimit(SchedulerApp application,
|
||||||
Resource resourceLimit) {
|
Resource resourceLimit) {
|
||||||
application.setAvailableResourceLimit(resourceLimit);
|
application.setAvailableResourceLimit(resourceLimit);
|
||||||
metrics.setAvailableResourcesToUser(application.getUser(), resourceLimit);
|
metrics.setAvailableResourcesToUser(application.getUser(), application.getHeadroom());
|
||||||
}
|
}
|
||||||
|
|
||||||
private int roundUp(int memory) {
|
private int roundUp(int memory) {
|
||||||
@ -1216,7 +1216,7 @@ public void completedContainer(Resource clusterResource,
|
|||||||
|
|
||||||
// Book-keeping
|
// Book-keeping
|
||||||
releaseResource(clusterResource,
|
releaseResource(clusterResource,
|
||||||
application.getUser(), container.getResource());
|
application, container.getResource());
|
||||||
|
|
||||||
LOG.info("completedContainer" +
|
LOG.info("completedContainer" +
|
||||||
" container=" + container +
|
" container=" + container +
|
||||||
@ -1234,31 +1234,34 @@ public void completedContainer(Resource clusterResource,
|
|||||||
}
|
}
|
||||||
|
|
||||||
synchronized void allocateResource(Resource clusterResource,
|
synchronized void allocateResource(Resource clusterResource,
|
||||||
String userName, Resource resource) {
|
SchedulerApp application, Resource resource) {
|
||||||
// Update queue metrics
|
// Update queue metrics
|
||||||
Resources.addTo(usedResources, resource);
|
Resources.addTo(usedResources, resource);
|
||||||
updateResource(clusterResource);
|
updateResource(clusterResource);
|
||||||
++numContainers;
|
++numContainers;
|
||||||
|
|
||||||
// Update user metrics
|
// Update user metrics
|
||||||
|
String userName = application.getUser();
|
||||||
User user = getUser(userName);
|
User user = getUser(userName);
|
||||||
user.assignContainer(resource);
|
user.assignContainer(resource);
|
||||||
|
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
|
||||||
LOG.info(getQueueName() +
|
LOG.info(getQueueName() +
|
||||||
" used=" + usedResources + " numContainers=" + numContainers +
|
" used=" + usedResources + " numContainers=" + numContainers +
|
||||||
" user=" + userName + " resources=" + user.getConsumedResources());
|
" user=" + userName + " resources=" + user.getConsumedResources());
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void releaseResource(Resource clusterResource,
|
synchronized void releaseResource(Resource clusterResource,
|
||||||
String userName, Resource resource) {
|
SchedulerApp application, Resource resource) {
|
||||||
// Update queue metrics
|
// Update queue metrics
|
||||||
Resources.subtractFrom(usedResources, resource);
|
Resources.subtractFrom(usedResources, resource);
|
||||||
updateResource(clusterResource);
|
updateResource(clusterResource);
|
||||||
--numContainers;
|
--numContainers;
|
||||||
|
|
||||||
// Update user metrics
|
// Update user metrics
|
||||||
|
String userName = application.getUser();
|
||||||
User user = getUser(userName);
|
User user = getUser(userName);
|
||||||
user.releaseContainer(resource);
|
user.releaseContainer(resource);
|
||||||
|
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
|
||||||
|
|
||||||
LOG.info(getQueueName() +
|
LOG.info(getQueueName() +
|
||||||
" used=" + usedResources + " numContainers=" + numContainers +
|
" used=" + usedResources + " numContainers=" + numContainers +
|
||||||
@ -1282,7 +1285,7 @@ private synchronized void updateResource(Resource clusterResource) {
|
|||||||
usedResources.getMemory() / (clusterResource.getMemory() * capacity));
|
usedResources.getMemory() / (clusterResource.getMemory() * capacity));
|
||||||
|
|
||||||
Resource resourceLimit =
|
Resource resourceLimit =
|
||||||
Resources.createResource((int)queueLimit);
|
Resources.createResource(roundUp((int)queueLimit));
|
||||||
metrics.setAvailableResourcesToQueue(
|
metrics.setAvailableResourcesToQueue(
|
||||||
Resources.subtractFrom(resourceLimit, usedResources));
|
Resources.subtractFrom(resourceLimit, usedResources));
|
||||||
}
|
}
|
||||||
@ -1340,7 +1343,7 @@ public void recoverContainer(Resource clusterResource,
|
|||||||
SchedulerApp application, Container container) {
|
SchedulerApp application, Container container) {
|
||||||
// Careful! Locking order is important!
|
// Careful! Locking order is important!
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
allocateResource(clusterResource, application.getUser(), container.getResource());
|
allocateResource(clusterResource, application, container.getResource());
|
||||||
}
|
}
|
||||||
parent.recoverContainer(clusterResource, application, container);
|
parent.recoverContainer(clusterResource, application, container);
|
||||||
|
|
||||||
|
@ -158,6 +158,52 @@ public Container answer(InvocationOnMock invocation)
|
|||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleQueueOneUserMetrics() throws Exception {
|
||||||
|
|
||||||
|
// Manipulate queue B (held in the local variable 'a')
|
||||||
|
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B));
|
||||||
|
|
||||||
|
// Users
|
||||||
|
final String user_0 = "user_0";
|
||||||
|
|
||||||
|
// Submit applications
|
||||||
|
final ApplicationAttemptId appAttemptId_0 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||||
|
SchedulerApp app_0 =
|
||||||
|
new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
|
||||||
|
a.submitApplication(app_0, user_0, B);
|
||||||
|
|
||||||
|
final ApplicationAttemptId appAttemptId_1 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||||
|
SchedulerApp app_1 =
|
||||||
|
new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
|
||||||
|
a.submitApplication(app_1, user_0, B); // same user
|
||||||
|
|
||||||
|
|
||||||
|
// Setup some nodes
|
||||||
|
String host_0 = "host_0";
|
||||||
|
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
|
||||||
|
|
||||||
|
final int numNodes = 1;
|
||||||
|
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
|
||||||
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||||
|
|
||||||
|
// Setup resource-requests
|
||||||
|
Priority priority = TestUtils.createMockPriority(1);
|
||||||
|
app_0.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, priority,
|
||||||
|
recordFactory)));
|
||||||
|
|
||||||
|
// Start testing...
|
||||||
|
|
||||||
|
// Only 1 container
|
||||||
|
a.assignContainers(clusterResource, node_0);
|
||||||
|
assertEquals(7, a.getMetrics().getAvailableGB());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSingleQueueWithOneUser() throws Exception {
|
public void testSingleQueueWithOneUser() throws Exception {
|
||||||
|
|
||||||
@ -180,6 +226,7 @@ public void testSingleQueueWithOneUser() throws Exception {
|
|||||||
new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
|
new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
|
||||||
a.submitApplication(app_1, user_0, A); // same user
|
a.submitApplication(app_1, user_0, A); // same user
|
||||||
|
|
||||||
|
|
||||||
// Setup some nodes
|
// Setup some nodes
|
||||||
String host_0 = "host_0";
|
String host_0 = "host_0";
|
||||||
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
|
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
|
||||||
@ -207,6 +254,7 @@ public void testSingleQueueWithOneUser() throws Exception {
|
|||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0, a.getMetrics().getReservedGB());
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
assertEquals(1, a.getMetrics().getAllocatedGB());
|
assertEquals(1, a.getMetrics().getAllocatedGB());
|
||||||
|
assertEquals(0, a.getMetrics().getAvailableGB());
|
||||||
|
|
||||||
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
||||||
// you can get one container more than user-limit
|
// you can get one container more than user-limit
|
||||||
@ -273,6 +321,7 @@ public void testSingleQueueWithOneUser() throws Exception {
|
|||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0, a.getMetrics().getReservedGB());
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
assertEquals(0, a.getMetrics().getAllocatedGB());
|
assertEquals(0, a.getMetrics().getAllocatedGB());
|
||||||
|
assertEquals(1, a.getMetrics().getAvailableGB());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -494,6 +543,7 @@ public void testReservation() throws Exception {
|
|||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0, a.getMetrics().getReservedGB());
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
assertEquals(1, a.getMetrics().getAllocatedGB());
|
assertEquals(1, a.getMetrics().getAllocatedGB());
|
||||||
|
assertEquals(0, a.getMetrics().getAvailableGB());
|
||||||
|
|
||||||
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
||||||
// you can get one container more than user-limit
|
// you can get one container more than user-limit
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
@ -81,6 +82,13 @@ private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) {
|
|||||||
LOG.info("Setup top-level queues a and b");
|
LOG.info("Setup top-level queues a and b");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private SchedulerApp getMockApplication(int appId, String user) {
|
||||||
|
SchedulerApp application = mock(SchedulerApp.class);
|
||||||
|
doReturn(user).when(application).getUser();
|
||||||
|
doReturn(null).when(application).getHeadroom();
|
||||||
|
return application;
|
||||||
|
}
|
||||||
|
|
||||||
private void stubQueueAllocation(final CSQueue queue,
|
private void stubQueueAllocation(final CSQueue queue,
|
||||||
final Resource clusterResource, final SchedulerNode node,
|
final Resource clusterResource, final SchedulerNode node,
|
||||||
final int allocation) {
|
final int allocation) {
|
||||||
@ -100,7 +108,8 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
|
|||||||
((ParentQueue)queue).allocateResource(clusterResource,
|
((ParentQueue)queue).allocateResource(clusterResource,
|
||||||
allocatedResource);
|
allocatedResource);
|
||||||
} else {
|
} else {
|
||||||
((LeafQueue)queue).allocateResource(clusterResource, "",
|
SchedulerApp app1 = getMockApplication(0, "");
|
||||||
|
((LeafQueue)queue).allocateResource(clusterResource, app1,
|
||||||
allocatedResource);
|
allocatedResource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user