YARN-184. Remove unnecessary locking in fair scheduler, and address findbugs excludes. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1410826 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-11-18 05:00:56 +00:00
parent 6f80a2a76c
commit 576f96f0ac
4 changed files with 134 additions and 276 deletions

View File

@ -101,6 +101,9 @@ Release 2.0.3-alpha - Unreleased
YARN-169. Update log4j.appender.EventCounter to use
org.apache.hadoop.log.metrics.EventCounter (Anthony Rojas via tomwhite)
YARN-184. Remove unnecessary locking in fair scheduler, and address
findbugs excludes. (sandyr via tucu)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES

View File

@ -236,54 +236,5 @@
<Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl" />
<Bug pattern="EI_EXPOSE_REP2" />
</Match>
<!-- MAPREDUCE-4439 -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerEventLog" />
<Method name="shutdown" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler" />
<Field name="initialized" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler" />
<Field name="preemptionEnabled" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler" />
<Field name="queueMgr" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler" />
<Field name="sizeBasedWeight" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler" />
<Field name="userAsDefaultQueue" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler" />
<Field name="logDisabled" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler" />
<Field name="queueMaxAppsDefault" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler" />
<Field name="userMaxAppsDefault" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter>

View File

@ -94,7 +94,7 @@ public class FairScheduler implements ResourceScheduler {
protected long UPDATE_INTERVAL = 500;
// Whether to use username in place of "default" queue name
private boolean userAsDefaultQueue = false;
private volatile boolean userAsDefaultQueue = false;
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@ -136,7 +136,11 @@ public class FairScheduler implements ResourceScheduler {
private FairSchedulerEventLog eventLog; // Machine-readable event log
protected boolean assignMultiple; // Allocate multiple containers per heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
public FairScheduler() {
clock = new SystemClock();
queueMgr = new QueueManager(this);
}
public FairSchedulerConfiguration getConf() {
return conf;
@ -166,7 +170,7 @@ private RMContainer getRMContainer(ContainerId containerId) {
*/
private class UpdateThread implements Runnable {
public void run() {
while (initialized) {
while (true) {
try {
Thread.sleep(UPDATE_INTERVAL);
update();
@ -256,7 +260,7 @@ boolean isStarvedForFairShare(FSQueueSchedulable sched) {
* If such queues exist, compute how many tasks of each type need to be
* preempted and then select the right ones using preemptTasks.
*/
protected void preemptTasksIfNecessary() {
protected synchronized void preemptTasksIfNecessary() {
if (!preemptionEnabled) {
return;
}
@ -414,7 +418,8 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
public double getAppWeight(AppSchedulable app) {
// synchronized for sizeBasedWeight
public synchronized double getAppWeight(AppSchedulable app) {
if (!app.getRunnable()) {
// Job won't launch tasks, but don't return 0 to avoid division errors
return 1.0;
@ -885,7 +890,6 @@ public void recover(RMState state) throws Exception {
this.conf = new FairSchedulerConfiguration(conf);
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
this.clock = new SystemClock();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
minimumAllocation = this.conf.getMinimumMemoryAllocation();
@ -897,21 +901,21 @@ public void recover(RMState state) throws Exception {
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
Thread updateThread = new Thread(new UpdateThread());
updateThread.start();
initialized = true;
sizeBasedWeight = this.conf.getSizeBasedWeight();
queueMgr = new QueueManager(this);
try {
queueMgr.initialize();
}
catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
Thread updateThread = new Thread(new UpdateThread());
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true);
updateThread.start();
} else {
this.conf = new FairSchedulerConfiguration(conf);
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();

View File

@ -24,7 +24,6 @@
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -71,40 +70,6 @@ public class QueueManager {
private final FairScheduler scheduler;
// Minimum resource allocation for each queue
private Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
// Maximum amount of resources per queue
private Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
// Sharing weights for each queue
private Map<String, Double> queueWeights = new HashMap<String, Double>();
// Max concurrent running applications for each queue and for each user; in addition,
// for users that have no max specified, we use the userMaxJobsDefault.
private Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
private Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
private int userMaxAppsDefault = Integer.MAX_VALUE;
private int queueMaxAppsDefault = Integer.MAX_VALUE;
// ACL's for each queue. Only specifies non-default ACL's from configuration.
private Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>();
// Min share preemption timeout for each queue in seconds. If a job in the queue
// waits this long without receiving its guaranteed share, it is allowed to
// preempt other jobs' tasks.
private Map<String, Long> minSharePreemptionTimeouts =
new HashMap<String, Long>();
// Default min share preemption timeout for queues where it is not set
// explicitly.
private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
// Preemption timeout for jobs below fair share in seconds. If a job remains
// below half its fair share for this long, it is allowed to preempt tasks.
private long fairSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
private Object allocFile; // Path to XML file containing allocations. This
// is either a URL to specify a classpath resource
// (if the fair-scheduler.xml on the classpath is
@ -113,40 +78,12 @@ public class QueueManager {
private Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
private volatile QueueManagerInfo info = new QueueManagerInfo();
private long lastReloadAttempt; // Last time we tried to reload the queues file
private long lastSuccessfulReload; // Last time we successfully reloaded queues
private boolean lastReloadAttemptFailed = false;
// Monitor object for minQueueResources
private Object minQueueResourcesMO = new Object();
//Monitor object for maxQueueResources
private Object maxQueueResourcesMO = new Object();
//Monitor object for queueMaxApps
private Object queueMaxAppsMO = new Object();
//Monitor object for userMaxApps
private Object userMaxAppsMO = new Object();
//Monitor object for queueWeights
private Object queueWeightsMO = new Object();
//Monitor object for minSharePreemptionTimeouts
private Object minSharePreemptionTimeoutsMO = new Object();
//Monitor object for queueAcls
private Object queueAclsMO = new Object();
//Monitor object for userMaxAppsDefault
private Object userMaxAppsDefaultMO = new Object();
//Monitor object for queueMaxAppsDefault
private Object queueMaxAppsDefaultMO = new Object();
//Monitor object for defaultSchedulingMode
private Object defaultSchedulingModeMO = new Object();
public QueueManager(FairScheduler scheduler) {
this.scheduler = scheduler;
}
@ -180,9 +117,7 @@ public FSQueue getQueue(String name) {
FSQueue queue = queues.get(name);
if (queue == null) {
queue = new FSQueue(scheduler, name);
synchronized (defaultSchedulingModeMO){
queue.setSchedulingMode(defaultSchedulingMode);
}
queue.setSchedulingMode(info.defaultSchedulingMode);
queues.put(name, queue);
}
return queue;
@ -272,6 +207,8 @@ public void reloadAllocs() throws IOException, ParserConfigurationException,
new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
// Remember all queue names so we can display them on web UI, etc.
@ -389,16 +326,10 @@ else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
synchronized (this) {
setMinResources(minQueueResources);
setMaxResources(maxQueueResources);
setQueueMaxApps(queueMaxApps);
setUserMaxApps(userMaxApps);
setQueueWeights(queueWeights);
setUserMaxAppsDefault(userMaxAppsDefault);
setQueueMaxAppsDefault(queueMaxAppsDefault);
setDefaultSchedulingMode(defaultSchedulingMode);
setMinSharePreemptionTimeouts(minSharePreemptionTimeouts);
setQueueAcls(queueAcls);
info = new QueueManagerInfo(minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
for (String name: queueNamesInAllocFile) {
FSQueue queue = getQueue(name);
if (queueModes.containsKey(name)) {
@ -428,182 +359,87 @@ private SchedulingMode parseSchedulingMode(String text)
* @return the cap set on this queue, or 0 if not set.
*/
public Resource getMinResources(String queue) {
synchronized(minQueueResourcesMO) {
if (minQueueResources.containsKey(queue)) {
return minQueueResources.get(queue);
} else {
return Resources.createResource(0);
}
Resource minQueueResource = info.minQueueResources.get(queue);
if (minQueueResource != null) {
return minQueueResource;
} else {
return Resources.createResource(0);
}
}
private void setMinResources(Map<String, Resource> resources) {
synchronized (minQueueResourcesMO) {
minQueueResources = resources;
}
}
/**
* Get the maximum resource allocation for the given queue.
* @return the cap set on this queue, or Integer.MAX_VALUE if not set.
*/
public Resource getMaxResources(String queueName) {
synchronized (maxQueueResourcesMO) {
if (maxQueueResources.containsKey(queueName)) {
return maxQueueResources.get(queueName);
} else {
return Resources.createResource(Integer.MAX_VALUE);
}
Resource maxQueueResource = info.maxQueueResources.get(queueName);
if (maxQueueResource != null) {
return maxQueueResource;
} else {
return Resources.createResource(Integer.MAX_VALUE);
}
}
private void setMaxResources(Map<String, Resource> resources) {
synchronized (maxQueueResourcesMO) {
maxQueueResources = resources;
}
}
/**
* Add an app in the appropriate queue
*/
public synchronized void addApp(FSSchedulerApp app) {
getQueue(app.getQueueName()).addApp(app);
}
/**
* Remove an app
*/
public synchronized void removeApp(FSSchedulerApp app) {
getQueue(app.getQueueName()).removeApp(app);
}
/**
* Get a collection of all queues
*/
public Collection<FSQueue> getQueues() {
synchronized (queues) {
return queues.values();
return new ArrayList<FSQueue>(queues.values());
}
}
/**
* Get all queue names that have been seen either in the allocation file or in
* a submitted app.
*/
public synchronized Collection<String> getQueueNames() {
List<String> list = new ArrayList<String>();
for (FSQueue queue: getQueues()) {
list.add(queue.getName());
}
Collections.sort(list);
return list;
}
public int getUserMaxApps(String user) {
synchronized (userMaxAppsMO) {
if (userMaxApps.containsKey(user)) {
return userMaxApps.get(user);
} else {
return getUserMaxAppsDefault();
}
// save current info in case it gets changed under us
QueueManagerInfo info = this.info;
if (info.userMaxApps.containsKey(user)) {
return info.userMaxApps.get(user);
} else {
return info.userMaxAppsDefault;
}
}
private void setUserMaxApps(Map<String, Integer> userApps) {
synchronized (userMaxAppsMO) {
userMaxApps = userApps;
}
}
private int getUserMaxAppsDefault() {
synchronized (userMaxAppsDefaultMO){
return userMaxAppsDefault;
}
}
private void setUserMaxAppsDefault(int userMaxApps) {
synchronized (userMaxAppsDefaultMO){
userMaxAppsDefault = userMaxApps;
}
}
public int getQueueMaxApps(String queue) {
synchronized (queueMaxAppsMO) {
if (queueMaxApps.containsKey(queue)) {
return queueMaxApps.get(queue);
} else {
return getQueueMaxAppsDefault();
}
// save current info in case it gets changed under us
QueueManagerInfo info = this.info;
if (info.queueMaxApps.containsKey(queue)) {
return info.queueMaxApps.get(queue);
} else {
return info.queueMaxAppsDefault;
}
}
private void setQueueMaxApps(Map<String, Integer> queueApps) {
synchronized (queueMaxAppsMO) {
queueMaxApps = queueApps;
}
}
private int getQueueMaxAppsDefault(){
synchronized (queueMaxAppsDefaultMO) {
return queueMaxAppsDefault;
}
}
private void setQueueMaxAppsDefault(int queueMaxApps){
synchronized(queueMaxAppsDefaultMO) {
queueMaxAppsDefault = queueMaxApps;
}
}
private void setDefaultSchedulingMode(SchedulingMode schedulingMode){
synchronized(defaultSchedulingModeMO) {
defaultSchedulingMode = schedulingMode;
}
}
public double getQueueWeight(String queue) {
synchronized (queueWeightsMO) {
if (queueWeights.containsKey(queue)) {
return queueWeights.get(queue);
} else {
return 1.0;
}
Double weight = info.queueWeights.get(queue);
if (weight != null) {
return weight;
} else {
return 1.0;
}
}
private void setQueueWeights(Map<String, Double> weights) {
synchronized (queueWeightsMO) {
queueWeights = weights;
}
}
/**
* Get a queue's min share preemption timeout, in milliseconds. This is the
* time after which jobs in the queue may kill other queues' tasks if they
* are below their min share.
*/
public long getMinSharePreemptionTimeout(String queueName) {
synchronized (minSharePreemptionTimeoutsMO) {
if (minSharePreemptionTimeouts.containsKey(queueName)) {
return minSharePreemptionTimeouts.get(queueName);
}
// save current info in case it gets changed under us
QueueManagerInfo info = this.info;
if (info.minSharePreemptionTimeouts.containsKey(queueName)) {
return info.minSharePreemptionTimeouts.get(queueName);
}
return defaultMinSharePreemptionTimeout;
return info.defaultMinSharePreemptionTimeout;
}
private void setMinSharePreemptionTimeouts(
Map<String, Long> sharePreemptionTimeouts){
synchronized (minSharePreemptionTimeoutsMO) {
minSharePreemptionTimeouts = sharePreemptionTimeouts;
}
}
/**
* Get the fair share preemption, in milliseconds. This is the time
* after which any job may kill other jobs' tasks if it is below half
* its fair share.
*/
public long getFairSharePreemptionTimeout() {
return fairSharePreemptionTimeout;
return info.fairSharePreemptionTimeout;
}
/**
@ -612,10 +448,9 @@ public long getFairSharePreemptionTimeout() {
*/
public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
HashMap<QueueACL, AccessControlList> out = new HashMap<QueueACL, AccessControlList>();
synchronized (queueAclsMO) {
if (queueAcls.containsKey(queue)) {
out.putAll(queueAcls.get(queue));
}
Map<QueueACL, AccessControlList> queueAcl = info.queueAcls.get(queue);
if (queueAcl != null) {
out.putAll(queueAcl);
}
if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
@ -626,9 +461,74 @@ public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
return out;
}
private void setQueueAcls(Map<String, Map<QueueACL, AccessControlList>> queue) {
synchronized (queueAclsMO) {
queueAcls = queue;
static class QueueManagerInfo {
// Minimum resource allocation for each queue
public final Map<String, Resource> minQueueResources;
// Maximum amount of resources per queue
public final Map<String, Resource> maxQueueResources;
// Sharing weights for each queue
public final Map<String, Double> queueWeights;
// Max concurrent running applications for each queue and for each user; in addition,
// for users that have no max specified, we use the userMaxJobsDefault.
public final Map<String, Integer> queueMaxApps;
public final Map<String, Integer> userMaxApps;
public final int userMaxAppsDefault;
public final int queueMaxAppsDefault;
// ACL's for each queue. Only specifies non-default ACL's from configuration.
public final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
// Min share preemption timeout for each queue in seconds. If a job in the queue
// waits this long without receiving its guaranteed share, it is allowed to
// preempt other jobs' tasks.
public final Map<String, Long> minSharePreemptionTimeouts;
// Default min share preemption timeout for queues where it is not set
// explicitly.
public final long defaultMinSharePreemptionTimeout;
// Preemption timeout for jobs below fair share in seconds. If a job remains
// below half its fair share for this long, it is allowed to preempt tasks.
public final long fairSharePreemptionTimeout;
public final SchedulingMode defaultSchedulingMode;
public QueueManagerInfo(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
Map<String, Double> queueWeights, int userMaxAppsDefault,
int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
this.minQueueResources = minQueueResources;
this.maxQueueResources = maxQueueResources;
this.queueMaxApps = queueMaxApps;
this.userMaxApps = userMaxApps;
this.queueWeights = queueWeights;
this.userMaxAppsDefault = userMaxAppsDefault;
this.queueMaxAppsDefault = queueMaxAppsDefault;
this.defaultSchedulingMode = defaultSchedulingMode;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
this.queueAcls = queueAcls;
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
}
public QueueManagerInfo() {
minQueueResources = new HashMap<String, Resource>();
maxQueueResources = new HashMap<String, Resource>();
queueWeights = new HashMap<String, Double>();
queueMaxApps = new HashMap<String, Integer>();
userMaxApps = new HashMap<String, Integer>();
userMaxAppsDefault = Integer.MAX_VALUE;
queueMaxAppsDefault = Integer.MAX_VALUE;
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>();
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
fairSharePreemptionTimeout = Long.MAX_VALUE;
defaultSchedulingMode = SchedulingMode.FAIR;
}
}
}