diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8852a3bedb..1ec852aef3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -310,6 +310,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2726. Added job-file to the AM and JobHistoryServer web interfaces. (Jeffrey Naisbitt via vinodkv) + MAPREDUCE-3055. Simplified ApplicationAttemptId passing to + ApplicationMaster via environment variable. (vinodkv) + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 41a86f1271..ab8dec169c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -87,6 +88,7 @@ import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; +import org.apache.hadoop.yarn.util.ConverterUtils; /** * The Map-Reduce Application Master. @@ -647,13 +649,18 @@ public void handle(SpeculatorEvent event) { public static void main(String[] args) { try { - //Configuration.addDefaultResource("job.xml"); - ApplicationId applicationId = RecordFactoryProvider - .getRecordFactory(null).newRecordInstance(ApplicationId.class); - applicationId.setClusterTimestamp(Long.valueOf(args[0])); - applicationId.setId(Integer.valueOf(args[1])); - int failCount = Integer.valueOf(args[2]); - MRAppMaster appMaster = new MRAppMaster(applicationId, failCount); + String applicationAttemptIdStr = System + .getenv(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV); + if (applicationAttemptIdStr == null) { + String msg = ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV + + " is null"; + LOG.error(msg); + throw new IOException(msg); + } + ApplicationAttemptId applicationAttemptId = ConverterUtils + .toApplicationAttemptId(applicationAttemptIdStr); + MRAppMaster appMaster = new MRAppMaster(applicationAttemptId + .getApplicationId(), applicationAttemptId.getAttemptId()); Runtime.getRuntime().addShutdownHook( new CompositeServiceShutdownHook(appMaster)); YarnConfiguration conf = new YarnConfiguration(new JobConf()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 7a6443b6c9..20bd976b8d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -321,9 +321,6 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS)); vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS); - vargs.add(String.valueOf(applicationId.getClusterTimestamp())); - vargs.add(String.valueOf(applicationId.getId())); - vargs.add(ApplicationConstants.AM_FAIL_COUNT_STRING); vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java index 591035b046..99f145fbdc 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java @@ -37,8 +37,11 @@ public interface ApplicationConstants { public static final String APPLICATION_CLIENT_SECRET_ENV_NAME = "AppClientTokenEnv"; - // TODO: Weird. This is part of AM command line. Instead it should be a env. - public static final String AM_FAIL_COUNT_STRING = ""; + /** + * The environmental variable for APPLICATION_ATTEMPT_ID. Set in + * ApplicationMaster's environment only. + */ + public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID"; public static final String CONTAINER_TOKEN_FILE_ENV_NAME = UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java index ab6bd7395d..6f5e904319 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.yarn.util.StringHelper._split; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.text.NumberFormat; @@ -45,6 +46,8 @@ public class ConverterUtils { public static final String APPLICATION_PREFIX = "application"; + public static final String CONTAINER_PREFIX = "container"; + public static final String APPLICATION_ATTEMPT_PREFIX = "appattempt"; /** * return a hadoop path from a given url @@ -132,14 +135,12 @@ private static ApplicationId toApplicationId(RecordFactory recordFactory, } private static ApplicationAttemptId toApplicationAttemptId( - RecordFactory recordFactory, - Iterator it) { - ApplicationId appId = - recordFactory.newRecordInstance(ApplicationId.class); + Iterator it) throws NumberFormatException { + ApplicationId appId = Records.newRecord(ApplicationId.class); appId.setClusterTimestamp(Long.parseLong(it.next())); appId.setId(Integer.parseInt(it.next())); - ApplicationAttemptId appAttemptId = - recordFactory.newRecordInstance(ApplicationAttemptId.class); + ApplicationAttemptId appAttemptId = Records + .newRecord(ApplicationAttemptId.class); appAttemptId.setApplicationId(appId); appAttemptId.setAttemptId(Integer.parseInt(it.next())); return appAttemptId; @@ -149,16 +150,35 @@ public static String toString(ContainerId cId) { return cId.toString(); } - public static ContainerId toContainerId(RecordFactory recordFactory, - String containerIdStr) { + public static ContainerId toContainerId(String containerIdStr) + throws IOException { Iterator it = _split(containerIdStr).iterator(); - it.next(); // prefix. TODO: Validate container prefix - ApplicationAttemptId appAttemptID = - toApplicationAttemptId(recordFactory, it); - ContainerId containerId = - recordFactory.newRecordInstance(ContainerId.class); - containerId.setApplicationAttemptId(appAttemptID); - containerId.setId(Integer.parseInt(it.next())); - return containerId; + if (!it.next().equals(CONTAINER_PREFIX)) { + throw new IOException("Invalid ContainerId prefix: " + containerIdStr); + } + try { + ApplicationAttemptId appAttemptID = toApplicationAttemptId(it); + ContainerId containerId = Records.newRecord(ContainerId.class); + containerId.setApplicationAttemptId(appAttemptID); + containerId.setId(Integer.parseInt(it.next())); + return containerId; + } catch (NumberFormatException n) { + throw new IOException("Invalid ContainerId: " + containerIdStr, n); + } + } + + public static ApplicationAttemptId toApplicationAttemptId( + String applicationAttmeptIdStr) throws IOException { + Iterator it = _split(applicationAttmeptIdStr).iterator(); + if (!it.next().equals(APPLICATION_ATTEMPT_PREFIX)) { + throw new IOException("Invalid AppAttemptId prefix: " + + applicationAttmeptIdStr); + } + try { + return toApplicationAttemptId(it); + } catch (NumberFormatException n) { + throw new IOException("Invalid AppAttemptId: " + + applicationAttmeptIdStr, n); + } } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java index 68b0686a25..e0795613b6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java @@ -31,8 +31,6 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; @@ -56,22 +54,26 @@ public static class ContainersLogsBlock extends HtmlBlock implements private final Configuration conf; private final LocalDirAllocator logsSelector; private final Context nmContext; - private final RecordFactory recordFactory; @Inject public ContainersLogsBlock(Configuration conf, Context context) { this.conf = conf; this.logsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS); this.nmContext = context; - this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); } @Override protected void render(Block html) { DIV div = html.div("#content"); - ContainerId containerId = - ConverterUtils.toContainerId(this.recordFactory, $(CONTAINER_ID)); + ContainerId containerId; + try { + containerId = ConverterUtils.toContainerId($(CONTAINER_ID)); + } catch (IOException e) { + div.h1("Invalid containerId " + $(CONTAINER_ID))._(); + return; + } + Container container = this.nmContext.getContainers().get(containerId); if (container == null) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java index 27be38a029..5425032eec 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java @@ -18,16 +18,15 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp; +import static org.apache.hadoop.yarn.util.StringHelper.ujoin; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID; -import static org.apache.hadoop.yarn.util.StringHelper.ujoin; -import org.apache.hadoop.conf.Configuration; +import java.io.IOException; + import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -53,21 +52,23 @@ protected Class content() { public static class ContainerBlock extends HtmlBlock implements NMWebParams { - private final Configuration conf; private final Context nmContext; - private final RecordFactory recordFactory; @Inject - public ContainerBlock(Configuration conf, Context nmContext) { - this.conf = conf; + public ContainerBlock(Context nmContext) { this.nmContext = nmContext; - this.recordFactory = RecordFactoryProvider.getRecordFactory(this.conf); } @Override protected void render(Block html) { - ContainerId containerID = - ConverterUtils.toContainerId(this.recordFactory, $(CONTAINER_ID)); + ContainerId containerID; + try { + containerID = ConverterUtils.toContainerId($(CONTAINER_ID)); + } catch (IOException e) { + html.p()._("Invalid containerId " + $(CONTAINER_ID))._(); + return; + } + Container container = this.nmContext.getContainers().get(containerID); ContainerStatus containerData = container.cloneAndGetContainerStatus(); int exitCode = containerData.getExitStatus(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index b394faa85d..337f481689 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -136,7 +136,7 @@ private void cleanup() throws IOException { containerMgrProxy.stopContainer(stopRequest); } - private ContainerManager getContainerMgrProxy( + protected ContainerManager getContainerMgrProxy( final ApplicationId applicationID) throws IOException { Container container = application.getMasterContainer(); @@ -173,23 +173,11 @@ private ContainerLaunchContext createAMContainerLaunchContext( // Construct the actual Container ContainerLaunchContext container = applicationMasterContext.getAMContainerSpec(); - StringBuilder mergedCommand = new StringBuilder(); - String failCount = Integer.toString(application.getAppAttemptId() - .getAttemptId()); - List commandList = new ArrayList(); - for (String str : container.getCommands()) { - // This is out-right wrong. AM FAIL count should be passed via env. - String result = - str.replaceFirst(ApplicationConstants.AM_FAIL_COUNT_STRING, - failCount); - mergedCommand.append(result).append(" "); - commandList.add(result); - } - container.setCommands(commandList); - /** add the failed count to the app master command line */ - - LOG.info("Command to launch container " + - containerID + " : " + mergedCommand); + LOG.info("Command to launch container " + + containerID + + " : " + + StringUtils.arrayToString(container.getCommands().toArray( + new String[0]))); // Finalize the container container.setContainerId(containerID); @@ -203,6 +191,11 @@ private void setupTokensAndEnv( ContainerLaunchContext container) throws IOException { Map environment = container.getEnvironment(); + + // Set the AppAttemptId to be consumable by the AM. + environment.put(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV, + application.getAppAttemptId().toString()); + if (UserGroupInformation.isSecurityEnabled()) { // TODO: Security enabled/disabled info should come from RM. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java index d1ef1d1400..67f0c8a016 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java @@ -42,9 +42,9 @@ public class ApplicationMasterLauncher extends AbstractService implements private final BlockingQueue masterEvents = new LinkedBlockingQueue(); - private ApplicationTokenSecretManager applicationTokenSecretManager; + protected ApplicationTokenSecretManager applicationTokenSecretManager; private ClientToAMSecretManager clientToAMSecretManager; - private final RMContext context; + protected final RMContext context; public ApplicationMasterLauncher(ApplicationTokenSecretManager applicationTokenSecretManager, ClientToAMSecretManager clientToAMSecretManager, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 4be2739967..2123ee806c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -195,6 +195,7 @@ public void stop() { }; } + @Override protected AdminService createAdminService() { return new AdminService(getConfig(), scheduler, getRMContext(), this.nodesListManager){ diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java new file mode 100644 index 0000000000..a12049f9e8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -0,0 +1,159 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ContainerManager; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; + +public class TestApplicationMasterLauncher { + + private static final Log LOG = LogFactory + .getLog(TestApplicationMasterLauncher.class); + + private static final class MyContainerManagerImpl implements + ContainerManager { + + boolean launched = false; + boolean cleanedup = false; + String attemptIdAtContainerManager = null; + + @Override + public StartContainerResponse + startContainer(StartContainerRequest request) + throws YarnRemoteException { + LOG.info("Container started by MyContainerManager: " + request); + launched = true; + attemptIdAtContainerManager = request.getContainerLaunchContext() + .getEnvironment().get( + ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV); + return null; + } + + @Override + public StopContainerResponse stopContainer(StopContainerRequest request) + throws YarnRemoteException { + LOG.info("Container cleaned up by MyContainerManager"); + cleanedup = true; + return null; + } + + @Override + public GetContainerStatusResponse getContainerStatus( + GetContainerStatusRequest request) throws YarnRemoteException { + return null; + } + + } + + private static final class MockRMWithCustomAMLauncher extends MockRM { + + private final ContainerManager containerManager; + + public MockRMWithCustomAMLauncher(ContainerManager containerManager) { + this.containerManager = containerManager; + } + + @Override + protected ApplicationMasterLauncher createAMLauncher() { + return new ApplicationMasterLauncher(super.appTokenSecretManager, + super.clientToAMSecretManager, getRMContext()) { + @Override + protected Runnable createRunnableLauncher(RMAppAttempt application, + AMLauncherEventType event) { + return new AMLauncher(context, application, event, + applicationTokenSecretManager, clientToAMSecretManager, + getConfig()) { + @Override + protected ContainerManager getContainerMgrProxy( + ApplicationId applicationID) throws IOException { + return containerManager; + } + }; + } + }; + } + } + + @Test + public void testAMLaunchAndCleanup() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + MyContainerManagerImpl containerManager = new MyContainerManagerImpl(); + MockRMWithCustomAMLauncher rm = new MockRMWithCustomAMLauncher( + containerManager); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 5120); + + RMApp app = rm.submitApp(2000); + + // kick the scheduling + nm1.nodeHeartbeat(true); + + int waitCount = 0; + while (containerManager.launched == false && waitCount++ < 20) { + LOG.info("Waiting for AM Launch to happen.."); + Thread.sleep(1000); + } + Assert.assertTrue(containerManager.launched); + + RMAppAttempt attempt = app.getCurrentAppAttempt(); + ApplicationAttemptId appAttemptId = attempt.getAppAttemptId(); + Assert.assertEquals(appAttemptId.toString(), + containerManager.attemptIdAtContainerManager); + + MockAM am = new MockAM(rm.getRMContext(), rm + .getApplicationMasterService(), appAttemptId); + am.registerAppAttempt(); + am.unregisterAppAttempt(); + + waitCount = 0; + while (containerManager.cleanedup == false && waitCount++ < 20) { + LOG.info("Waiting for AM Cleanup to happen.."); + Thread.sleep(1000); + } + Assert.assertTrue(containerManager.cleanedup); + + am.waitForState(RMAppAttemptState.FINISHED); + rm.stop(); + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java deleted file mode 100644 index 8cc948400e..0000000000 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java +++ /dev/null @@ -1,193 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; - -import java.util.concurrent.atomic.AtomicInteger; - -import junit.framework.Assert; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationState; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager; -import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; -import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; -import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; -import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; -import org.apache.hadoop.yarn.util.Records; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -/** - * Testing the applications manager launcher. - * - */ -public class TestApplicationMasterLauncher { -// private static final Log LOG = LogFactory.getLog(TestApplicationMasterLauncher.class); -// private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); -// private ApplicationMasterLauncher amLauncher; -// private DummyEventHandler asmHandle; -// private final ApplicationTokenSecretManager applicationTokenSecretManager = -// new ApplicationTokenSecretManager(); -// private final ClientToAMSecretManager clientToAMSecretManager = -// new ClientToAMSecretManager(); -// -// Object doneLaunching = new Object(); -// AtomicInteger launched = new AtomicInteger(); -// AtomicInteger cleanedUp = new AtomicInteger(); -// private RMContext context = new RMContextImpl(new MemStore(), null, null, -// null); -// -// private Configuration conf = new Configuration(); -// -// private class DummyEventHandler implements EventHandler { -// @Override -// public void handle(ApplicationEvent appEvent) { -// ApplicationEventType event = appEvent.getType(); -// switch (event) { -// case FINISH: -// synchronized(doneLaunching) { -// doneLaunching.notify(); -// } -// break; -// -// default: -// break; -// } -// } -// } -// -// private class DummyLaunch implements Runnable { -// public void run() { -// launched.incrementAndGet(); -// } -// } -// -// private class DummyCleanUp implements Runnable { -// private EventHandler eventHandler; -// -// public DummyCleanUp(EventHandler eventHandler) { -// this.eventHandler = eventHandler; -// } -// public void run() { -// cleanedUp.incrementAndGet(); -// eventHandler.handle(new AMFinishEvent(null, -// ApplicationState.COMPLETED, "", "")); -// } -// } -// -// private class DummyApplicationMasterLauncher extends -// ApplicationMasterLauncher { -// private EventHandler eventHandler; -// -// public DummyApplicationMasterLauncher( -// ApplicationTokenSecretManager applicationTokenSecretManager, -// ClientToAMSecretManager clientToAMSecretManager, -// EventHandler eventHandler) { -// super(applicationTokenSecretManager, clientToAMSecretManager, context); -// this.eventHandler = eventHandler; -// } -// -// @Override -// protected Runnable createRunnableLauncher(RMAppAttempt application, -// AMLauncherEventType event) { -// Runnable r = null; -// switch (event) { -// case LAUNCH: -// r = new DummyLaunch(); -// break; -// case CLEANUP: -// r = new DummyCleanUp(eventHandler); -// default: -// break; -// } -// return r; -// } -// } -// -// @Before -// public void setUp() { -// asmHandle = new DummyEventHandler(); -// amLauncher = new DummyApplicationMasterLauncher(applicationTokenSecretManager, -// clientToAMSecretManager, asmHandle); -// context.getDispatcher().init(conf); -// amLauncher.init(conf); -// context.getDispatcher().start(); -// amLauncher.start(); -// -// } -// -// @After -// public void tearDown() { -// amLauncher.stop(); -// } -// -// @Test -// public void testAMLauncher() throws Exception { -// -// // Creat AppId -// ApplicationId appId = recordFactory -// .newRecordInstance(ApplicationId.class); -// appId.setClusterTimestamp(System.currentTimeMillis()); -// appId.setId(1); -// -// ApplicationAttemptId appAttemptId = Records -// .newRecord(ApplicationAttemptId.class); -// appAttemptId.setApplicationId(appId); -// appAttemptId.setAttemptId(1); -// -// // Create submissionContext -// ApplicationSubmissionContext submissionContext = recordFactory -// .newRecordInstance(ApplicationSubmissionContext.class); -// submissionContext.setApplicationId(appId); -// submissionContext.setUser("dummyuser"); -// -// RMAppAttempt appAttempt = new RMAppAttemptImpl(appAttemptId, -// "dummyclienttoken", context, null, submissionContext); -// -// // Tell AMLauncher to launch the appAttempt -// amLauncher.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, -// appAttempt)); -// -// // Tell AMLauncher to cleanup the appAttempt -// amLauncher.handle(new AMLauncherEvent(AMLauncherEventType.CLEANUP, -// appAttempt)); -// -// synchronized (doneLaunching) { -// doneLaunching.wait(10000); -// } -// Assert.assertEquals(1, launched.get()); -// Assert.assertEquals(1, cleanedUp.get()); -// } -} \ No newline at end of file