diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 6ecc5d0c41..35026ef426 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -431,6 +431,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3624. Remove unnecessary dependency on JDK's tools.jar. (mahadev via acmurthy) + MAPREDUCE-3616. Thread pool for launching containers in MR AM not + expanding as expected. (vinodkv via sseth) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index f1670039e6..a2faa9e341 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -76,8 +76,8 @@ public class ContainerLauncherImpl extends AbstractService implements int nmTimeOut; private AppContext context; - private ThreadPoolExecutor launcherPool; - private static final int INITIAL_POOL_SIZE = 10; + protected ThreadPoolExecutor launcherPool; + protected static final int INITIAL_POOL_SIZE = 10; private int limitOnPoolSize; private Thread eventHandlingThread; private BlockingQueue eventQueue = @@ -102,6 +102,7 @@ public synchronized void init(Configuration config) { this.limitOnPoolSize = conf.getInt( MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); + LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT); this.rpc = YarnRPC.create(conf); @@ -141,20 +142,21 @@ public void run() { int numNodes = allNodes.size(); int idealPoolSize = Math.min(limitOnPoolSize, numNodes); - if (poolSize <= idealPoolSize) { + if (poolSize < idealPoolSize) { // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the // later is just a buffer so we are not always increasing the // pool-size - int newPoolSize = idealPoolSize + INITIAL_POOL_SIZE; - LOG.info("Setting ContainerLauncher pool size to " - + newPoolSize); + int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize + + INITIAL_POOL_SIZE); + LOG.info("Setting ContainerLauncher pool size to " + newPoolSize + + " as number-of-nodes to talk to is " + numNodes); launcherPool.setCorePoolSize(newPoolSize); } } // the events from the queue are handled in parallel // using a thread pool - launcherPool.execute(new EventProcessor(event)); + launcherPool.execute(createEventProcessor(event)); // TODO: Group launching of multiple containers to a single // NodeManager into a single connection @@ -172,14 +174,16 @@ public void stop() { super.stop(); } + protected EventProcessor createEventProcessor(ContainerLauncherEvent event) { + return new EventProcessor(event); + } + protected ContainerManager getCMProxy(ContainerId containerID, final String containerManagerBindAddr, ContainerToken containerToken) throws IOException { UserGroupInformation user = UserGroupInformation.getCurrentUser(); - this.allNodes.add(containerManagerBindAddr); - if (UserGroupInformation.isSecurityEnabled()) { Token token = new Token( containerToken.getIdentifier().array(), containerToken @@ -244,7 +248,7 @@ public boolean cancel() { /** * Setup and start the container on remote nodemanager. */ - private class EventProcessor implements Runnable { + class EventProcessor implements Runnable { private ContainerLauncherEvent event; EventProcessor(ContainerLauncherEvent event) { @@ -280,7 +284,7 @@ public void run() { proxy = getCMProxy(containerID, containerManagerBindAddr, containerToken); - // Interruped during getProxy, but that didn't throw exception + // Interrupted during getProxy, but that didn't throw exception if (Thread.interrupted()) { // The timer cancelled the command in the mean while. String message = "Container launch failed for " + containerID @@ -438,6 +442,7 @@ void sendContainerLaunchFailedMsg(TaskAttemptId taskAttemptID, public void handle(ContainerLauncherEvent event) { try { eventQueue.put(event); + this.allNodes.add(event.getContainerMgrAddress()); } catch (InterruptedException e) { throw new YarnException(e); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java deleted file mode 100644 index 860133fada..0000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java +++ /dev/null @@ -1,140 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.mapreduce.v2.app; - -import java.io.IOException; -import java.util.Map; - -import junit.framework.Assert; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.v2.api.records.JobState; -import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; -import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; -import org.apache.hadoop.mapreduce.v2.api.records.TaskId; -import org.apache.hadoop.mapreduce.v2.api.records.TaskState; -import org.apache.hadoop.mapreduce.v2.app.job.Job; -import org.apache.hadoop.mapreduce.v2.app.job.Task; -import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; -import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; -import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; -import org.apache.hadoop.yarn.api.ContainerManager; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerToken; -import org.junit.Test; - -public class TestContainerLauncher { - - static final Log LOG = LogFactory - .getLog(TestContainerLauncher.class); - - @Test - public void testSlowNM() throws Exception { - test(false); - } - - @Test - public void testSlowNMWithInterruptsSwallowed() throws Exception { - test(true); - } - - private void test(boolean swallowInterrupts) throws Exception { - - MRApp app = new MRAppWithSlowNM(swallowInterrupts); - - Configuration conf = new Configuration(); - int maxAttempts = 1; - conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts); - conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); - - // Set low timeout for NM commands - conf.setInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, 3000); - - Job job = app.submit(conf); - app.waitForState(job, JobState.RUNNING); - - Map tasks = job.getTasks(); - Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); - - Task task = tasks.values().iterator().next(); - app.waitForState(task, TaskState.SCHEDULED); - - Map attempts = tasks.values().iterator() - .next().getAttempts(); - Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts - .size()); - - TaskAttempt attempt = attempts.values().iterator().next(); - app.waitForState(attempt, TaskAttemptState.ASSIGNED); - - app.waitForState(job, JobState.FAILED); - - String diagnostics = attempt.getDiagnostics().toString(); - LOG.info("attempt.getDiagnostics: " + diagnostics); - if (swallowInterrupts) { - Assert.assertEquals("[Container launch failed for " - + "container_0_0000_01_000000 : Start-container for " - + "container_0_0000_01_000000 got interrupted. Returning.]", - diagnostics); - } else { - Assert.assertTrue(diagnostics.contains("Container launch failed for " - + "container_0_0000_01_000000 : ")); - Assert.assertTrue(diagnostics - .contains(": java.lang.InterruptedException")); - } - - app.stop(); - } - - private static class MRAppWithSlowNM extends MRApp { - - final boolean swallowInterrupts; - - public MRAppWithSlowNM(boolean swallowInterrupts) { - super(1, 0, false, "TestContainerLauncher", true); - this.swallowInterrupts = swallowInterrupts; - } - - @Override - protected ContainerLauncher createContainerLauncher(AppContext context) { - return new ContainerLauncherImpl(context) { - @Override - protected ContainerManager getCMProxy(ContainerId containerID, - String containerManagerBindAddr, ContainerToken containerToken) - throws IOException { - try { - synchronized (this) { - wait(); // Just hang the thread simulating a very slow NM. - } - } catch (InterruptedException e) { - LOG.info(e); - if (!MRAppWithSlowNM.this.swallowInterrupts) { - throw new IOException(e); - } - Thread.currentThread().interrupt(); - } - return null; - } - }; - }; - } -} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java new file mode 100644 index 0000000000..f25de5c8cc --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -0,0 +1,321 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.launcher; + +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ThreadPoolExecutor; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRApp; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.api.ContainerManager; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerToken; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.Test; + +public class TestContainerLauncher { + + static final Log LOG = LogFactory + .getLog(TestContainerLauncher.class); + + @Test + public void testPoolSize() throws InterruptedException { + + ApplicationId appId = BuilderUtils.newApplicationId(12345, 67); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 3); + JobId jobId = MRBuilderUtils.newJobId(appId, 8); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP); + TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10); + + AppContext context = mock(AppContext.class); + CustomContainerLauncher containerLauncher = new CustomContainerLauncher( + context); + containerLauncher.init(new Configuration()); + containerLauncher.start(); + + ThreadPoolExecutor threadPool = containerLauncher.getThreadPool(); + + // No events yet + Assert.assertEquals(0, threadPool.getPoolSize()); + Assert.assertEquals(ContainerLauncherImpl.INITIAL_POOL_SIZE, + threadPool.getCorePoolSize()); + Assert.assertNull(containerLauncher.foundErrors); + + containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; + for (int i = 0; i < 10; i++) { + containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, + containerId, "host" + i + ":1234", null, + ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); + } + waitForEvents(containerLauncher, 10); + Assert.assertEquals(10, threadPool.getPoolSize()); + Assert.assertNull(containerLauncher.foundErrors); + + // Same set of hosts, so no change + containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; + containerLauncher.finishEventHandling = true; + for (int i = 0; i < 10; i++) { + containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, + containerId, "host" + i + ":1234", null, + ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); + } + waitForEvents(containerLauncher, 20); + Assert.assertEquals(10, threadPool.getPoolSize()); + Assert.assertNull(containerLauncher.foundErrors); + + // Different hosts, there should be an increase in core-thread-pool size to + // 21(11hosts+10buffer) + // Core pool size should be 21 but the live pool size should be only 11. + containerLauncher.expectedCorePoolSize = 12 + ContainerLauncherImpl.INITIAL_POOL_SIZE; + for (int i = 1; i <= 2; i++) { + containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, + containerId, "host1" + i + ":1234", null, + ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); + } + waitForEvents(containerLauncher, 22); + Assert.assertEquals(12, threadPool.getPoolSize()); + Assert.assertNull(containerLauncher.foundErrors); + + containerLauncher.stop(); + } + + @Test + public void testPoolLimits() throws InterruptedException { + ApplicationId appId = BuilderUtils.newApplicationId(12345, 67); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 3); + JobId jobId = MRBuilderUtils.newJobId(appId, 8); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP); + TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10); + + AppContext context = mock(AppContext.class); + CustomContainerLauncher containerLauncher = new CustomContainerLauncher( + context); + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, 12); + containerLauncher.init(conf); + containerLauncher.start(); + + ThreadPoolExecutor threadPool = containerLauncher.getThreadPool(); + + // 10 different hosts + containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; + for (int i = 0; i < 10; i++) { + containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, + containerId, "host" + i + ":1234", null, + ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); + } + waitForEvents(containerLauncher, 10); + Assert.assertEquals(10, threadPool.getPoolSize()); + Assert.assertNull(containerLauncher.foundErrors); + + // 4 more different hosts, but thread pool size should be capped at 12 + containerLauncher.expectedCorePoolSize = 12 ; + for (int i = 1; i <= 4; i++) { + containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, + containerId, "host1" + i + ":1234", null, + ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); + } + waitForEvents(containerLauncher, 12); + Assert.assertEquals(12, threadPool.getPoolSize()); + Assert.assertNull(containerLauncher.foundErrors); + + // Make some threads ideal so that remaining events are also done. + containerLauncher.finishEventHandling = true; + waitForEvents(containerLauncher, 14); + Assert.assertEquals(12, threadPool.getPoolSize()); + Assert.assertNull(containerLauncher.foundErrors); + + containerLauncher.stop(); + } + + private void waitForEvents(CustomContainerLauncher containerLauncher, + int expectedNumEvents) throws InterruptedException { + int timeOut = 20; + while (expectedNumEvents != containerLauncher.numEventsProcessed + || timeOut++ < 20) { + LOG.info("Waiting for number of events to become " + expectedNumEvents + + ". It is now " + containerLauncher.numEventsProcessed); + Thread.sleep(1000); + } + Assert + .assertEquals(expectedNumEvents, containerLauncher.numEventsProcessed); + } + + @Test + public void testSlowNM() throws Exception { + test(false); + } + + @Test + public void testSlowNMWithInterruptsSwallowed() throws Exception { + test(true); + } + + private void test(boolean swallowInterrupts) throws Exception { + + MRApp app = new MRAppWithSlowNM(swallowInterrupts); + + Configuration conf = new Configuration(); + int maxAttempts = 1; + conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + + // Set low timeout for NM commands + conf.setInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, 3000); + + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + + Map tasks = job.getTasks(); + Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); + + Task task = tasks.values().iterator().next(); + app.waitForState(task, TaskState.SCHEDULED); + + Map attempts = tasks.values().iterator() + .next().getAttempts(); + Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts + .size()); + + TaskAttempt attempt = attempts.values().iterator().next(); + app.waitForState(attempt, TaskAttemptState.ASSIGNED); + + app.waitForState(job, JobState.FAILED); + + String diagnostics = attempt.getDiagnostics().toString(); + LOG.info("attempt.getDiagnostics: " + diagnostics); + if (swallowInterrupts) { + Assert.assertEquals("[Container launch failed for " + + "container_0_0000_01_000000 : Start-container for " + + "container_0_0000_01_000000 got interrupted. Returning.]", + diagnostics); + } else { + Assert.assertTrue(diagnostics.contains("Container launch failed for " + + "container_0_0000_01_000000 : ")); + Assert.assertTrue(diagnostics + .contains(": java.lang.InterruptedException")); + } + + app.stop(); + } + + private final class CustomContainerLauncher extends ContainerLauncherImpl { + + private volatile int expectedCorePoolSize = 0; + private volatile int numEventsProcessed = 0; + private volatile String foundErrors = null; + private volatile boolean finishEventHandling; + private CustomContainerLauncher(AppContext context) { + super(context); + } + + public ThreadPoolExecutor getThreadPool() { + return super.launcherPool; + } + + protected ContainerLauncherImpl.EventProcessor createEventProcessor( + ContainerLauncherEvent event) { + // At this point of time, the EventProcessor is being created and so no + // additional threads would have been created. + + // Core-pool-size should have increased by now. + if (expectedCorePoolSize != launcherPool.getCorePoolSize()) { + foundErrors = "Expected " + expectedCorePoolSize + " but found " + + launcherPool.getCorePoolSize(); + } + + return new ContainerLauncherImpl.EventProcessor(event) { + @Override + public void run() { + // do nothing substantial + numEventsProcessed++; + // Stall + synchronized(this) { + try { + while(!finishEventHandling) { + wait(1000); + } + } catch (InterruptedException e) { + ; + } + } + } + }; + } + } + + private static class MRAppWithSlowNM extends MRApp { + + final boolean swallowInterrupts; + + public MRAppWithSlowNM(boolean swallowInterrupts) { + super(1, 0, false, "TestContainerLauncher", true); + this.swallowInterrupts = swallowInterrupts; + } + + @Override + protected ContainerLauncher createContainerLauncher(AppContext context) { + return new ContainerLauncherImpl(context) { + @Override + protected ContainerManager getCMProxy(ContainerId containerID, + String containerManagerBindAddr, ContainerToken containerToken) + throws IOException { + try { + synchronized (this) { + wait(); // Just hang the thread simulating a very slow NM. + } + } catch (InterruptedException e) { + LOG.info(e); + if (!MRAppWithSlowNM.this.swallowInterrupts) { + throw new IOException(e); + } + Thread.currentThread().interrupt(); + } + return null; + } + }; + }; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 510278c65b..cb802f1c5a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -27,6 +27,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; +import java.net.URL; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -181,23 +182,31 @@ private static void setMRFrameworkClasspath( String mrAppGeneratedClasspathFile = "mrapp-generated-classpath"; classpathFileStream = thisClassLoader.getResourceAsStream(mrAppGeneratedClasspathFile); - // Put the file itself on classpath for tasks. - String classpathElement = thisClassLoader.getResource(mrAppGeneratedClasspathFile).getFile(); - if (classpathElement.contains("!")) { - classpathElement = classpathElement.substring(0, classpathElement.indexOf("!")); - } - else { - classpathElement = new File(classpathElement).getParent(); - } - Apps.addToEnvironment( - environment, - Environment.CLASSPATH.name(), classpathElement); - reader = new BufferedReader(new InputStreamReader(classpathFileStream)); - String cp = reader.readLine(); - if (cp != null) { - Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), cp.trim()); - } + // Put the file itself on classpath for tasks. + URL classpathResource = thisClassLoader + .getResource(mrAppGeneratedClasspathFile); + if (classpathResource != null) { + String classpathElement = classpathResource.getFile(); + if (classpathElement.contains("!")) { + classpathElement = classpathElement.substring(0, + classpathElement.indexOf("!")); + } else { + classpathElement = new File(classpathElement).getParent(); + } + Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), + classpathElement); + } + + if (classpathFileStream != null) { + reader = new BufferedReader(new InputStreamReader(classpathFileStream)); + String cp = reader.readLine(); + if (cp != null) { + Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), + cp.trim()); + } + } + // Add standard Hadoop classes for (String c : ApplicationConstants.APPLICATION_CLASSPATH) { Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c);