YARN-2124. Fixed NPE in ProportionalCapacityPreemptionPolicy. Contributed by Wangda Tan

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1601964 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-06-11 17:30:18 +00:00
parent e98529858e
commit 710a8693e5
6 changed files with 82 additions and 17 deletions

View File

@ -227,6 +227,9 @@ Release 2.5.0 - UNRELEASED
YARN-2128. FairScheduler: Incorrect calculation of amResource usage. YARN-2128. FairScheduler: Incorrect calculation of amResource usage.
(Wei Yan via kasha) (Wei Yan via kasha)
YARN-2124. Fixed NPE in ProportionalCapacityPreemptionPolicy. (Wangda Tan
via jianhe)
Release 2.4.1 - UNRELEASED Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -327,7 +327,7 @@ protected static void validateConfigs(Configuration conf) {
* RMActiveServices handles all the Active services in the RM. * RMActiveServices handles all the Active services in the RM.
*/ */
@Private @Private
class RMActiveServices extends CompositeService { public class RMActiveServices extends CompositeService {
private DelegationTokenRenewer delegationTokenRenewer; private DelegationTokenRenewer delegationTokenRenewer;
private EventHandler<SchedulerEvent> schedulerDispatcher; private EventHandler<SchedulerEvent> schedulerDispatcher;
@ -526,11 +526,9 @@ protected void createPolicyMonitors() {
(PreemptableResourceScheduler) scheduler)); (PreemptableResourceScheduler) scheduler));
for (SchedulingEditPolicy policy : policies) { for (SchedulingEditPolicy policy : policies) {
LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
policy.init(conf, rmContext.getDispatcher().getEventHandler(),
(PreemptableResourceScheduler) scheduler);
// periodically check whether we need to take action to guarantee // periodically check whether we need to take action to guarantee
// constraints // constraints
SchedulingMonitor mon = new SchedulingMonitor(policy); SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
addService(mon); addService(mon);
} }
} else { } else {

View File

@ -21,6 +21,8 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -34,18 +36,29 @@ public class SchedulingMonitor extends AbstractService {
private Thread checkerThread; private Thread checkerThread;
private volatile boolean stopped; private volatile boolean stopped;
private long monitorInterval; private long monitorInterval;
private RMContext rmContext;
public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) { public SchedulingMonitor(RMContext rmContext,
SchedulingEditPolicy scheduleEditPolicy) {
super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")"); super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")");
this.scheduleEditPolicy = scheduleEditPolicy; this.scheduleEditPolicy = scheduleEditPolicy;
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval(); this.rmContext = rmContext;
} }
public long getMonitorInterval() { public long getMonitorInterval() {
return monitorInterval; return monitorInterval;
} }
@VisibleForTesting
public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
return scheduleEditPolicy;
}
@SuppressWarnings("unchecked")
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
(PreemptableResourceScheduler) rmContext.getScheduler());
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
super.serviceInit(conf); super.serviceInit(conf);
} }

View File

@ -165,6 +165,11 @@ public void init(Configuration config,
observeOnly = config.getBoolean(OBSERVE_ONLY, false); observeOnly = config.getBoolean(OBSERVE_ONLY, false);
rc = scheduler.getResourceCalculator(); rc = scheduler.getResourceCalculator();
} }
@VisibleForTesting
public ResourceCalculator getResourceCalculator() {
return rc;
}
@Override @Override
public void editSchedule(){ public void editSchedule(){

View File

@ -571,4 +571,8 @@ public void clearQueueMetrics(RMApp app) {
.getSchedulerApplications().get(app.getApplicationId()).getQueue() .getSchedulerApplications().get(app.getApplicationId()).getQueue()
.getMetrics().clearQueueMetrics(); .getMetrics().clearQueueMetrics();
} }
public RMActiveServices getRMActiveService() {
return activeServices;
}
} }

View File

@ -17,6 +17,25 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.Deque; import java.util.Deque;
@ -27,12 +46,16 @@
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
@ -52,17 +75,6 @@
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class TestProportionalCapacityPreemptionPolicy { public class TestProportionalCapacityPreemptionPolicy {
static final long TS = 3141592653L; static final long TS = 3141592653L;
@ -424,6 +436,36 @@ public void testContainerOrdering(){
assert containers.get(4).equals(rm5); assert containers.get(4).equals(rm5);
} }
@Test
public void testPolicyInitializeAfterSchedulerInitialized() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
@SuppressWarnings("resource")
MockRM rm = new MockRM(conf);
rm.init(conf);
// ProportionalCapacityPreemptionPolicy should be initialized after
// CapacityScheduler initialized. We will
// 1) find SchedulingMonitor from RMActiveService's service list,
// 2) check if ResourceCalculator in policy is null or not.
// If it's not null, we can come to a conclusion that policy initialized
// after scheduler got initialized
for (Service service : rm.getRMActiveService().getServices()) {
if (service instanceof SchedulingMonitor) {
ProportionalCapacityPreemptionPolicy policy =
(ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service)
.getSchedulingEditPolicy();
assertNotNull(policy.getResourceCalculator());
return;
}
}
fail("Failed to find SchedulingMonitor service, please check what happened");
}
static class IsPreemptionRequestFor static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> { extends ArgumentMatcher<ContainerPreemptEvent> {