YARN-639. Modified Distributed Shell application to start using the new NMClient library. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1493280 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-06-14 23:18:34 +00:00
parent 5e72bfc521
commit b503b6a07d
3 changed files with 159 additions and 94 deletions

View File

@ -349,6 +349,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-789. Enable zero capabilities resource requests in fair scheduler. YARN-789. Enable zero capabilities resource requests in fair scheduler.
(tucu) (tucu)
YARN-639. Modified Distributed Shell application to start using the new
NMClient library. (Zhijie Shen via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -21,15 +21,16 @@
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.security.PrivilegedAction; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
@ -42,9 +43,6 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@ -71,12 +69,10 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClientAsync; import org.apache.hadoop.yarn.client.AMRMClientAsync;
import org.apache.hadoop.yarn.client.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
@ -147,11 +143,15 @@ public class ApplicationMaster {
// Configuration // Configuration
private Configuration conf; private Configuration conf;
// YARN RPC to communicate with the Resource Manager or Node Manager
private YarnRPC rpc;
// Handle to communicate with the Resource Manager // Handle to communicate with the Resource Manager
private AMRMClientAsync<ContainerRequest> resourceManager; @SuppressWarnings("rawtypes")
private AMRMClientAsync resourceManager;
// Handle to communicate with the Node Manager
private NMClientAsync nmClientAsync;
// Listen to process the response from the Node Manager
private NMCallbackHandler containerListener;
// Application Attempt Id ( combination of attemptId and fail count ) // Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID; private ApplicationAttemptId appAttemptID;
@ -273,7 +273,6 @@ private void dumpOutDebugInfo() {
public ApplicationMaster() throws Exception { public ApplicationMaster() throws Exception {
// Set up the configuration and RPC // Set up the configuration and RPC
conf = new YarnConfiguration(); conf = new YarnConfiguration();
rpc = YarnRPC.create(conf);
} }
/** /**
@ -437,17 +436,20 @@ private void printUsage(Options opts) {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
@SuppressWarnings({ "rawtypes", "unchecked" })
public boolean run() throws YarnException, IOException { public boolean run() throws YarnException, IOException {
LOG.info("Starting ApplicationMaster"); LOG.info("Starting ApplicationMaster");
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
resourceManager = new AMRMClientAsync<ContainerRequest>(appAttemptID,
1000,
allocListener);
resourceManager.init(conf); resourceManager.init(conf);
resourceManager.start(); resourceManager.start();
containerListener = new NMCallbackHandler();
nmClientAsync = new NMClientAsync(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
// Setup local RPC Server to accept status requests directly from clients // Setup local RPC Server to accept status requests directly from clients
// TODO need to setup a protocol for client to be able to communicate to // TODO need to setup a protocol for client to be able to communicate to
// the RPC server // the RPC server
@ -517,6 +519,10 @@ private void finish() {
} }
} }
// When the application completes, it should stop all running containers
LOG.info("Application completed. Stopping running containers");
nmClientAsync.stop();
// When the application completes, it should send a finish application // When the application completes, it should send a finish application
// signal to the RM // signal to the RM
LOG.info("Application completed. Signalling finish to RM"); LOG.info("Application completed. Signalling finish to RM");
@ -548,6 +554,7 @@ private void finish() {
} }
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
@SuppressWarnings("unchecked")
@Override @Override
public void onContainersCompleted(List<ContainerStatus> completedContainers) { public void onContainersCompleted(List<ContainerStatus> completedContainers) {
LOG.info("Got response from RM for container ask, completedCnt=" LOG.info("Got response from RM for container ask, completedCnt="
@ -618,8 +625,8 @@ public void onContainersAllocated(List<Container> allocatedContainers) {
// + ", containerToken" // + ", containerToken"
// +allocatedContainer.getContainerToken().getIdentifier().toString()); // +allocatedContainer.getContainerToken().getIdentifier().toString());
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable( LaunchContainerRunnable runnableLaunchContainer =
allocatedContainer); new LaunchContainerRunnable(allocatedContainer, containerListener);
Thread launchThread = new Thread(runnableLaunchContainer); Thread launchThread = new Thread(runnableLaunchContainer);
// launch and start the container on a separate thread to keep // launch and start the container on a separate thread to keep
@ -652,6 +659,64 @@ public void onError(Exception e) {
} }
} }
private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
private ConcurrentMap<ContainerId, Container> containers =
new ConcurrentHashMap<ContainerId, Container>();
public void addContainer(ContainerId containerId, Container container) {
containers.putIfAbsent(containerId, container);
}
@Override
public void onContainerStopped(ContainerId containerId) {
if (LOG.isDebugEnabled()) {
LOG.debug("Succeeded to stop Container " + containerId);
}
containers.remove(containerId);
}
@Override
public void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus) {
if (LOG.isDebugEnabled()) {
LOG.debug("Container Status: id=" + containerId + ", status=" +
containerStatus);
}
}
@Override
public void onContainerStarted(ContainerId containerId,
Map<String, ByteBuffer> allServiceResponse) {
if (LOG.isDebugEnabled()) {
LOG.debug("Succeeded to start Container " + containerId);
}
Container container = containers.get(containerId);
if (container != null) {
nmClientAsync.getContainerStatus(containerId, container.getNodeId(),
container.getContainerToken());
}
}
@Override
public void onStartContainerError(ContainerId containerId, Throwable t) {
LOG.error("Failed to start Container " + containerId);
containers.remove(containerId);
}
@Override
public void onGetContainerStatusError(
ContainerId containerId, Throwable t) {
LOG.error("Failed to query the status of Container " + containerId);
}
@Override
public void onStopContainerError(ContainerId containerId, Throwable t) {
LOG.error("Failed to stop Container " + containerId);
containers.remove(containerId);
}
}
/** /**
* Thread to connect to the {@link ContainerManager} and launch the container * Thread to connect to the {@link ContainerManager} and launch the container
* that will execute the shell command. * that will execute the shell command.
@ -660,40 +725,17 @@ private class LaunchContainerRunnable implements Runnable {
// Allocated container // Allocated container
Container container; Container container;
// Handle to communicate with ContainerManager
ContainerManager cm; NMCallbackHandler containerListener;
/** /**
* @param lcontainer Allocated container * @param lcontainer Allocated container
* @param containerListener Callback handler of the container
*/ */
public LaunchContainerRunnable(Container lcontainer) { public LaunchContainerRunnable(
Container lcontainer, NMCallbackHandler containerListener) {
this.container = lcontainer; this.container = lcontainer;
} this.containerListener = containerListener;
/**
* Helper function to connect to CM
*/
private void connectToCM() {
LOG.debug("Connecting to ContainerManager for containerid="
+ container.getId());
String cmIpPortStr = container.getNodeId().getHost() + ":"
+ container.getNodeId().getPort();
final InetSocketAddress cmAddress =
NetUtils.createSocketAddr(cmIpPortStr);
LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(container.getId().toString());
Token<ContainerTokenIdentifier> token =
ProtoUtils.convertFromProtoFormat(container.getContainerToken(),
cmAddress);
ugi.addToken(token);
this.cm = ugi.doAs(new PrivilegedAction<ContainerManager>() {
@Override
public ContainerManager run() {
return ((ContainerManager) rpc.getProxy(ContainerManager.class,
cmAddress, conf));
}
});
} }
@Override @Override
@ -703,9 +745,6 @@ public ContainerManager run() {
* start request to the CM. * start request to the CM.
*/ */
public void run() { public void run() {
// Connect to ContainerManager
connectToCM();
LOG.info("Setting up container launch container for containerid=" LOG.info("Setting up container launch container for containerid="
+ container.getId()); + container.getId());
ContainerLaunchContext ctx = Records ContainerLaunchContext ctx = Records
@ -773,40 +812,8 @@ public void run() {
commands.add(command.toString()); commands.add(command.toString());
ctx.setCommands(commands); ctx.setCommands(commands);
StartContainerRequest startReq = Records containerListener.addContainer(container.getId(), container);
.newRecord(StartContainerRequest.class); nmClientAsync.startContainer(container, ctx);
startReq.setContainerLaunchContext(ctx);
startReq.setContainerToken(container.getContainerToken());
try {
cm.startContainer(startReq);
} catch (YarnException e) {
LOG.info("Start container failed for :" + ", containerId="
+ container.getId());
e.printStackTrace();
// TODO do we need to release this container?
} catch (IOException e) {
LOG.info("Start container failed for :" + ", containerId="
+ container.getId());
e.printStackTrace();
}
// Get container status?
// Left commented out as the shell scripts are short lived
// and we are relying on the status for completed containers
// from RM to detect status
// GetContainerStatusRequest statusReq =
// Records.newRecord(GetContainerStatusRequest.class);
// statusReq.setContainerId(container.getId());
// GetContainerStatusResponse statusResp;
// try {
// statusResp = cm.getContainerStatus(statusReq);
// LOG.info("Container Status"
// + ", id=" + container.getId()
// + ", status=" +statusResp.getStatus());
// } catch (YarnException e) {
// e.printStackTrace();
// }
} }
} }

View File

@ -123,21 +123,76 @@ public void testDSShell() throws Exception {
} }
@Test(timeout=30000) @Test(timeout=30000)
public void testDSShellWithNoArgs() throws Exception { public void testDSShellWithInvalidArgs() throws Exception {
Client client = new Client(new Configuration(yarnCluster.getConfig()));
String[] args = {};
LOG.info("Initializing DS Client with no args"); LOG.info("Initializing DS Client with no args");
Client client = new Client(new Configuration(yarnCluster.getConfig()));
boolean exceptionThrown = false;
try { try {
boolean initSuccess = client.init(args); client.init(new String[]{});
Assert.assertTrue(initSuccess); Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
Assert.assertTrue("The throw exception is not expected",
e.getMessage().contains("No args"));
} }
catch (IllegalArgumentException e) {
exceptionThrown = true; LOG.info("Initializing DS Client with no jar file");
try {
String[] args = {
"--num_containers",
"2",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--master_memory",
"512",
"--container_memory",
"128"
};
client.init(args);
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
Assert.assertTrue("The throw exception is not expected",
e.getMessage().contains("No jar"));
}
LOG.info("Initializing DS Client with no shell command");
try {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"2",
"--master_memory",
"512",
"--container_memory",
"128"
};
client.init(args);
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
Assert.assertTrue("The throw exception is not expected",
e.getMessage().contains("No shell command"));
}
LOG.info("Initializing DS Client with invalid no. of containers");
try {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"-1",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--master_memory",
"512",
"--container_memory",
"128"
};
client.init(args);
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
Assert.assertTrue("The throw exception is not expected",
e.getMessage().contains("Invalid no. of containers"));
} }
Assert.assertTrue(exceptionThrown);
} }
} }