diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9953214b13..74aa6a47f2 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -32,3 +32,5 @@ Release 2.1.0-alpha - Unreleased BUG FIXES + YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy) + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java index 1f2205b77b..68d86fdeb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java @@ -129,15 +129,17 @@ synchronized void log(String eventType, Object... params) { /** * Flush and close the log. */ - void shutdown() { + synchronized void shutdown() { try { if (appender != null) appender.close(); - } catch (Exception e) {} - logDisabled = true; + } catch (Exception e) { + LOG.error("Failed to close fair scheduler event log", e); + logDisabled = true; + } } - boolean isEnabled() { + synchronized boolean isEnabled() { return !logDisabled; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 8785891f5e..38e57b4068 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -116,7 +116,37 @@ public class QueueManager { private long lastReloadAttempt; // Last time we tried to reload the queues file private long lastSuccessfulReload; // Last time we successfully reloaded queues private boolean lastReloadAttemptFailed = false; - + + // Monitor object for minQueueResources + private Object minQueueResourcesMO = new Object(); + + //Monitor object for maxQueueResources + private Object maxQueueResourcesMO = new Object(); + + //Monitor object for queueMaxApps + private Object queueMaxAppsMO = new Object(); + + //Monitor object for userMaxApps + private Object userMaxAppsMO = new Object(); + + //Monitor object for queueWeights + private Object queueWeightsMO = new Object(); + + //Monitor object for minSharePreemptionTimeouts + private Object minSharePreemptionTimeoutsMO = new Object(); + + //Monitor object for queueAcls + private Object queueAclsMO = new Object(); + + //Monitor object for userMaxAppsDefault + private Object userMaxAppsDefaultMO = new Object(); + + //Monitor object for queueMaxAppsDefault + private Object queueMaxAppsDefaultMO = new Object(); + + //Monitor object for defaultSchedulingMode + private Object defaultSchedulingModeMO = new Object(); + public QueueManager(FairScheduler scheduler) { this.scheduler = scheduler; } @@ -145,21 +175,27 @@ public void initialize() throws IOException, SAXException, /** * Get a queue by name, creating it if necessary */ - public synchronized FSQueue getQueue(String name) { - FSQueue queue = queues.get(name); - if (queue == null) { - queue = new FSQueue(scheduler, name); - queue.setSchedulingMode(defaultSchedulingMode); - queues.put(name, queue); + public FSQueue getQueue(String name) { + synchronized (queues) { + FSQueue queue = queues.get(name); + if (queue == null) { + queue = new FSQueue(scheduler, name); + synchronized (defaultSchedulingModeMO){ + queue.setSchedulingMode(defaultSchedulingMode); + } + queues.put(name, queue); + } + return queue; } - return queue; } /** * Return whether a queue exists already. */ - public synchronized boolean exists(String name) { - return queues.containsKey(name); + public boolean exists(String name) { + synchronized (queues) { + return queues.containsKey(name); + } } /** @@ -353,16 +389,16 @@ else if ("defaultQueueSchedulingMode".equals(element.getTagName())) { // Commit the reload; also create any queue defined in the alloc file // if it does not already exist, so it can be displayed on the web UI. synchronized(this) { - this.minQueueResources = minQueueResources; - this.maxQueueResources = maxQueueResources; - this.queueMaxApps = queueMaxApps; - this.userMaxApps = userMaxApps; - this.queueWeights = queueWeights; - this.userMaxAppsDefault = userMaxAppsDefault; - this.queueMaxAppsDefault = queueMaxAppsDefault; - this.defaultSchedulingMode = defaultSchedulingMode; - this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; - this.queueAcls = queueAcls; + setMinResources(minQueueResources); + setMaxResources(maxQueueResources); + setQueueMaxApps(queueMaxApps); + setUserMaxApps(userMaxApps); + setQueueWeights(queueWeights); + setUserMaxAppsDefault(userMaxAppsDefault); + setQueueMaxAppsDefault(queueMaxAppsDefault); + setDefaultSchedulingMode(defaultSchedulingMode); + setMinSharePreemptionTimeouts(minSharePreemptionTimeouts); + setQueueAcls(queueAcls); for (String name: queueNamesInAllocFile) { FSQueue queue = getQueue(name); if (queueModes.containsKey(name)) { @@ -392,25 +428,40 @@ private SchedulingMode parseSchedulingMode(String text) * @return the cap set on this queue, or 0 if not set. */ public Resource getMinResources(String queue) { - if (minQueueResources.containsKey(queue)) { - return minQueueResources.get(queue); - } else{ - return Resources.createResource(0); + synchronized(minQueueResourcesMO) { + if (minQueueResources.containsKey(queue)) { + return minQueueResources.get(queue); + } else{ + return Resources.createResource(0); + } } } + private void setMinResources(Map resources) { + synchronized(minQueueResourcesMO) { + minQueueResources = resources; + } + } /** * Get the maximum resource allocation for the given queue. * @return the cap set on this queue, or Integer.MAX_VALUE if not set. */ Resource getMaxResources(String queueName) { - if (maxQueueResources.containsKey(queueName)) { - return maxQueueResources.get(queueName); - } else { - return Resources.createResource(Integer.MAX_VALUE); + synchronized (maxQueueResourcesMO) { + if (maxQueueResources.containsKey(queueName)) { + return maxQueueResources.get(queueName); + } else { + return Resources.createResource(Integer.MAX_VALUE); + } } } + private void setMaxResources(Map resources) { + synchronized(maxQueueResourcesMO) { + maxQueueResources = resources; + } + } + /** * Add an app in the appropriate queue */ @@ -428,11 +479,12 @@ public synchronized void removeJob(FSSchedulerApp app) { /** * Get a collection of all queues */ - public synchronized Collection getQueues() { - return queues.values(); + public Collection getQueues() { + synchronized (queues) { + return queues.values(); + } } - /** * Get all queue names that have been seen either in the allocation file or in * a submitted app. @@ -447,40 +499,102 @@ public synchronized Collection getQueueNames() { } public int getUserMaxApps(String user) { - if (userMaxApps.containsKey(user)) { - return userMaxApps.get(user); - } else { - return userMaxAppsDefault; + synchronized (userMaxAppsMO) { + if (userMaxApps.containsKey(user)) { + return userMaxApps.get(user); + } else { + return getUserMaxAppsDefault(); + } } } + private void setUserMaxApps(Map userApps) { + synchronized (userMaxAppsMO) { + userMaxApps = userApps; + } + } + + private int getUserMaxAppsDefault() { + synchronized (userMaxAppsDefaultMO){ + return userMaxAppsDefault; + } + } + + private void setUserMaxAppsDefault(int userMaxApps) { + synchronized (userMaxAppsDefaultMO){ + userMaxAppsDefault = userMaxApps; + } + } + public int getQueueMaxApps(String queue) { - if (queueMaxApps.containsKey(queue)) { - return queueMaxApps.get(queue); - } else { + synchronized (queueMaxAppsMO) { + if (queueMaxApps.containsKey(queue)) { + return queueMaxApps.get(queue); + } else { + return getQueueMaxAppsDefault(); + } + } + } + + private void setQueueMaxApps(Map queueApps) { + synchronized (queueMaxAppsMO) { + queueMaxApps = queueApps; + } + } + + private int getQueueMaxAppsDefault(){ + synchronized(queueMaxAppsDefaultMO) { return queueMaxAppsDefault; } } + + private void setQueueMaxAppsDefault(int queueMaxApps){ + synchronized(queueMaxAppsDefaultMO) { + queueMaxAppsDefault = queueMaxApps; + } + } + + private void setDefaultSchedulingMode(SchedulingMode schedulingMode){ + synchronized(defaultSchedulingModeMO) { + defaultSchedulingMode = schedulingMode; + } + } public double getQueueWeight(String queue) { - if (queueWeights.containsKey(queue)) { - return queueWeights.get(queue); - } else { - return 1.0; + synchronized (queueWeightsMO) { + if (queueWeights.containsKey(queue)) { + return queueWeights.get(queue); + } else { + return 1.0; + } } } + private void setQueueWeights(Map weights) { + synchronized (queueWeightsMO) { + queueWeights = weights; + } + } /** * Get a queue's min share preemption timeout, in milliseconds. This is the * time after which jobs in the queue may kill other queues' tasks if they * are below their min share. */ public long getMinSharePreemptionTimeout(String queueName) { - if (minSharePreemptionTimeouts.containsKey(queueName)) { - return minSharePreemptionTimeouts.get(queueName); + synchronized (minSharePreemptionTimeoutsMO) { + if (minSharePreemptionTimeouts.containsKey(queueName)) { + return minSharePreemptionTimeouts.get(queueName); + } } return defaultMinSharePreemptionTimeout; } + + private void setMinSharePreemptionTimeouts( + Map sharePreemptionTimeouts){ + synchronized (minSharePreemptionTimeoutsMO) { + minSharePreemptionTimeouts = sharePreemptionTimeouts; + } + } /** * Get the fair share preemption, in milliseconds. This is the time @@ -497,9 +611,10 @@ public long getFairSharePreemptionTimeout() { */ public Map getQueueAcls(String queue) { HashMap out = new HashMap(); - - if (queueAcls.containsKey(queue)) { - out.putAll(queueAcls.get(queue)); + synchronized (queueAclsMO) { + if (queueAcls.containsKey(queue)) { + out.putAll(queueAcls.get(queue)); + } } if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) { out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*")); @@ -509,4 +624,10 @@ public Map getQueueAcls(String queue) { } return out; } + + private void setQueueAcls(Map> queue) { + synchronized (queueAclsMO) { + queueAcls = queue; + } + } }