From 59b5490989fad4c8e80dd85c3419810cdc8332f7 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 10 Sep 2013 01:24:45 +0000 Subject: [PATCH] YARN-292. Fixed FifoScheduler and FairScheduler to make their applications data structures thread safe to avoid RM crashing with ArrayIndexOutOfBoundsException. Contributed by Zhijie Shen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1521328 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 4 + .../rmapp/attempt/RMAppAttemptImpl.java | 6 +- .../scheduler/capacity/CapacityScheduler.java | 5 +- .../scheduler/fair/FairScheduler.java | 5 +- .../scheduler/fifo/FifoScheduler.java | 14 ++-- .../capacity/TestCapacityScheduler.java | 75 ++++++++++++++++++- .../scheduler/fair/TestFairScheduler.java | 9 +++ .../scheduler/fifo/TestFifoScheduler.java | 13 +++- 8 files changed, 117 insertions(+), 14 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ff35149429..b211a18bbb 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -176,6 +176,10 @@ Release 2.1.1-beta - UNRELEASED invalid client token key errors when an appliation is about to finish. (Jason Lowe via vinodkv) + YARN-292. Fixed FifoScheduler and FairScheduler to make their applications + data structures thread safe to avoid RM crashing with + ArrayIndexOutOfBoundsException. (Zhijie Shen via vinodkv) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 0e1b2c8a53..94a0f94b57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -833,7 +833,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { Allocation amContainerAllocation = appAttempt.scheduler.allocate( appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null, null); - + // There must be at least one container allocated, because a + // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed, + // and is put in SchedulerApplication#newlyAllocatedContainers. Then, + // YarnScheduler#allocate will fetch it. + assert amContainerAllocation.getContainers().size() != 0; // Set the masterContainer appAttempt.setMasterContainer(amContainerAllocation.getContainers().get( 0)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index bbf7f5cf69..9edf420da0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -80,6 +80,8 @@ import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") @@ -179,7 +181,8 @@ public class CapacityScheduler private Resource minimumAllocation; private Resource maximumAllocation; - private Map applications = + @VisibleForTesting + protected Map applications = new ConcurrentHashMap(); private boolean initialized = false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 72000e91fc..73221042a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -37,7 +37,6 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; @@ -154,8 +154,9 @@ public class FairScheduler implements ResourceScheduler { // This stores per-application scheduling information, indexed by // attempt ID's for fast lookup. + @VisibleForTesting protected Map applications = - new HashMap(); + new ConcurrentHashMap(); // Nodes in the cluster, indexed by NodeId private Map nodes = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 115d2089c3..bac013f365 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -25,8 +25,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,12 +36,10 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -50,7 +48,9 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; @@ -90,6 +90,8 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") @@ -113,8 +115,10 @@ public class FifoScheduler implements ResourceScheduler, Configurable { private Resource maximumAllocation; private boolean usePortForNodeName; - private Map applications - = new TreeMap(); + // Use ConcurrentSkipListMap because applications need to be ordered + @VisibleForTesting + protected Map applications + = new ConcurrentSkipListMap(); private ActiveUsersManager activeUsersManager; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index ec486d78ea..38df24fa99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -19,22 +19,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; +import java.lang.reflect.Constructor; import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetworkTopology; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -44,19 +49,24 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Before; @@ -525,4 +535,63 @@ public class TestCapacityScheduler { assertTrue(appComparator.compare(app2, app3) < 0); } + @Test + public void testConcurrentAccessOnApplications() throws Exception { + CapacityScheduler cs = new CapacityScheduler(); + verifyConcurrentAccessOnApplications( + cs.applications, FiCaSchedulerApp.class); + } + + public static + void verifyConcurrentAccessOnApplications( + final Map applications, Class clazz) + throws Exception { + final int size = 10000; + final ApplicationId appId = ApplicationId.newInstance(0, 0); + final Constructor ctor = clazz.getDeclaredConstructor( + ApplicationAttemptId.class, String.class, Queue.class, + ActiveUsersManager.class, RMContext.class); + + ApplicationAttemptId appAttemptId0 + = ApplicationAttemptId.newInstance(appId, 0); + applications.put(appAttemptId0, ctor.newInstance( + appAttemptId0, null, mock(Queue.class), null, null)); + assertNotNull(applications.get(appAttemptId0)); + + // Imitating the thread of scheduler that will add and remove apps + final AtomicBoolean finished = new AtomicBoolean(false); + final AtomicBoolean failed = new AtomicBoolean(false); + Thread t = new Thread() { + + @Override + public void run() { + for (int i = 1; i <= size; ++i) { + ApplicationAttemptId appAttemptId + = ApplicationAttemptId.newInstance(appId, i); + try { + applications.put(appAttemptId, ctor.newInstance( + appAttemptId, null, mock(Queue.class), null, null)); + } catch (Exception e) { + failed.set(true); + finished.set(true); + return; + } + } + for (int i = 1; i <= size; ++i) { + ApplicationAttemptId appAttemptId + = ApplicationAttemptId.newInstance(appId, i); + applications.remove(appAttemptId); + } + finished.set(true); + } + }; + t.start(); + + // Imitating the thread of rmappattempt that will get the app + while (!finished.get()) { + assertNotNull(applications.get(appAttemptId0)); + } + assertFalse(failed.get()); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 17ff7ded85..0b348eaf04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -79,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.Resources; @@ -2196,4 +2198,11 @@ public class TestFairScheduler { assertEquals(1, app.getLiveContainers().size()); } + @Test + public void testConcurrentAccessOnApplications() throws Exception { + FairScheduler fs = new FairScheduler(); + TestCapacityScheduler.verifyConcurrentAccessOnApplications( + fs.applications, FSSchedulerApp.class); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index b71726a355..30ce68e73e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -51,13 +51,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Before; @@ -414,7 +416,14 @@ public class TestFifoScheduler { LOG.info("--- END: testFifoScheduler ---"); } - + + @Test + public void testConcurrentAccessOnApplications() throws Exception { + FifoScheduler fs = new FifoScheduler(); + TestCapacityScheduler.verifyConcurrentAccessOnApplications( + fs.applications, FiCaSchedulerApp.class); + } + private void checkApplicationResourceUsage(int expected, Application application) { Assert.assertEquals(expected, application.getUsedResources().getMemory());