YARN-11420. Stabilize TestNMClient (#5317)

This commit is contained in:
K0K0V0K 2023-12-05 11:32:42 +01:00 committed by GitHub
parent 78d5fe24d6
commit 5d1f889432
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -23,28 +23,23 @@
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMClient;
@ -60,53 +55,48 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.junit.function.ThrowingRunnable;
import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestNMClient { public class TestNMClient {
Configuration conf = null; private static final String IS_NOT_HANDLED_BY_THIS_NODEMANAGER =
MiniYARNCluster yarnCluster = null; "is not handled by this NodeManager";
YarnClientImpl yarnClient = null; private static final String UNKNOWN_CONTAINER =
AMRMClientImpl<ContainerRequest> rmClient = null; "Unknown container";
NMClientImpl nmClient = null;
List<NodeReport> nodeReports = null; private static final int NUMBER_OF_CONTAINERS = 5;
ApplicationAttemptId attemptId = null; private Configuration conf;
int nodeCount = 3; private MiniYARNCluster yarnCluster;
NMTokenCache nmTokenCache = null; private YarnClientImpl yarnClient;
private AMRMClientImpl<ContainerRequest> rmClient;
private NMClientImpl nmClient;
private List<NodeReport> nodeReports;
private NMTokenCache nmTokenCache;
private RMAppAttempt appAttempt;
/** /**
* Container State transition listener to track the number of times * Container State transition listener to track the number of times
* a container has transitioned into a state. * a container has transitioned into a state.
*/ */
public static class DebugSumContainerStateListener public static class DebugSumContainerStateListener implements ContainerStateTransitionListener {
implements ContainerStateTransitionListener { public static final Map<ContainerId, Integer> RUNNING_TRANSITIONS = new ConcurrentHashMap<>();
private static final Logger LOG =
LoggerFactory.getLogger(DebugSumContainerStateListener.class);
private static final Map<ContainerId,
Map<org.apache.hadoop.yarn.server.nodemanager.containermanager
.container.ContainerState, Long>>
TRANSITION_COUNTER = new HashMap<>();
public void init(Context context) { public void init(Context context) {
} }
@ -125,444 +115,329 @@ public void postTransition(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerState afterState, .ContainerState afterState,
ContainerEvent processedEvent) { ContainerEvent processedEvent) {
synchronized (TRANSITION_COUNTER) { if (beforeState != afterState &&
if (beforeState != afterState) { afterState == org.apache.hadoop.yarn.server.nodemanager.containermanager.container
ContainerId id = op.getContainerId(); .ContainerState.RUNNING) {
TRANSITION_COUNTER RUNNING_TRANSITIONS.compute(op.getContainerId(),
.putIfAbsent(id, new HashMap<>()); (containerId, counter) -> counter == null ? 1 : ++counter);
long sum = TRANSITION_COUNTER.get(id)
.compute(afterState,
(state, count) -> count == null ? 1 : count + 1);
LOG.info("***** " + id +
" Transition from " + beforeState +
" to " + afterState +
"sum:" + sum);
}
} }
} }
/**
* Get the current number of state transitions.
* This is useful to check, if an event has occurred in unit tests.
* @param id Container id to check
* @param state Return the overall number of transitions to this state
* @return Number of transitions to the state specified
*/
static long getTransitionCounter(ContainerId id,
org.apache.hadoop.yarn.server.nodemanager
.containermanager.container
.ContainerState state) {
Long ret = TRANSITION_COUNTER.getOrDefault(id, new HashMap<>())
.get(state);
return ret != null ? ret : 0;
}
} }
@Before public void setup() throws YarnException, IOException, InterruptedException, TimeoutException {
public void setup() throws YarnException, IOException {
// start minicluster
conf = new YarnConfiguration(); conf = new YarnConfiguration();
// Turn on state tracking
conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS, conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
DebugSumContainerStateListener.class.getName()); DebugSumContainerStateListener.class.getName());
yarnCluster = startYarnCluster();
new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); startYarnClient();
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
nmTokenCache = new NMTokenCache();
startRMClient();
startNMClient();
}
private void startYarnCluster() {
yarnCluster = new MiniYARNCluster(TestNMClient.class.getName(), 3, 1, 1);
yarnCluster.init(conf); yarnCluster.init(conf);
yarnCluster.start(); yarnCluster.start();
assertNotNull(yarnCluster);
assertEquals(STATE.STARTED, yarnCluster.getServiceState()); assertEquals(STATE.STARTED, yarnCluster.getServiceState());
}
// start rm client private void startYarnClient()
throws IOException, YarnException, InterruptedException, TimeoutException {
yarnClient = (YarnClientImpl) YarnClient.createYarnClient(); yarnClient = (YarnClientImpl) YarnClient.createYarnClient();
yarnClient.init(conf); yarnClient.init(conf);
yarnClient.start(); yarnClient.start();
assertNotNull(yarnClient);
assertEquals(STATE.STARTED, yarnClient.getServiceState()); assertEquals(STATE.STARTED, yarnClient.getServiceState());
// get node info
nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
// submit new app ApplicationSubmissionContext appContext =
ApplicationSubmissionContext appContext =
yarnClient.createApplication().getApplicationSubmissionContext(); yarnClient.createApplication().getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId(); ApplicationId appId = appContext.getApplicationId();
// set the application name
appContext.setApplicationName("Test"); appContext.setApplicationName("Test");
// Set the priority for the application master
Priority pri = Priority.newInstance(0); Priority pri = Priority.newInstance(0);
appContext.setPriority(pri); appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue("default"); appContext.setQueue("default");
// Set up the container launch context for the application master ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
ContainerLaunchContext amContainer = Records
.newRecord(ContainerLaunchContext.class);
appContext.setAMContainerSpec(amContainer); appContext.setAMContainerSpec(amContainer);
// unmanaged AM
appContext.setUnmanagedAM(true); appContext.setUnmanagedAM(true);
// Create the request to send to the applications manager
SubmitApplicationRequest appRequest = Records SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
.newRecord(SubmitApplicationRequest.class);
appRequest.setApplicationSubmissionContext(appContext); appRequest.setApplicationSubmissionContext(appContext);
// Submit the application to the applications manager
yarnClient.submitApplication(appContext); yarnClient.submitApplication(appContext);
GenericTestUtils.waitFor(() -> yarnCluster.getResourceManager().getRMContext().getRMApps()
.get(appId).getCurrentAppAttempt().getAppAttemptState() == RMAppAttemptState.LAUNCHED,
100, 30_000, "Failed to start app");
appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
.get(appId).getCurrentAppAttempt();
}
// wait for app to start private void startRMClient() {
int iterationsLeft = 30; rmClient = (AMRMClientImpl<ContainerRequest>) AMRMClient.createAMRMClient();
RMAppAttempt appAttempt = null;
while (iterationsLeft > 0) {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
if (appReport.getYarnApplicationState() ==
YarnApplicationState.ACCEPTED) {
attemptId = appReport.getCurrentApplicationAttemptId();
appAttempt =
yarnCluster.getResourceManager().getRMContext().getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
while (true) {
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
break;
}
}
break;
}
sleep(1000);
--iterationsLeft;
}
if (iterationsLeft == 0) {
fail("Application hasn't bee started");
}
// Just dig into the ResourceManager and get the AMRMToken just for the sake
// of testing.
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
//creating an instance NMTokenCase
nmTokenCache = new NMTokenCache();
// start am rm client
rmClient =
(AMRMClientImpl<ContainerRequest>) AMRMClient
.<ContainerRequest> createAMRMClient();
//setting an instance NMTokenCase
rmClient.setNMTokenCache(nmTokenCache); rmClient.setNMTokenCache(nmTokenCache);
rmClient.init(conf); rmClient.init(conf);
rmClient.start(); rmClient.start();
assertNotNull(rmClient);
assertEquals(STATE.STARTED, rmClient.getServiceState()); assertEquals(STATE.STARTED, rmClient.getServiceState());
}
// start am nm client private void startNMClient() {
nmClient = (NMClientImpl) NMClient.createNMClient(); nmClient = (NMClientImpl) NMClient.createNMClient();
//propagating the AMRMClient NMTokenCache instance
nmClient.setNMTokenCache(rmClient.getNMTokenCache()); nmClient.setNMTokenCache(rmClient.getNMTokenCache());
nmClient.init(conf); nmClient.init(conf);
nmClient.start(); nmClient.start();
assertNotNull(nmClient);
assertEquals(STATE.STARTED, nmClient.getServiceState()); assertEquals(STATE.STARTED, nmClient.getServiceState());
} }
@After public void tearDown() throws InterruptedException {
public void tearDown() {
rmClient.stop(); rmClient.stop();
yarnClient.stop(); yarnClient.stop();
yarnCluster.stop(); yarnCluster.stop();
} }
private void stopNmClient(boolean stopContainers) { @Test (timeout = 180_000)
public void testNMClientNoCleanupOnStop()
throws YarnException, IOException, InterruptedException, TimeoutException {
runTest(() -> {
stopNmClient();
assertFalse(nmClient.startedContainers.isEmpty());
nmClient.cleanupRunningContainers();
assertEquals(0, nmClient.startedContainers.size());
});
}
@Test (timeout = 200_000)
public void testNMClient()
throws YarnException, IOException, InterruptedException, TimeoutException {
runTest(() -> {
// stop the running containers on close
assertFalse(nmClient.startedContainers.isEmpty());
nmClient.cleanupRunningContainersOnStop(true);
assertTrue(nmClient.getCleanupRunningContainers().get());
nmClient.stop();
});
}
public void runTest(
Runnable test
) throws IOException, InterruptedException, YarnException, TimeoutException {
setup();
rmClient.registerApplicationMaster("Host", 10_000, "");
testContainerManagement(nmClient, allocateContainers(rmClient));
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
test.run();
tearDown();
}
private void stopNmClient() {
assertNotNull("Null nmClient", nmClient); assertNotNull("Null nmClient", nmClient);
// leave one unclosed // leave one unclosed
assertEquals(1, nmClient.startedContainers.size()); assertEquals(1, nmClient.startedContainers.size());
// default true // default true
assertTrue(nmClient.getCleanupRunningContainers().get()); assertTrue(nmClient.getCleanupRunningContainers().get());
nmClient.cleanupRunningContainersOnStop(stopContainers); nmClient.cleanupRunningContainersOnStop(false);
assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get()); assertFalse(nmClient.getCleanupRunningContainers().get());
nmClient.stop();
}
@Test (timeout = 180000)
public void testNMClientNoCleanupOnStop()
throws YarnException, IOException {
rmClient.registerApplicationMaster("Host", 10000, "");
testContainerManagement(nmClient, allocateContainers(rmClient, 5));
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
// don't stop the running containers
stopNmClient(false);
assertFalse(nmClient.startedContainers.isEmpty());
//now cleanup
nmClient.cleanupRunningContainers();
assertEquals(0, nmClient.startedContainers.size());
}
@Test (timeout = 200000)
public void testNMClient()
throws YarnException, IOException {
rmClient.registerApplicationMaster("Host", 10000, "");
testContainerManagement(nmClient, allocateContainers(rmClient, 5));
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
// stop the running containers on close
assertFalse(nmClient.startedContainers.isEmpty());
nmClient.cleanupRunningContainersOnStop(true);
assertTrue(nmClient.getCleanupRunningContainers().get());
nmClient.stop(); nmClient.stop();
} }
private Set<Container> allocateContainers( private Set<Container> allocateContainers(
AMRMClientImpl<ContainerRequest> rmClient, int num) AMRMClientImpl<ContainerRequest> client
throws YarnException, IOException { ) throws YarnException, IOException {
// setup container request for (int i = 0; i < NUMBER_OF_CONTAINERS; ++i) {
Resource capability = Resource.newInstance(1024, 0); client.addContainerRequest(new ContainerRequest(
Priority priority = Priority.newInstance(0); Resource.newInstance(1024, 0),
String node = nodeReports.get(0).getNodeId().getHost(); new String[] {nodeReports.get(0).getNodeId().getHost()},
String rack = nodeReports.get(0).getRackName(); new String[] {nodeReports.get(0).getRackName()},
String[] nodes = new String[] {node}; Priority.newInstance(0)
String[] racks = new String[] {rack}; ));
for (int i = 0; i < num; ++i) {
rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
racks, priority));
} }
Set<Container> allocatedContainers = new TreeSet<>();
int containersRequestedAny = rmClient.getTable(0) while (allocatedContainers.size() < NUMBER_OF_CONTAINERS) {
.get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, AllocateResponse allocResponse = client.allocate(0.1f);
capability).remoteRequest.getNumContainers(); allocatedContainers.addAll(allocResponse.getAllocatedContainers());
for (NMToken token : allocResponse.getNMTokens()) {
// RM should allocate container within 2 calls to allocate() client.getNMTokenCache().setToken(token.getNodeId().toString(), token.getToken());
int allocatedContainerCount = 0;
int iterationsLeft = 2;
Set<Container> containers = new TreeSet<Container>();
while (allocatedContainerCount < containersRequestedAny
&& iterationsLeft > 0) {
AllocateResponse allocResponse = rmClient.allocate(0.1f);
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
for(Container container : allocResponse.getAllocatedContainers()) {
containers.add(container);
} }
if (!allocResponse.getNMTokens().isEmpty()) { if (allocatedContainers.size() < NUMBER_OF_CONTAINERS) {
for (NMToken token : allocResponse.getNMTokens()) { sleep(100);
rmClient.getNMTokenCache().setToken(token.getNodeId().toString(),
token.getToken());
}
} }
if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(1000);
}
--iterationsLeft;
} }
return containers; return allocatedContainers;
} }
private void testContainerManagement(NMClientImpl client, private void testContainerManagement(
Set<Container> containers) throws YarnException, IOException { NMClientImpl client, Set<Container> containers
) throws YarnException, IOException {
int size = containers.size(); int size = containers.size();
int i = 0; int i = 0;
for (Container container : containers) { for (Container container : containers) {
// getContainerStatus shouldn't be called before startContainer, // getContainerStatus shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container // otherwise, NodeManager cannot find the container
try { assertYarnException(
client.getContainerStatus(container.getId(), container.getNodeId()); () -> client.getContainerStatus(container.getId(), container.getNodeId()),
fail("Exception is expected"); IS_NOT_HANDLED_BY_THIS_NODEMANAGER);
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("is not handled by this NodeManager"));
}
// upadateContainerResource shouldn't be called before startContainer, // upadateContainerResource shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container // otherwise, NodeManager cannot find the container
try { assertYarnException(
client.updateContainerResource(container); () -> client.updateContainerResource(container),
fail("Exception is expected"); IS_NOT_HANDLED_BY_THIS_NODEMANAGER);
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("is not handled by this NodeManager"));
}
// restart shouldn't be called before startContainer, // restart shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container // otherwise, NodeManager cannot find the container
try { assertYarnException(
client.restartContainer(container.getId()); () -> client.restartContainer(container.getId()),
fail("Exception is expected"); UNKNOWN_CONTAINER);
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("Unknown container"));
}
// rollback shouldn't be called before startContainer, // rollback shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container // otherwise, NodeManager cannot find the container
try { assertYarnException(
client.rollbackLastReInitialization(container.getId()); () -> client.rollbackLastReInitialization(container.getId()),
fail("Exception is expected"); UNKNOWN_CONTAINER);
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("Unknown container"));
}
// commit shouldn't be called before startContainer, // commit shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container // otherwise, NodeManager cannot find the container
try { assertYarnException(
client.commitLastReInitialization(container.getId()); () -> client.commitLastReInitialization(container.getId()),
fail("Exception is expected"); UNKNOWN_CONTAINER);
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("Unknown container"));
}
// stopContainer shouldn't be called before startContainer, // stopContainer shouldn't be called before startContainer,
// otherwise, an exception will be thrown // otherwise, an exception will be thrown
try { assertYarnException(
client.stopContainer(container.getId(), container.getNodeId()); () -> client.stopContainer(container.getId(), container.getNodeId()),
fail("Exception is expected"); IS_NOT_HANDLED_BY_THIS_NODEMANAGER);
} catch (YarnException e) {
if (!e.getMessage()
.contains("is not handled by this NodeManager")) {
throw new AssertionError("Exception is not expected: ", e);
}
}
Credentials ts = new Credentials(); Credentials ts = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob); ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens = ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); ContainerLaunchContext clc = Records.newRecord(ContainerLaunchContext.class);
ContainerLaunchContext clc = clc.setCommands(Shell.WINDOWS
Records.newRecord(ContainerLaunchContext.class); ? Arrays.asList("ping", "-n", "10000000", "127.0.0.1", ">nul")
if (Shell.WINDOWS) { : Arrays.asList("sleep", "1000000")
clc.setCommands( );
Arrays.asList("ping", "-n", "10000000", "127.0.0.1", ">nul"));
} else {
clc.setCommands(Arrays.asList("sleep", "1000000"));
}
clc.setTokens(securityTokens); clc.setTokens(securityTokens);
try { client.startContainer(container, clc);
client.startContainer(container, clc); List<Integer> exitStatuses = Arrays.asList(-1000, -105);
} catch (YarnException e) {
throw new AssertionError("Exception is not expected ", e);
}
List<Integer> exitStatuses = Collections.singletonList(-1000);
// leave one container unclosed // leave one container unclosed
if (++i < size) { if (++i < size) {
testContainer(client, i, container, clc, exitStatuses); testContainer(client, i, container, clc, exitStatuses);
} }
} }
} }
private void testContainer(NMClientImpl client, int i, Container container, private void testContainer(NMClientImpl client, int i, Container container,
ContainerLaunchContext clc, List<Integer> exitCode) ContainerLaunchContext clc, List<Integer> exitCode)
throws YarnException, IOException { throws YarnException, IOException {
// NodeManager may still need some time to make the container started
testGetContainerStatus(container, i, ContainerState.RUNNING, "", testGetContainerStatus(container, i, ContainerState.RUNNING, "",
exitCode); exitCode);
waitForContainerTransitionCount(container, waitForContainerRunningTransitionCount(container, 1);
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING, 1);
// Test increase container API and make sure requests can reach NM
testIncreaseContainerResource(container); testIncreaseContainerResource(container);
testRestartContainer(container);
testRestartContainer(container.getId());
testGetContainerStatus(container, i, ContainerState.RUNNING, testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Restarted", exitCode); "will be Restarted", exitCode);
waitForContainerTransitionCount(container, waitForContainerRunningTransitionCount(container, 2);
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING, 2);
if (i % 2 == 0) { if (i % 2 == 0) {
testReInitializeContainer(container.getId(), clc, false); testReInitializeContainer(container, clc, false);
testGetContainerStatus(container, i, ContainerState.RUNNING, testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Re-initialized", exitCode); "will be Re-initialized", exitCode);
waitForContainerTransitionCount(container, waitForContainerRunningTransitionCount(container, 3);
org.apache.hadoop.yarn.server.nodemanager. testContainerRollback(container, true);
containermanager.container.ContainerState.RUNNING, 3);
testRollbackContainer(container.getId(), false);
testGetContainerStatus(container, i, ContainerState.RUNNING, testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Rolled-back", exitCode); "will be Rolled-back", exitCode);
waitForContainerTransitionCount(container, waitForContainerRunningTransitionCount(container, 4);
org.apache.hadoop.yarn.server.nodemanager. testContainerCommit(container, false);
containermanager.container.ContainerState.RUNNING, 4); testReInitializeContainer(container, clc, false);
testCommitContainer(container.getId(), true);
testReInitializeContainer(container.getId(), clc, false);
testGetContainerStatus(container, i, ContainerState.RUNNING, testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Re-initialized", exitCode); "will be Re-initialized", exitCode);
waitForContainerTransitionCount(container, waitForContainerRunningTransitionCount(container, 5);
org.apache.hadoop.yarn.server.nodemanager. testContainerCommit(container, true);
containermanager.container.ContainerState.RUNNING, 5);
testCommitContainer(container.getId(), false);
} else { } else {
testReInitializeContainer(container.getId(), clc, true); testReInitializeContainer(container, clc, true);
testGetContainerStatus(container, i, ContainerState.RUNNING, testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Re-initialized", exitCode); "will be Re-initialized", exitCode);
waitForContainerTransitionCount(container, waitForContainerRunningTransitionCount(container, 3);
org.apache.hadoop.yarn.server.nodemanager. testContainerRollback(container, false);
containermanager.container.ContainerState.RUNNING, 3); testContainerCommit(container, false);
testRollbackContainer(container.getId(), true);
testCommitContainer(container.getId(), true);
} }
client.stopContainer(container.getId(), container.getNodeId());
testGetContainerStatus(container, i, ContainerState.COMPLETE,
"killed by the ApplicationMaster", exitCode);
}
try { private void waitForContainerRunningTransitionCount(Container container, long transitions) {
client.stopContainer(container.getId(), container.getNodeId()); while (DebugSumContainerStateListener.RUNNING_TRANSITIONS
} catch (YarnException e) { .getOrDefault(container.getId(), 0) != transitions) {
throw (AssertionError) sleep(500);
(new AssertionError("Exception is not expected: " + e, e));
} }
}
// getContainerStatus can be called after stopContainer
try { private void testGetContainerStatus(Container container, int index,
// O is possible if CLEANUP_CONTAINER is executed too late ContainerState state, String diagnostics,
// -105 is possible if the container is not terminated but killed List<Integer> exitStatuses)
testGetContainerStatus(container, i, ContainerState.COMPLETE, throws YarnException, IOException {
"Container killed by the ApplicationMaster.", while (true) {
Arrays.asList( sleep(250);
ContainerExitStatus.KILLED_BY_APPMASTER, ContainerStatus status = nmClient.getContainerStatus(
ContainerExitStatus.SUCCESS)); container.getId(), container.getNodeId());
} catch (YarnException e) { // NodeManager may still need some time to get the stable
// The exception is possible because, after the container is stopped, // container status
// it may be removed from NM's context. if (status.getState() == state) {
if (!e.getMessage() assertEquals(container.getId(), status.getContainerId());
.contains("was recently stopped on node manager")) { assertTrue(index + ": " + status.getDiagnostics(),
throw (AssertionError) status.getDiagnostics().contains(diagnostics));
(new AssertionError("Exception is not expected: ", e));
assertTrue("Exit Statuses are supposed to be in: " + exitStatuses +
", but the actual exit status code is: " +
status.getExitStatus(),
exitStatuses.contains(status.getExitStatus()));
break;
} }
} }
} }
/** @SuppressWarnings("deprecation")
* Wait until the container reaches a state N times. private void testIncreaseContainerResource(Container container) {
* @param container container to watch assertYarnException(
* @param state state to test () -> nmClient.increaseContainerResource(container),
* @param transitions the number N above container.getId() + " has update version ");
* @throws YarnException This happens if the test times out while waiting }
*/
private void waitForContainerTransitionCount( private void testRestartContainer(Container container) throws IOException, YarnException {
Container container, nmClient.restartContainer(container.getId());
org.apache.hadoop.yarn.server.nodemanager. }
containermanager.container.ContainerState state, long transitions)
throws YarnException { private void testContainerRollback(Container container, boolean enabled)
long transitionCount = -1; throws IOException, YarnException {
do { if (enabled) {
if (transitionCount != -1) { nmClient.rollbackLastReInitialization(container.getId());
try { } else {
Thread.sleep(10); assertYarnException(
} catch (InterruptedException e) { () -> nmClient.rollbackLastReInitialization(container.getId()),
throw new YarnException( "Nothing to rollback to");
"Timeout at transition count:" + transitionCount, e); }
} }
}
transitionCount = DebugSumContainerStateListener private void testContainerCommit(Container container, boolean enabled)
.getTransitionCounter(container.getId(), state); throws IOException, YarnException {
} while (transitionCount != transitions); if (enabled) {
nmClient.commitLastReInitialization(container.getId());
} else {
assertYarnException(
() -> nmClient.commitLastReInitialization(container.getId()),
"Nothing to Commit");
}
}
private void testReInitializeContainer(
Container container, ContainerLaunchContext clc, boolean autoCommit
) throws IOException, YarnException {
nmClient.reInitializeContainer(container.getId(), clc, autoCommit);
}
private void assertYarnException(ThrowingRunnable runnable, String text) {
YarnException e = assertThrows(YarnException.class, runnable);
assertTrue(String.format("The thrown exception is not expected cause it has text [%s]"
+ ", what not contains text [%s]", e.getMessage(), text), e.getMessage().contains(text));
} }
private void sleep(int sleepTime) { private void sleep(int sleepTime) {
@ -570,131 +445,7 @@ private void sleep(int sleepTime) {
Thread.sleep(sleepTime); Thread.sleep(sleepTime);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} throw new RuntimeException(e);
}
private void testGetContainerStatus(Container container, int index,
ContainerState state, String diagnostics, List<Integer> exitStatuses)
throws YarnException, IOException {
while (true) {
sleep(250);
ContainerStatus status = nmClient.getContainerStatus(
container.getId(), container.getNodeId());
// NodeManager may still need some time to get the stable
// container status
if (status.getState() == state) {
assertEquals(container.getId(), status.getContainerId());
assertTrue("" + index + ": " + status.getDiagnostics(),
status.getDiagnostics().contains(diagnostics));
assertTrue("Exit Statuses are supposed to be in: " + exitStatuses +
", but the actual exit status code is: " +
status.getExitStatus(),
exitStatuses.contains(status.getExitStatus()));
break;
}
}
}
@SuppressWarnings("deprecation")
private void testIncreaseContainerResource(Container container)
throws YarnException, IOException {
try {
nmClient.increaseContainerResource(container);
} catch (YarnException e) {
// NM container increase container resource should fail without a version
// increase action to fail.
if (!e.getMessage().contains(
container.getId() + " has update version ")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
}
}
private void testRestartContainer(ContainerId containerId)
throws YarnException, IOException {
try {
sleep(250);
nmClient.restartContainer(containerId);
sleep(250);
} catch (YarnException e) {
// NM container will only be in SCHEDULED state, so expect the increase
// action to fail.
if (!e.getMessage().contains(
"can only be changed when a container is in RUNNING state")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
}
}
private void testRollbackContainer(ContainerId containerId,
boolean notRollbackable) throws YarnException, IOException {
try {
sleep(250);
nmClient.rollbackLastReInitialization(containerId);
if (notRollbackable) {
fail("Should not be able to rollback..");
}
sleep(250);
} catch (YarnException e) {
// NM container will only be in SCHEDULED state, so expect the increase
// action to fail.
if (notRollbackable) {
Assert.assertTrue(e.getMessage().contains(
"Nothing to rollback to"));
} else {
if (!e.getMessage().contains(
"can only be changed when a container is in RUNNING state")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
}
}
}
private void testCommitContainer(ContainerId containerId,
boolean notCommittable) throws YarnException, IOException {
try {
nmClient.commitLastReInitialization(containerId);
if (notCommittable) {
fail("Should not be able to commit..");
}
} catch (YarnException e) {
// NM container will only be in SCHEDULED state, so expect the increase
// action to fail.
if (notCommittable) {
Assert.assertTrue(e.getMessage().contains(
"Nothing to Commit"));
} else {
if (!e.getMessage().contains(
"can only be changed when a container is in RUNNING state")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
}
}
}
private void testReInitializeContainer(ContainerId containerId,
ContainerLaunchContext clc, boolean autoCommit)
throws YarnException, IOException {
try {
nmClient.reInitializeContainer(containerId, clc, autoCommit);
} catch (YarnException e) {
// NM container will only be in SCHEDULED state, so expect the increase
// action to fail.
if (!e.getMessage().contains(
"can only be changed when a container is in RUNNING state")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
} }
} }
} }