YARN-6952. Enable scheduling monitor in FS (Contributed by Yufei Gu via Daniel Templeton)
This commit is contained in:
parent
bbbf0e2a41
commit
218588be77
@ -90,7 +90,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
@ -698,8 +697,7 @@ protected void serviceInit(Configuration configuration) throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// creating monitors that handle preemption
|
createSchedulerMonitors();
|
||||||
createPolicyMonitors();
|
|
||||||
|
|
||||||
masterService = createApplicationMasterService();
|
masterService = createApplicationMasterService();
|
||||||
addService(masterService) ;
|
addService(masterService) ;
|
||||||
@ -800,9 +798,8 @@ protected void serviceStop() throws Exception {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void createPolicyMonitors() {
|
protected void createSchedulerMonitors() {
|
||||||
if (scheduler instanceof PreemptableResourceScheduler
|
if (conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
|
||||||
&& conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {
|
||||||
LOG.info("Loading policy monitors");
|
LOG.info("Loading policy monitors");
|
||||||
List<SchedulingEditPolicy> policies = conf.getInstances(
|
List<SchedulingEditPolicy> policies = conf.getInstances(
|
||||||
|
@ -19,12 +19,12 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
|
||||||
public interface SchedulingEditPolicy {
|
public interface SchedulingEditPolicy {
|
||||||
|
|
||||||
void init(Configuration config, RMContext context,
|
void init(Configuration config, RMContext context,
|
||||||
PreemptableResourceScheduler scheduler);
|
ResourceScheduler scheduler);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method is invoked at regular intervals. Internally the policy is
|
* This method is invoked at regular intervals. Internally the policy is
|
||||||
|
@ -29,7 +29,6 @@
|
|||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
@ -59,8 +58,7 @@ public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void serviceInit(Configuration conf) throws Exception {
|
public void serviceInit(Configuration conf) throws Exception {
|
||||||
scheduleEditPolicy.init(conf, rmContext,
|
scheduleEditPolicy.init(conf, rmContext, rmContext.getScheduler());
|
||||||
(PreemptableResourceScheduler) rmContext.getScheduler());
|
|
||||||
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
|
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
@ -150,7 +150,7 @@ public ProportionalCapacityPreemptionPolicy(RMContext context,
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void init(Configuration config, RMContext context,
|
public void init(Configuration config, RMContext context,
|
||||||
PreemptableResourceScheduler sched) {
|
ResourceScheduler sched) {
|
||||||
LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
|
LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
|
||||||
assert null == scheduler : "Unexpected duplicate call to init";
|
assert null == scheduler : "Unexpected duplicate call to init";
|
||||||
if (!(sched instanceof CapacityScheduler)) {
|
if (!(sched instanceof CapacityScheduler)) {
|
||||||
|
@ -20,7 +20,7 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -39,16 +39,16 @@ public abstract class InvariantsChecker implements SchedulingEditPolicy {
|
|||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private RMContext context;
|
private RMContext context;
|
||||||
private PreemptableResourceScheduler scheduler;
|
private ResourceScheduler scheduler;
|
||||||
private boolean throwOnInvariantViolation;
|
private boolean throwOnInvariantViolation;
|
||||||
private long monitoringInterval;
|
private long monitoringInterval;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration config, RMContext rmContext,
|
public void init(Configuration config, RMContext rmContext,
|
||||||
PreemptableResourceScheduler preemptableResourceScheduler) {
|
ResourceScheduler scheduler) {
|
||||||
this.conf = config;
|
this.conf = config;
|
||||||
this.context = rmContext;
|
this.context = rmContext;
|
||||||
this.scheduler = preemptableResourceScheduler;
|
this.scheduler = scheduler;
|
||||||
this.throwOnInvariantViolation =
|
this.throwOnInvariantViolation =
|
||||||
conf.getBoolean(InvariantsChecker.THROW_ON_VIOLATION, false);
|
conf.getBoolean(InvariantsChecker.THROW_ON_VIOLATION, false);
|
||||||
this.monitoringInterval =
|
this.monitoringInterval =
|
||||||
@ -89,7 +89,7 @@ public RMContext getContext() {
|
|||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
public PreemptableResourceScheduler getScheduler() {
|
public ResourceScheduler getScheduler() {
|
||||||
return scheduler;
|
return scheduler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,8 +27,8 @@
|
|||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -42,7 +42,6 @@
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This policy checks at every invocation that a given set of invariants
|
* This policy checks at every invocation that a given set of invariants
|
||||||
@ -78,9 +77,9 @@ public class MetricsInvariantChecker extends InvariantsChecker {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration config, RMContext rmContext,
|
public void init(Configuration config, RMContext rmContext,
|
||||||
PreemptableResourceScheduler preemptableResourceScheduler) {
|
ResourceScheduler scheduler) {
|
||||||
|
|
||||||
super.init(config, rmContext, preemptableResourceScheduler);
|
super.init(config, rmContext, scheduler);
|
||||||
|
|
||||||
this.metricsSystem = DefaultMetricsSystem.instance();
|
this.metricsSystem = DefaultMetricsSystem.instance();
|
||||||
this.queueMetrics =
|
this.queueMetrics =
|
||||||
|
Loading…
Reference in New Issue
Block a user