diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5dfe65d78d..c9bd9d81e5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -85,6 +85,9 @@ Release 2.5.0 - UNRELEASED YARN-1981. Nodemanager version is not updated when a node reconnects (Jason Lowe via jeagles) + YARN-2053. Fixed a bug in AMS to not add null NMToken into NMTokens list from + previous attempts for work-preserving AM restart. (Wangda Tan via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 6b2cb7f876..94dc47437f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -298,9 +298,12 @@ public class ApplicationMasterService extends AbstractService implements List nmTokens = new ArrayList(); for (Container container : transferredContainers) { try { - nmTokens.add(rmContext.getNMTokenSecretManager() - .createAndGetNMToken(app.getUser(), applicationAttemptId, - container)); + NMToken token = rmContext.getNMTokenSecretManager() + .createAndGetNMToken(app.getUser(), applicationAttemptId, + container); + if (null != token) { + nmTokens.add(token); + } } catch (IllegalArgumentException e) { // if it's a DNS issue, throw UnknowHostException directly and that // will be automatically retried by RMProxy in RPC layer. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 6b7b464d55..bcd8c1b508 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -264,31 +264,36 @@ public class TestAMRestart { nm2.registerNode(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - int NUM_CONTAINERS = 1; List containers = new ArrayList(); // nmTokens keeps track of all the nmTokens issued in the allocate call. List expectedNMTokens = new ArrayList(); - // am1 allocate 1 container on nm1. + // am1 allocate 2 container on nm1. + // first container while (true) { AllocateResponse response = - am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS, + am1.allocate("127.0.0.1", 2000, 2, new ArrayList()); nm1.nodeHeartbeat(true); containers.addAll(response.getAllocatedContainers()); expectedNMTokens.addAll(response.getNMTokens()); - if (containers.size() == NUM_CONTAINERS) { + if (containers.size() == 2) { break; } Thread.sleep(200); System.out.println("Waiting for container to be allocated."); } - // launch the container + // launch the container-2 nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); ContainerId containerId2 = ContainerId.newInstance(am1.getApplicationAttemptId(), 2); rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); - + // launch the container-3 + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING); + ContainerId containerId3 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 3); + rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING); + // fail am1 nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am1.waitForState(RMAppAttemptState.FAILED); @@ -308,12 +313,12 @@ public class TestAMRestart { containers = new ArrayList(); while (true) { AllocateResponse allocateResponse = - am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS, + am2.allocate("127.1.1.1", 4000, 1, new ArrayList()); nm2.nodeHeartbeat(true); containers.addAll(allocateResponse.getAllocatedContainers()); expectedNMTokens.addAll(allocateResponse.getNMTokens()); - if (containers.size() == NUM_CONTAINERS) { + if (containers.size() == 1) { break; } Thread.sleep(200);