YARN-1629. IndexOutOfBoundsException in MaxRunningAppsEnforcer (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1561996 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8006467e58
commit
7f2b01a742
@ -465,6 +465,8 @@ Release 2.4.0 - UNRELEASED
|
|||||||
|
|
||||||
YARN-1642. RMDTRenewer#getRMClient should use ClientRMProxy (kasha)
|
YARN-1642. RMDTRenewer#getRMClient should use ClientRMProxy (kasha)
|
||||||
|
|
||||||
|
YARN-1629. IndexOutOfBoundsException in MaxRunningAppsEnforcer (Sandy Ryza)
|
||||||
|
|
||||||
Release 2.3.0 - UNRELEASED
|
Release 2.3.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -91,15 +91,6 @@ public boolean removeApp(FSSchedulerApp app) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void makeAppRunnable(AppSchedulable appSched) {
|
|
||||||
if (!nonRunnableAppScheds.remove(appSched)) {
|
|
||||||
throw new IllegalStateException("Can't make app runnable that does not " +
|
|
||||||
"already exist in queue as non-runnable" + appSched);
|
|
||||||
}
|
|
||||||
|
|
||||||
runnableAppScheds.add(appSched);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Collection<AppSchedulable> getRunnableAppSchedulables() {
|
public Collection<AppSchedulable> getRunnableAppSchedulables() {
|
||||||
return runnableAppScheds;
|
return runnableAppScheds;
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,9 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.PriorityQueue;
|
import java.util.PriorityQueue;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ArrayListMultimap;
|
import com.google.common.collect.ArrayListMultimap;
|
||||||
import com.google.common.collect.ListMultimap;
|
import com.google.common.collect.ListMultimap;
|
||||||
@ -33,6 +36,8 @@
|
|||||||
* constraints
|
* constraints
|
||||||
*/
|
*/
|
||||||
public class MaxRunningAppsEnforcer {
|
public class MaxRunningAppsEnforcer {
|
||||||
|
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
|
||||||
|
|
||||||
private final FairScheduler scheduler;
|
private final FairScheduler scheduler;
|
||||||
|
|
||||||
// Tracks the number of running applications by user.
|
// Tracks the number of running applications by user.
|
||||||
@ -163,7 +168,7 @@ public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) {
|
|||||||
Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
|
Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
|
||||||
appsNowMaybeRunnable);
|
appsNowMaybeRunnable);
|
||||||
FSSchedulerApp prev = null;
|
FSSchedulerApp prev = null;
|
||||||
int numNowRunnable = 0;
|
List<AppSchedulable> noLongerPendingApps = new ArrayList<AppSchedulable>();
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
FSSchedulerApp next = iter.next();
|
FSSchedulerApp next = iter.next();
|
||||||
if (next == prev) {
|
if (next == prev) {
|
||||||
@ -173,21 +178,34 @@ public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) {
|
|||||||
if (canAppBeRunnable(next.getQueue(), next.getUser())) {
|
if (canAppBeRunnable(next.getQueue(), next.getUser())) {
|
||||||
trackRunnableApp(next);
|
trackRunnableApp(next);
|
||||||
AppSchedulable appSched = next.getAppSchedulable();
|
AppSchedulable appSched = next.getAppSchedulable();
|
||||||
next.getQueue().makeAppRunnable(appSched);
|
next.getQueue().getRunnableAppSchedulables().add(appSched);
|
||||||
if (!usersNonRunnableApps.remove(next.getUser(), appSched)) {
|
noLongerPendingApps.add(appSched);
|
||||||
throw new IllegalStateException("Waiting app " + next
|
|
||||||
+ " expected to be in usersNonRunnableApps");
|
|
||||||
}
|
|
||||||
|
|
||||||
// No more than one app per list will be able to be made runnable, so
|
// No more than one app per list will be able to be made runnable, so
|
||||||
// we can stop looking after we've found that many
|
// we can stop looking after we've found that many
|
||||||
if (numNowRunnable >= appsNowMaybeRunnable.size()) {
|
if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
prev = next;
|
prev = next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We remove the apps from their pending lists afterwards so that we don't
|
||||||
|
// pull them out from under the iterator. If they are not in these lists
|
||||||
|
// in the first place, there is a bug.
|
||||||
|
for (AppSchedulable appSched : noLongerPendingApps) {
|
||||||
|
if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables()
|
||||||
|
.remove(appSched)) {
|
||||||
|
LOG.error("Can't make app runnable that does not already exist in queue"
|
||||||
|
+ " as non-runnable: " + appSched + ". This should never happen.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) {
|
||||||
|
LOG.error("Waiting app " + appSched + " expected to be in "
|
||||||
|
+ "usersNonRunnableApps, but was not. This should never happen.");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -225,7 +243,7 @@ private void gatherPossiblyRunnableAppLists(FSQueue queue,
|
|||||||
* This allows us to pick which list to advance in O(log(num lists)) instead
|
* This allows us to pick which list to advance in O(log(num lists)) instead
|
||||||
* of O(num lists) time.
|
* of O(num lists) time.
|
||||||
*/
|
*/
|
||||||
private static class MultiListStartTimeIterator implements
|
static class MultiListStartTimeIterator implements
|
||||||
Iterator<FSSchedulerApp> {
|
Iterator<FSSchedulerApp> {
|
||||||
|
|
||||||
private List<AppSchedulable>[] appLists;
|
private List<AppSchedulable>[] appLists;
|
||||||
|
@ -21,6 +21,10 @@
|
|||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -152,4 +156,41 @@ public void testRemoveEnablingOrderedByStartTime() {
|
|||||||
assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
|
assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleAppsWaitingOnCousinQueue() {
|
||||||
|
FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true);
|
||||||
|
FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true);
|
||||||
|
queueMaxApps.put("root.queue1", 2);
|
||||||
|
FSSchedulerApp app1 = addApp(leaf1, "user");
|
||||||
|
addApp(leaf2, "user");
|
||||||
|
addApp(leaf2, "user");
|
||||||
|
addApp(leaf2, "user");
|
||||||
|
assertEquals(1, leaf1.getRunnableAppSchedulables().size());
|
||||||
|
assertEquals(1, leaf2.getRunnableAppSchedulables().size());
|
||||||
|
assertEquals(2, leaf2.getNonRunnableAppSchedulables().size());
|
||||||
|
removeApp(app1);
|
||||||
|
assertEquals(0, leaf1.getRunnableAppSchedulables().size());
|
||||||
|
assertEquals(2, leaf2.getRunnableAppSchedulables().size());
|
||||||
|
assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiListStartTimeIteratorEmptyAppLists() {
|
||||||
|
List<List<AppSchedulable>> lists = new ArrayList<List<AppSchedulable>>();
|
||||||
|
lists.add(Arrays.asList(mockAppSched(1)));
|
||||||
|
lists.add(Arrays.asList(mockAppSched(2)));
|
||||||
|
Iterator<FSSchedulerApp> iter =
|
||||||
|
new MaxRunningAppsEnforcer.MultiListStartTimeIterator(lists);
|
||||||
|
assertEquals(1, iter.next().getAppSchedulable().getStartTime());
|
||||||
|
assertEquals(2, iter.next().getAppSchedulable().getStartTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
private AppSchedulable mockAppSched(long startTime) {
|
||||||
|
AppSchedulable appSched = mock(AppSchedulable.class);
|
||||||
|
when(appSched.getStartTime()).thenReturn(startTime);
|
||||||
|
FSSchedulerApp schedApp = mock(FSSchedulerApp.class);
|
||||||
|
when(schedApp.getAppSchedulable()).thenReturn(appSched);
|
||||||
|
when(appSched.getApp()).thenReturn(schedApp);
|
||||||
|
return appSched;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user