YARN-2821. Fixed a problem that DistributedShell AM may hang if restarted. Contributed by Varun Vasudev
This commit is contained in:
parent
e422e76fca
commit
7438966586
@ -420,6 +420,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
YARN-3302. TestDockerContainerExecutor should run automatically if it can
|
YARN-3302. TestDockerContainerExecutor should run automatically if it can
|
||||||
detect docker in the usual place (Ravindra Kumar Naik via raviprak)
|
detect docker in the usual place (Ravindra Kumar Naik via raviprak)
|
||||||
|
|
||||||
|
YARN-2821. Fixed a problem that DistributedShell AM may hang if restarted.
|
||||||
|
(Varun Vasudev via jianhe)
|
||||||
|
|
||||||
Release 2.7.1 - UNRELEASED
|
Release 2.7.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -116,6 +116,11 @@
|
|||||||
<type>test-jar</type>
|
<type>test-jar</type>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mockito</groupId>
|
||||||
|
<artifactId>mockito-all</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
@ -30,10 +30,12 @@
|
|||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.Vector;
|
import java.util.Vector;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
@ -277,6 +279,10 @@ public static enum DSEntity {
|
|||||||
private final String linux_bash_command = "bash";
|
private final String linux_bash_command = "bash";
|
||||||
private final String windows_command = "cmd /c";
|
private final String windows_command = "cmd /c";
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected final Set<ContainerId> launchedContainers =
|
||||||
|
Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param args Command line args
|
* @param args Command line args
|
||||||
*/
|
*/
|
||||||
@ -601,8 +607,12 @@ public void run() throws YarnException, IOException, InterruptedException {
|
|||||||
response.getContainersFromPreviousAttempts();
|
response.getContainersFromPreviousAttempts();
|
||||||
LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
|
LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
|
||||||
+ " previous attempts' running containers on AM registration.");
|
+ " previous attempts' running containers on AM registration.");
|
||||||
|
for(Container container: previousAMRunningContainers) {
|
||||||
|
launchedContainers.add(container.getId());
|
||||||
|
}
|
||||||
numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
|
numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
|
||||||
|
|
||||||
|
|
||||||
int numTotalContainersToRequest =
|
int numTotalContainersToRequest =
|
||||||
numTotalContainers - previousAMRunningContainers.size();
|
numTotalContainers - previousAMRunningContainers.size();
|
||||||
// Setup ask for containers from RM
|
// Setup ask for containers from RM
|
||||||
@ -716,7 +726,8 @@ protected boolean finish() {
|
|||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
@VisibleForTesting
|
||||||
|
class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public void onContainersCompleted(List<ContainerStatus> completedContainers) {
|
public void onContainersCompleted(List<ContainerStatus> completedContainers) {
|
||||||
@ -731,6 +742,14 @@ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
|
|||||||
|
|
||||||
// non complete containers should not be here
|
// non complete containers should not be here
|
||||||
assert (containerStatus.getState() == ContainerState.COMPLETE);
|
assert (containerStatus.getState() == ContainerState.COMPLETE);
|
||||||
|
// ignore containers we know nothing about - probably from a previous
|
||||||
|
// attempt
|
||||||
|
if (!launchedContainers.contains(containerStatus.getContainerId())) {
|
||||||
|
LOG.info("Ignoring completed status of "
|
||||||
|
+ containerStatus.getContainerId()
|
||||||
|
+ "; unknown container(probably launched by previous attempt)");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// increment counters for completed/failed containers
|
// increment counters for completed/failed containers
|
||||||
int exitStatus = containerStatus.getExitStatus();
|
int exitStatus = containerStatus.getExitStatus();
|
||||||
@ -796,14 +815,13 @@ public void onContainersAllocated(List<Container> allocatedContainers) {
|
|||||||
// + ", containerToken"
|
// + ", containerToken"
|
||||||
// +allocatedContainer.getContainerToken().getIdentifier().toString());
|
// +allocatedContainer.getContainerToken().getIdentifier().toString());
|
||||||
|
|
||||||
LaunchContainerRunnable runnableLaunchContainer =
|
Thread launchThread = createLaunchContainerThread(allocatedContainer);
|
||||||
new LaunchContainerRunnable(allocatedContainer, containerListener);
|
|
||||||
Thread launchThread = new Thread(runnableLaunchContainer);
|
|
||||||
|
|
||||||
// launch and start the container on a separate thread to keep
|
// launch and start the container on a separate thread to keep
|
||||||
// the main thread unblocked
|
// the main thread unblocked
|
||||||
// as all containers may not be allocated at one go.
|
// as all containers may not be allocated at one go.
|
||||||
launchThreads.add(launchThread);
|
launchThreads.add(launchThread);
|
||||||
|
launchedContainers.add(allocatedContainer.getId());
|
||||||
launchThread.start();
|
launchThread.start();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1150,4 +1168,30 @@ private static void publishApplicationAttemptEvent(
|
|||||||
+ appAttemptId.toString(), e);
|
+ appAttemptId.toString(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RMCallbackHandler getRMCallbackHandler() {
|
||||||
|
return new RMCallbackHandler();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setAmRMClient(AMRMClientAsync client) {
|
||||||
|
this.amRMClient = client;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getNumCompletedContainers() {
|
||||||
|
return numCompletedContainers.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean getDone() {
|
||||||
|
return done;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Thread createLaunchContainerThread(Container allocatedContainer) {
|
||||||
|
LaunchContainerRunnable runnableLaunchContainer =
|
||||||
|
new LaunchContainerRunnable(allocatedContainer, containerListener);
|
||||||
|
return new Thread(runnableLaunchContainer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,13 +20,143 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||||
|
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
||||||
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Matchers;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A bunch of tests to make sure that the container allocations
|
||||||
|
* and releases occur correctly.
|
||||||
|
*/
|
||||||
public class TestDSAppMaster {
|
public class TestDSAppMaster {
|
||||||
|
|
||||||
|
static class TestAppMaster extends ApplicationMaster {
|
||||||
|
private int threadsLaunched = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Thread createLaunchContainerThread(Container allocatedContainer) {
|
||||||
|
threadsLaunched++;
|
||||||
|
launchedContainers.add(allocatedContainer.getId());
|
||||||
|
return new Thread();
|
||||||
|
}
|
||||||
|
|
||||||
|
void setNumTotalContainers(int numTotalContainers) {
|
||||||
|
this.numTotalContainers = numTotalContainers;
|
||||||
|
}
|
||||||
|
|
||||||
|
int getAllocatedContainers() {
|
||||||
|
return this.numAllocatedContainers.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void startTimelineClient(final Configuration conf) throws YarnException,
|
||||||
|
IOException, InterruptedException {
|
||||||
|
timelineClient = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test
|
||||||
|
public void testDSAppMasterAllocateHandler() throws Exception {
|
||||||
|
|
||||||
|
TestAppMaster master = new TestAppMaster();
|
||||||
|
int targetContainers = 2;
|
||||||
|
AMRMClientAsync mockClient = Mockito.mock(AMRMClientAsync.class);
|
||||||
|
master.setAmRMClient(mockClient);
|
||||||
|
master.setNumTotalContainers(targetContainers);
|
||||||
|
Mockito.doNothing().when(mockClient)
|
||||||
|
.addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
|
||||||
|
|
||||||
|
ApplicationMaster.RMCallbackHandler handler = master.getRMCallbackHandler();
|
||||||
|
|
||||||
|
List<Container> containers = new ArrayList<>(1);
|
||||||
|
ContainerId id1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||||
|
containers.add(generateContainer(id1));
|
||||||
|
|
||||||
|
master.numRequestedContainers.set(targetContainers);
|
||||||
|
|
||||||
|
// first allocate a single container, everything should be fine
|
||||||
|
handler.onContainersAllocated(containers);
|
||||||
|
Assert.assertEquals("Wrong container allocation count", 1,
|
||||||
|
master.getAllocatedContainers());
|
||||||
|
Mockito.verifyZeroInteractions(mockClient);
|
||||||
|
Assert.assertEquals("Incorrect number of threads launched", 1,
|
||||||
|
master.threadsLaunched);
|
||||||
|
|
||||||
|
// now send 3 extra containers
|
||||||
|
containers.clear();
|
||||||
|
ContainerId id2 = BuilderUtils.newContainerId(1, 1, 1, 2);
|
||||||
|
containers.add(generateContainer(id2));
|
||||||
|
ContainerId id3 = BuilderUtils.newContainerId(1, 1, 1, 3);
|
||||||
|
containers.add(generateContainer(id3));
|
||||||
|
ContainerId id4 = BuilderUtils.newContainerId(1, 1, 1, 4);
|
||||||
|
containers.add(generateContainer(id4));
|
||||||
|
handler.onContainersAllocated(containers);
|
||||||
|
Assert.assertEquals("Wrong final container allocation count", 4,
|
||||||
|
master.getAllocatedContainers());
|
||||||
|
|
||||||
|
Assert.assertEquals("Incorrect number of threads launched", 4,
|
||||||
|
master.threadsLaunched);
|
||||||
|
|
||||||
|
// make sure we handle completion events correctly
|
||||||
|
List<ContainerStatus> status = new ArrayList<>();
|
||||||
|
status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));
|
||||||
|
status.add(generateContainerStatus(id2, ContainerExitStatus.SUCCESS));
|
||||||
|
status.add(generateContainerStatus(id3, ContainerExitStatus.ABORTED));
|
||||||
|
status.add(generateContainerStatus(id4, ContainerExitStatus.ABORTED));
|
||||||
|
handler.onContainersCompleted(status);
|
||||||
|
|
||||||
|
Assert.assertEquals("Unexpected number of completed containers",
|
||||||
|
targetContainers, master.getNumCompletedContainers());
|
||||||
|
Assert.assertTrue("Master didn't finish containers as expected",
|
||||||
|
master.getDone());
|
||||||
|
|
||||||
|
// test for events from containers we know nothing about
|
||||||
|
// these events should be ignored
|
||||||
|
status = new ArrayList<>();
|
||||||
|
ContainerId id5 = BuilderUtils.newContainerId(1, 1, 1, 5);
|
||||||
|
status.add(generateContainerStatus(id5, ContainerExitStatus.ABORTED));
|
||||||
|
Assert.assertEquals("Unexpected number of completed containers",
|
||||||
|
targetContainers, master.getNumCompletedContainers());
|
||||||
|
Assert.assertTrue("Master didn't finish containers as expected",
|
||||||
|
master.getDone());
|
||||||
|
status.add(generateContainerStatus(id5, ContainerExitStatus.SUCCESS));
|
||||||
|
Assert.assertEquals("Unexpected number of completed containers",
|
||||||
|
targetContainers, master.getNumCompletedContainers());
|
||||||
|
Assert.assertTrue("Master didn't finish containers as expected",
|
||||||
|
master.getDone());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Container generateContainer(ContainerId cid) {
|
||||||
|
return Container.newInstance(cid, NodeId.newInstance("host", 5000),
|
||||||
|
"host:80", Resource.newInstance(1024, 1), Priority.newInstance(0), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ContainerStatus
|
||||||
|
generateContainerStatus(ContainerId id, int exitStatus) {
|
||||||
|
return ContainerStatus.newInstance(id, ContainerState.COMPLETE, "",
|
||||||
|
exitStatus);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTimelineClientInDSAppMaster() throws Exception {
|
public void testTimelineClientInDSAppMaster() throws Exception {
|
||||||
ApplicationMaster appMaster = new ApplicationMaster();
|
ApplicationMaster appMaster = new ApplicationMaster();
|
||||||
|
Loading…
Reference in New Issue
Block a user