From 2b2bd9214604bc2e14e41e08d30bf86f512151bd Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 29 Jul 2015 15:16:40 +0000 Subject: [PATCH] YARN-3950. Add unique SHELL_ID environment variable to DistributedShell. Contributed by Robert Kanter --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../distributedshell/ApplicationMaster.java | 31 ++++++++++++++----- .../distributedshell/TestDSAppMaster.java | 11 ++++++- 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b4666e854c..be6a50cc6d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -358,6 +358,9 @@ Release 2.8.0 - UNRELEASED YARN-3026. Move application-specific container allocation logic from LeafQueue to FiCaSchedulerApp. (Wangda Tan via jianhe) + YARN-3950. Add unique SHELL_ID environment variable to DistributedShell + (Robert Kanter via jlowe) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index b28c0c925c..5d2d6c2824 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -182,6 +182,8 @@ public static enum DSEntity { DS_APP_ATTEMPT, DS_CONTAINER } + private static final String YARN_SHELL_ID = "YARN_SHELL_ID"; + // Configuration private Configuration conf; @@ -279,6 +281,8 @@ public static enum DSEntity { private final String linux_bash_command = "bash"; private final String windows_command = "cmd /c"; + private int yarnShellIdCounter = 1; + @VisibleForTesting protected final Set launchedContainers = Collections.newSetFromMap(new ConcurrentHashMap()); @@ -803,8 +807,11 @@ public void onContainersAllocated(List allocatedContainers) { + allocatedContainers.size()); numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { + String yarnShellId = Integer.toString(yarnShellIdCounter); + yarnShellIdCounter++; LOG.info("Launching shell command on a new container." + ", containerId=" + allocatedContainer.getId() + + ", yarnShellId=" + yarnShellId + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() @@ -815,7 +822,8 @@ public void onContainersAllocated(List allocatedContainers) { // + ", containerToken" // +allocatedContainer.getContainerToken().getIdentifier().toString()); - Thread launchThread = createLaunchContainerThread(allocatedContainer); + Thread launchThread = createLaunchContainerThread(allocatedContainer, + yarnShellId); // launch and start the container on a separate thread to keep // the main thread unblocked @@ -927,7 +935,8 @@ public void onStopContainerError(ContainerId containerId, Throwable t) { private class LaunchContainerRunnable implements Runnable { // Allocated container - Container container; + private Container container; + private String shellId; NMCallbackHandler containerListener; @@ -935,10 +944,11 @@ private class LaunchContainerRunnable implements Runnable { * @param lcontainer Allocated container * @param containerListener Callback handler of the container */ - public LaunchContainerRunnable( - Container lcontainer, NMCallbackHandler containerListener) { + public LaunchContainerRunnable(Container lcontainer, + NMCallbackHandler containerListener, String shellId) { this.container = lcontainer; this.containerListener = containerListener; + this.shellId = shellId; } @Override @@ -949,7 +959,7 @@ public LaunchContainerRunnable( */ public void run() { LOG.info("Setting up container launch container for containerid=" - + container.getId()); + + container.getId() + " with shellid=" + shellId); // Set the local resources Map localResources = new HashMap(); @@ -1038,8 +1048,11 @@ public void run() { // download anyfiles in the distributed file-system. The tokens are // otherwise also useful in cases, for e.g., when one is running a // "hadoop dfs" command inside the distributed shell. + Map myShellEnv = new HashMap(shellEnv); + myShellEnv.put(YARN_SHELL_ID, shellId); ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( - localResources, shellEnv, commands, null, allTokens.duplicate(), null); + localResources, myShellEnv, commands, null, allTokens.duplicate(), + null); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } @@ -1189,9 +1202,11 @@ boolean getDone() { } @VisibleForTesting - Thread createLaunchContainerThread(Container allocatedContainer) { + Thread createLaunchContainerThread(Container allocatedContainer, + String shellId) { LaunchContainerRunnable runnableLaunchContainer = - new LaunchContainerRunnable(allocatedContainer, containerListener); + new LaunchContainerRunnable(allocatedContainer, containerListener, + shellId); return new Thread(runnableLaunchContainer); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java index 0fed14d02c..2789d047fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** @@ -51,11 +52,14 @@ public class TestDSAppMaster { static class TestAppMaster extends ApplicationMaster { private int threadsLaunched = 0; + public List yarnShellIds = new ArrayList(); @Override - protected Thread createLaunchContainerThread(Container allocatedContainer) { + protected Thread createLaunchContainerThread(Container allocatedContainer, + String shellId) { threadsLaunched++; launchedContainers.add(allocatedContainer.getId()); + yarnShellIds.add(shellId); return new Thread(); } @@ -101,6 +105,8 @@ public void testDSAppMasterAllocateHandler() throws Exception { Mockito.verifyZeroInteractions(mockClient); Assert.assertEquals("Incorrect number of threads launched", 1, master.threadsLaunched); + Assert.assertEquals("Incorrect YARN Shell IDs", + Arrays.asList("1"), master.yarnShellIds); // now send 3 extra containers containers.clear(); @@ -117,6 +123,9 @@ public void testDSAppMasterAllocateHandler() throws Exception { Assert.assertEquals("Incorrect number of threads launched", 4, master.threadsLaunched); + Assert.assertEquals("Incorrect YARN Shell IDs", + Arrays.asList("1", "2", "3", "4"), master.yarnShellIds); + // make sure we handle completion events correctly List status = new ArrayList<>(); status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));