MAPREDUCE-3036. Fixed metrics for reserved resources in CS. Contributed by Robert Evans.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1173453 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b8e8b8da75
commit
339b85b88e
@ -1372,6 +1372,9 @@ Release 0.23.0 - Unreleased
|
|||||||
|
|
||||||
MAPREDUCE-3018. Fixed -file option for streaming. (mahadev via acmurthy)
|
MAPREDUCE-3018. Fixed -file option for streaming. (mahadev via acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-3036. Fixed metrics for reserved resources in CS. (Robert Evans
|
||||||
|
via acmurthy)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -32,10 +32,8 @@
|
|||||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationState;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.util.Self;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.*;
|
import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.*;
|
||||||
|
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -282,4 +280,56 @@ public void unreserveResource(String user, Resource res) {
|
|||||||
parent.unreserveResource(user, res);
|
parent.unreserveResource(user, res);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getAppsSubmitted() {
|
||||||
|
return appsSubmitted.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getAppsRunning() {
|
||||||
|
return appsRunning.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getAppsPending() {
|
||||||
|
return appsPending.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getAppsCompleted() {
|
||||||
|
return appsCompleted.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getAppsKilled() {
|
||||||
|
return appsKilled.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getAppsFailed() {
|
||||||
|
return appsFailed.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getAllocatedGB() {
|
||||||
|
return allocatedGB.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getAllocatedContainers() {
|
||||||
|
return allocatedContainers.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getAvailableGB() {
|
||||||
|
return availableGB.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPendingGB() {
|
||||||
|
return pendingGB.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPendingContainers() {
|
||||||
|
return pendingContainers.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getReservedGB() {
|
||||||
|
return reservedGB.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getReservedContainers() {
|
||||||
|
return reservedContainers.value();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1152,14 +1152,17 @@ private Resource assignContainer(Resource clusterResource, SchedulerNode node,
|
|||||||
|
|
||||||
private void reserve(SchedulerApp application, Priority priority,
|
private void reserve(SchedulerApp application, Priority priority,
|
||||||
SchedulerNode node, RMContainer rmContainer, Container container) {
|
SchedulerNode node, RMContainer rmContainer, Container container) {
|
||||||
rmContainer = application.reserve(node, priority, rmContainer, container);
|
|
||||||
node.reserveResource(application, priority, rmContainer);
|
|
||||||
|
|
||||||
// Update reserved metrics if this is the first reservation
|
// Update reserved metrics if this is the first reservation
|
||||||
if (rmContainer == null) {
|
if (rmContainer == null) {
|
||||||
getMetrics().reserveResource(
|
getMetrics().reserveResource(
|
||||||
application.getUser(), container.getResource());
|
application.getUser(), container.getResource());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Inform the application
|
||||||
|
rmContainer = application.reserve(node, priority, rmContainer, container);
|
||||||
|
|
||||||
|
// Update the node
|
||||||
|
node.reserveResource(application, priority, rmContainer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void unreserve(SchedulerApp application, Priority priority,
|
private void unreserve(SchedulerApp application, Priority priority,
|
||||||
|
@ -202,6 +202,8 @@ public void testSingleQueueWithOneUser() throws Exception {
|
|||||||
assertEquals(1*GB, a.getUsedResources().getMemory());
|
assertEquals(1*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(1, a.getMetrics().getAllocatedGB());
|
||||||
|
|
||||||
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
||||||
// you can get one container more than user-limit
|
// you can get one container more than user-limit
|
||||||
@ -209,12 +211,16 @@ public void testSingleQueueWithOneUser() throws Exception {
|
|||||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(2, a.getMetrics().getAllocatedGB());
|
||||||
|
|
||||||
// Can't allocate 3rd due to user-limit
|
// Can't allocate 3rd due to user-limit
|
||||||
a.assignContainers(clusterResource, node_0);
|
a.assignContainers(clusterResource, node_0);
|
||||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(2, a.getMetrics().getAllocatedGB());
|
||||||
|
|
||||||
// Bump up user-limit-factor, now allocate should work
|
// Bump up user-limit-factor, now allocate should work
|
||||||
a.setUserLimitFactor(10);
|
a.setUserLimitFactor(10);
|
||||||
@ -222,12 +228,16 @@ public void testSingleQueueWithOneUser() throws Exception {
|
|||||||
assertEquals(3*GB, a.getUsedResources().getMemory());
|
assertEquals(3*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(3, a.getMetrics().getAllocatedGB());
|
||||||
|
|
||||||
// One more should work, for app_1, due to user-limit-factor
|
// One more should work, for app_1, due to user-limit-factor
|
||||||
a.assignContainers(clusterResource, node_0);
|
a.assignContainers(clusterResource, node_0);
|
||||||
assertEquals(4*GB, a.getUsedResources().getMemory());
|
assertEquals(4*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(4, a.getMetrics().getAllocatedGB());
|
||||||
|
|
||||||
// Test max-capacity
|
// Test max-capacity
|
||||||
// Now - no more allocs since we are at max-cap
|
// Now - no more allocs since we are at max-cap
|
||||||
@ -236,6 +246,8 @@ public void testSingleQueueWithOneUser() throws Exception {
|
|||||||
assertEquals(4*GB, a.getUsedResources().getMemory());
|
assertEquals(4*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(4, a.getMetrics().getAllocatedGB());
|
||||||
|
|
||||||
// Release each container from app_0
|
// Release each container from app_0
|
||||||
for (RMContainer rmContainer : app_0.getLiveContainers()) {
|
for (RMContainer rmContainer : app_0.getLiveContainers()) {
|
||||||
@ -245,6 +257,8 @@ public void testSingleQueueWithOneUser() throws Exception {
|
|||||||
assertEquals(1*GB, a.getUsedResources().getMemory());
|
assertEquals(1*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(1, a.getMetrics().getAllocatedGB());
|
||||||
|
|
||||||
// Release each container from app_1
|
// Release each container from app_1
|
||||||
for (RMContainer rmContainer : app_1.getLiveContainers()) {
|
for (RMContainer rmContainer : app_1.getLiveContainers()) {
|
||||||
@ -254,6 +268,8 @@ public void testSingleQueueWithOneUser() throws Exception {
|
|||||||
assertEquals(0*GB, a.getUsedResources().getMemory());
|
assertEquals(0*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(0, a.getMetrics().getAllocatedGB());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -473,6 +489,8 @@ public void testReservation() throws Exception {
|
|||||||
assertEquals(1*GB, a.getUsedResources().getMemory());
|
assertEquals(1*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(1, a.getMetrics().getAllocatedGB());
|
||||||
|
|
||||||
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
||||||
// you can get one container more than user-limit
|
// you can get one container more than user-limit
|
||||||
@ -480,6 +498,8 @@ public void testReservation() throws Exception {
|
|||||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(2, a.getMetrics().getAllocatedGB());
|
||||||
|
|
||||||
// Now, reservation should kick in for app_1
|
// Now, reservation should kick in for app_1
|
||||||
a.assignContainers(clusterResource, node_0);
|
a.assignContainers(clusterResource, node_0);
|
||||||
@ -488,6 +508,8 @@ public void testReservation() throws Exception {
|
|||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
|
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
|
||||||
assertEquals(2*GB, node_0.getUsedResource().getMemory());
|
assertEquals(2*GB, node_0.getUsedResource().getMemory());
|
||||||
|
assertEquals(4, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(2, a.getMetrics().getAllocatedGB());
|
||||||
|
|
||||||
// Now free 1 container from app_0 i.e. 1G
|
// Now free 1 container from app_0 i.e. 1G
|
||||||
a.completedContainer(clusterResource, app_0, node_0,
|
a.completedContainer(clusterResource, app_0, node_0,
|
||||||
@ -498,6 +520,8 @@ public void testReservation() throws Exception {
|
|||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
|
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
|
||||||
assertEquals(1*GB, node_0.getUsedResource().getMemory());
|
assertEquals(1*GB, node_0.getUsedResource().getMemory());
|
||||||
|
assertEquals(4, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(1, a.getMetrics().getAllocatedGB());
|
||||||
|
|
||||||
// Now finish another container from app_0 and fulfill the reservation
|
// Now finish another container from app_0 and fulfill the reservation
|
||||||
a.completedContainer(clusterResource, app_0, node_0,
|
a.completedContainer(clusterResource, app_0, node_0,
|
||||||
@ -508,6 +532,8 @@ public void testReservation() throws Exception {
|
|||||||
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
|
assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
|
||||||
assertEquals(4*GB, node_0.getUsedResource().getMemory());
|
assertEquals(4*GB, node_0.getUsedResource().getMemory());
|
||||||
|
assertEquals(0, a.getMetrics().getReservedGB());
|
||||||
|
assertEquals(4, a.getMetrics().getAllocatedGB());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
Reference in New Issue
Block a user