From 635f96e74e7b988b320770c71022f38f55806090 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 26 Apr 2013 04:42:34 +0000 Subject: [PATCH] YARN-562. Missed files from previous commit. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1476038 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/api/ResourceManagerConstants.java | 24 ++ .../InvalidContainerException.java | 33 ++ .../NMNotYetReadyException.java | 34 ++ .../nodemanager/TestNodeManagerResync.java | 315 ++++++++++++++++++ .../TestApplicationMasterService.java | 84 +++++ 5 files changed, 490 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerConstants.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/InvalidContainerException.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/NMNotYetReadyException.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerConstants.java new file mode 100644 index 0000000000..3842574a2c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerConstants.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api; + +public interface ResourceManagerConstants { + + public static final long RM_INVALID_IDENTIFIER = 0; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/InvalidContainerException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/InvalidContainerException.java new file mode 100644 index 0000000000..87f1cae243 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/InvalidContainerException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import org.apache.hadoop.yarn.YarnException; + +/** + * This Exception happens when NM is rejecting container requests from RM + */ +public class InvalidContainerException extends YarnException { + + private static final long serialVersionUID = 1L; + + public InvalidContainerException(String msg) { + super(msg); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/NMNotYetReadyException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/NMNotYetReadyException.java new file mode 100644 index 0000000000..a47f68120b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/NMNotYetReadyException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import org.apache.hadoop.yarn.YarnException; + +/** + * This exception happens when NM starts from scratch but has not yet connected + * with RM. + */ +public class NMNotYetReadyException extends YarnException { + + private static final long serialVersionUID = 1L; + + public NMNotYetReadyException(String msg) { + super(msg); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java new file mode 100644 index 0000000000..76ac4420c7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -0,0 +1,315 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; + +import junit.framework.Assert; + +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.NMNotYetReadyException; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestNodeManagerResync { + static final File basedir = + new File("target", TestNodeManagerResync.class.getName()); + static final File tmpDir = new File(basedir, "tmpDir"); + static final File logsDir = new File(basedir, "logs"); + static final File remoteLogsDir = new File(basedir, "remotelogs"); + static final File nmLocalDir = new File(basedir, "nm0"); + static final File processStartFile = new File(tmpDir, "start_file.txt") + .getAbsoluteFile(); + + static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + static final String user = "nobody"; + private FileContext localFS; + private CyclicBarrier syncBarrier; + private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); + + @Before + public void setup() throws UnsupportedFileSystemException { + localFS = FileContext.getLocalFSFileContext(); + tmpDir.mkdirs(); + logsDir.mkdirs(); + remoteLogsDir.mkdirs(); + nmLocalDir.mkdirs(); + syncBarrier = new CyclicBarrier(2); + } + + @After + public void tearDown() throws IOException, InterruptedException { + localFS.delete(new Path(basedir.getPath()), true); + assertionFailedInThread.set(false); + } + + @SuppressWarnings("unchecked") + @Test + public void testKillContainersOnResync() throws IOException, + InterruptedException { + NodeManager nm = new TestNodeManager1(); + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, + processStartFile); + + Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount()); + nm.getNMDispatcher().getEventHandler(). + handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount()); + + Assert.assertFalse(assertionFailedInThread.get()); + + nm.stop(); + } + + // This test tests new container requests are blocked when NM starts from + // scratch until it register with RM AND while NM is resyncing with RM + @SuppressWarnings("unchecked") + @Test + public void testBlockNewContainerRequestsOnStartAndResync() + throws IOException, InterruptedException { + NodeManager nm = new TestNodeManager2(); + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + + // Start the container in running state + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, + processStartFile); + + nm.getNMDispatcher().getEventHandler() + .handle(new NodeManagerEvent(NodeManagerEventType.RESYNC)); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + Assert.assertFalse(assertionFailedInThread.get()); + nm.stop(); + } + + private YarnConfiguration createNMConfig() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB + conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345"); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346"); + conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + remoteLogsDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath()); + return conf; + } + + class TestNodeManager1 extends NodeManager { + + private int registrationCount = 0; + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new TestNodeStatusUpdaterImpl1(context, dispatcher, + healthChecker, metrics); + } + + public int getNMRegistrationCount() { + return registrationCount; + } + + class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater { + + public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + } + + @Override + protected void registerWithRM() throws YarnRemoteException { + super.registerWithRM(); + registrationCount++; + } + + @Override + protected void rebootNodeStatusUpdater() { + ConcurrentMap containers = + getNMContext().getContainers(); + try { + // ensure that containers are empty before restart nodeStatusUpdater + Assert.assertTrue(containers.isEmpty()); + super.rebootNodeStatusUpdater(); + syncBarrier.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } catch (AssertionError ae) { + assertionFailedInThread.set(true); + } + } + } + } + + class TestNodeManager2 extends NodeManager { + + Thread launchContainersThread = null; + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new TestNodeStatusUpdaterImpl2(context, dispatcher, + healthChecker, metrics); + } + + @Override + protected ContainerManagerImpl createContainerManager(Context context, + ContainerExecutor exec, DeletionService del, + NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, + LocalDirsHandlerService dirsHandler) { + return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, + metrics, aclsManager, dirsHandler){ + @Override + public void setBlockNewContainerRequests( + boolean blockNewContainerRequests) { + if (blockNewContainerRequests) { + // start test thread right after blockNewContainerRequests is set + // true + super.setBlockNewContainerRequests(blockNewContainerRequests); + launchContainersThread = new RejectedContainersLauncherThread(); + launchContainersThread.start(); + } else { + // join the test thread right before blockNewContainerRequests is + // reset + try { + // stop the test thread + ((RejectedContainersLauncherThread) launchContainersThread) + .setStopThreadFlag(true); + launchContainersThread.join(); + ((RejectedContainersLauncherThread) launchContainersThread) + .setStopThreadFlag(false); + super.setBlockNewContainerRequests(blockNewContainerRequests); + } catch (InterruptedException e) { + } + } + } + }; + } + + class TestNodeStatusUpdaterImpl2 extends MockNodeStatusUpdater { + + public TestNodeStatusUpdaterImpl2(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + } + + @Override + protected void rebootNodeStatusUpdater() { + ConcurrentMap containers = + getNMContext().getContainers(); + + try { + // ensure that containers are empty before restart nodeStatusUpdater + Assert.assertTrue(containers.isEmpty()); + super.rebootNodeStatusUpdater(); + // After this point new containers are free to be launched, except + // containers from previous RM + // Wait here so as to sync with the main test thread. + syncBarrier.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } catch (AssertionError ae) { + assertionFailedInThread.set(true); + } + } + } + + class RejectedContainersLauncherThread extends Thread { + + boolean isStopped = false; + public void setStopThreadFlag(boolean isStopped) { + this.isStopped = isStopped; + } + + @Override + public void run() { + int numContainers = 0; + int numContainersRejected = 0; + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + try { + while (!isStopped && numContainers < 10) { + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + Container container = + BuilderUtils.newContainer(cId, null, null, null, null, null, 0); + StartContainerRequest startRequest = + recordFactory.newRecordInstance(StartContainerRequest.class); + startRequest.setContainerLaunchContext(containerLaunchContext); + startRequest.setContainer(container); + System.out.println("no. of containers to be launched: " + + numContainers); + numContainers++; + try { + getContainerManager().startContainer(startRequest); + } catch (YarnRemoteException e) { + numContainersRejected++; + Assert.assertTrue(e.getMessage().contains( + "Rejecting new containers as NodeManager has not" + + " yet connected with ResourceManager")); + // TO DO: This should be replaced to explicitly check exception + // class name after YARN-142 + Assert.assertTrue(e.getRemoteTrace().contains( + NMNotYetReadyException.class.getName())); + } + } + // no. of containers to be launched should equal to no. of + // containers rejected + Assert.assertEquals(numContainers, numContainersRejected); + } catch (AssertionError ae) { + assertionFailedInThread.set(true); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java new file mode 100644 index 0000000000..6bd3a54e1b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationmasterservice; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestApplicationMasterService { + private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); + + private final int GB = 1024; + private static YarnConfiguration conf; + + @BeforeClass + public static void setup() { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + } + + @Test(timeout = 30000) + public void testRMIdentifierOnContainerAllocation() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("h1:1234", 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + am1.addRequests(new String[] { "h1" }, GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler + nm1.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(1000); + alloc1Response = am1.schedule(); + } + + // assert RMIdentifer is set properly in allocated containers + Assert.assertEquals(rm.clusterTimeStamp, alloc1Response + .getAllocatedContainers().get(0).getRMIdentifer()); + rm.stop(); + } +}