YARN-3950. Add unique SHELL_ID environment variable to DistributedShell. Contributed by Robert Kanter

This commit is contained in:
Jason Lowe 2015-07-29 15:16:40 +00:00
parent 6374ee0db4
commit 2b2bd92146
3 changed files with 36 additions and 9 deletions

View File

@ -358,6 +358,9 @@ Release 2.8.0 - UNRELEASED
YARN-3026. Move application-specific container allocation logic from YARN-3026. Move application-specific container allocation logic from
LeafQueue to FiCaSchedulerApp. (Wangda Tan via jianhe) LeafQueue to FiCaSchedulerApp. (Wangda Tan via jianhe)
YARN-3950. Add unique SHELL_ID environment variable to DistributedShell
(Robert Kanter via jlowe)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -182,6 +182,8 @@ public static enum DSEntity {
DS_APP_ATTEMPT, DS_CONTAINER DS_APP_ATTEMPT, DS_CONTAINER
} }
private static final String YARN_SHELL_ID = "YARN_SHELL_ID";
// Configuration // Configuration
private Configuration conf; private Configuration conf;
@ -279,6 +281,8 @@ public static enum DSEntity {
private final String linux_bash_command = "bash"; private final String linux_bash_command = "bash";
private final String windows_command = "cmd /c"; private final String windows_command = "cmd /c";
private int yarnShellIdCounter = 1;
@VisibleForTesting @VisibleForTesting
protected final Set<ContainerId> launchedContainers = protected final Set<ContainerId> launchedContainers =
Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>()); Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
@ -803,8 +807,11 @@ public void onContainersAllocated(List<Container> allocatedContainers) {
+ allocatedContainers.size()); + allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size()); numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) { for (Container allocatedContainer : allocatedContainers) {
String yarnShellId = Integer.toString(yarnShellIdCounter);
yarnShellIdCounter++;
LOG.info("Launching shell command on a new container." LOG.info("Launching shell command on a new container."
+ ", containerId=" + allocatedContainer.getId() + ", containerId=" + allocatedContainer.getId()
+ ", yarnShellId=" + yarnShellId
+ ", containerNode=" + allocatedContainer.getNodeId().getHost() + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+ ":" + allocatedContainer.getNodeId().getPort() + ":" + allocatedContainer.getNodeId().getPort()
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
@ -815,7 +822,8 @@ public void onContainersAllocated(List<Container> allocatedContainers) {
// + ", containerToken" // + ", containerToken"
// +allocatedContainer.getContainerToken().getIdentifier().toString()); // +allocatedContainer.getContainerToken().getIdentifier().toString());
Thread launchThread = createLaunchContainerThread(allocatedContainer); Thread launchThread = createLaunchContainerThread(allocatedContainer,
yarnShellId);
// launch and start the container on a separate thread to keep // launch and start the container on a separate thread to keep
// the main thread unblocked // the main thread unblocked
@ -927,7 +935,8 @@ public void onStopContainerError(ContainerId containerId, Throwable t) {
private class LaunchContainerRunnable implements Runnable { private class LaunchContainerRunnable implements Runnable {
// Allocated container // Allocated container
Container container; private Container container;
private String shellId;
NMCallbackHandler containerListener; NMCallbackHandler containerListener;
@ -935,10 +944,11 @@ private class LaunchContainerRunnable implements Runnable {
* @param lcontainer Allocated container * @param lcontainer Allocated container
* @param containerListener Callback handler of the container * @param containerListener Callback handler of the container
*/ */
public LaunchContainerRunnable( public LaunchContainerRunnable(Container lcontainer,
Container lcontainer, NMCallbackHandler containerListener) { NMCallbackHandler containerListener, String shellId) {
this.container = lcontainer; this.container = lcontainer;
this.containerListener = containerListener; this.containerListener = containerListener;
this.shellId = shellId;
} }
@Override @Override
@ -949,7 +959,7 @@ public LaunchContainerRunnable(
*/ */
public void run() { public void run() {
LOG.info("Setting up container launch container for containerid=" LOG.info("Setting up container launch container for containerid="
+ container.getId()); + container.getId() + " with shellid=" + shellId);
// Set the local resources // Set the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
@ -1038,8 +1048,11 @@ public void run() {
// download anyfiles in the distributed file-system. The tokens are // download anyfiles in the distributed file-system. The tokens are
// otherwise also useful in cases, for e.g., when one is running a // otherwise also useful in cases, for e.g., when one is running a
// "hadoop dfs" command inside the distributed shell. // "hadoop dfs" command inside the distributed shell.
Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
myShellEnv.put(YARN_SHELL_ID, shellId);
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, shellEnv, commands, null, allTokens.duplicate(), null); localResources, myShellEnv, commands, null, allTokens.duplicate(),
null);
containerListener.addContainer(container.getId(), container); containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx); nmClientAsync.startContainerAsync(container, ctx);
} }
@ -1189,9 +1202,11 @@ boolean getDone() {
} }
@VisibleForTesting @VisibleForTesting
Thread createLaunchContainerThread(Container allocatedContainer) { Thread createLaunchContainerThread(Container allocatedContainer,
String shellId) {
LaunchContainerRunnable runnableLaunchContainer = LaunchContainerRunnable runnableLaunchContainer =
new LaunchContainerRunnable(allocatedContainer, containerListener); new LaunchContainerRunnable(allocatedContainer, containerListener,
shellId);
return new Thread(runnableLaunchContainer); return new Thread(runnableLaunchContainer);
} }
} }

View File

@ -41,6 +41,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
/** /**
@ -51,11 +52,14 @@ public class TestDSAppMaster {
static class TestAppMaster extends ApplicationMaster { static class TestAppMaster extends ApplicationMaster {
private int threadsLaunched = 0; private int threadsLaunched = 0;
public List<String> yarnShellIds = new ArrayList<String>();
@Override @Override
protected Thread createLaunchContainerThread(Container allocatedContainer) { protected Thread createLaunchContainerThread(Container allocatedContainer,
String shellId) {
threadsLaunched++; threadsLaunched++;
launchedContainers.add(allocatedContainer.getId()); launchedContainers.add(allocatedContainer.getId());
yarnShellIds.add(shellId);
return new Thread(); return new Thread();
} }
@ -101,6 +105,8 @@ public void testDSAppMasterAllocateHandler() throws Exception {
Mockito.verifyZeroInteractions(mockClient); Mockito.verifyZeroInteractions(mockClient);
Assert.assertEquals("Incorrect number of threads launched", 1, Assert.assertEquals("Incorrect number of threads launched", 1,
master.threadsLaunched); master.threadsLaunched);
Assert.assertEquals("Incorrect YARN Shell IDs",
Arrays.asList("1"), master.yarnShellIds);
// now send 3 extra containers // now send 3 extra containers
containers.clear(); containers.clear();
@ -117,6 +123,9 @@ public void testDSAppMasterAllocateHandler() throws Exception {
Assert.assertEquals("Incorrect number of threads launched", 4, Assert.assertEquals("Incorrect number of threads launched", 4,
master.threadsLaunched); master.threadsLaunched);
Assert.assertEquals("Incorrect YARN Shell IDs",
Arrays.asList("1", "2", "3", "4"), master.yarnShellIds);
// make sure we handle completion events correctly // make sure we handle completion events correctly
List<ContainerStatus> status = new ArrayList<>(); List<ContainerStatus> status = new ArrayList<>();
status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS)); status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));