diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e276d19489..dc178d7143 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -74,6 +74,9 @@ Release 2.1.1-beta - UNRELEASED forceKillApplication on non-running and finished applications. (Xuan Gong via vinodkv) + YARN-643. Fixed ResourceManager to remove all tokens consistently on app + finish. (Xuan Gong via vinodkv) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index e287c20372..1543110db0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -761,6 +761,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { rejectedEvent.getApplicationAttemptId().getApplicationId(), message) ); + + appAttempt.removeTokens(appAttempt); } } @@ -847,7 +849,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId(); // Tell the AMS. Unregister from the ApplicationMasterService @@ -894,9 +895,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttemptId, finalAttemptState)); - // Remove the AppAttempt from the AMRMTokenSecretManager - appAttempt.rmContext.getAMRMTokenSecretManager() - .applicationMasterFinished(appAttemptId); + appAttempt.removeTokens(appAttempt); } } @@ -1015,7 +1014,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { " exitCode: " + status.getExitStatus() + " due to: " + status.getDiagnostics() + "." + "Failing this attempt."); - // Tell the app, scheduler super.transition(appAttempt, finishEvent); } @@ -1042,12 +1040,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { appAttempt.rmContext.getAMFinishingMonitor().unregister( appAttempt.getAppAttemptId()); - // Unregister from the ClientToAMTokenSecretManager - if (UserGroupInformation.isSecurityEnabled()) { - appAttempt.rmContext.getClientToAMTokenSecretManager() - .unRegisterApplication(appAttempt.getAppAttemptId()); - } - if(!appAttempt.submissionContext.getUnmanagedAM()) { // Tell the launcher to cleanup. appAttempt.eventHandler.handle(new AMLauncherEvent( @@ -1116,10 +1108,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId); - // Remove the AppAttempt from the AMRMTokenSecretManager - appAttempt.rmContext.getAMRMTokenSecretManager() - .applicationMasterFinished(appAttemptId); - appAttempt.progress = 1.0f; RMAppAttemptUnregistrationEvent unregisterEvent @@ -1267,4 +1255,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { + " MasterContainer: " + masterContainer); store.storeApplicationAttempt(this); } + + private void removeTokens(RMAppAttemptImpl appAttempt) { + // Unregister from the ClientToAMTokenSecretManager + if (UserGroupInformation.isSecurityEnabled()) { + appAttempt.rmContext.getClientToAMTokenSecretManager() + .unRegisterApplication(appAttempt.getAppAttemptId()); + } + + // Remove the AppAttempt from the AMRMTokenSecretManager + appAttempt.rmContext.getAMRMTokenSecretManager() + .applicationMasterFinished(appAttempt.getAppAttemptId()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index d61b5c9e6f..5261d077d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; import java.util.Collections; import java.util.List; @@ -35,6 +36,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -102,6 +104,11 @@ public class TestRMAppAttemptTransitions { private RMApp application; private RMAppAttempt applicationAttempt; + + private Configuration conf = new Configuration(); + private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf)); + private ClientToAMTokenSecretManagerInRM clientToAMTokenManager = + spy(new ClientToAMTokenSecretManagerInRM()); private final class TestApplicationAttemptEventDispatcher implements EventHandler { @@ -163,14 +170,13 @@ public class TestRMAppAttemptTransitions { mock(ContainerAllocationExpirer.class); amLivelinessMonitor = mock(AMLivelinessMonitor.class); amFinishingMonitor = mock(AMLivelinessMonitor.class); - Configuration conf = new Configuration(); rmContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, - null, new AMRMTokenSecretManager(conf), + null, amRMTokenManager, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM()); + clientToAMTokenManager); RMStateStore store = mock(RMStateStore.class); ((RMContextImpl) rmContext).setStateStore(store); @@ -261,7 +267,11 @@ public class TestRMAppAttemptTransitions { assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); assertEquals(0, applicationAttempt.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); - + if (UserGroupInformation.isSecurityEnabled()) { + verify(clientToAMTokenManager).registerApplication( + applicationAttempt.getAppAttemptId()); + } + assertNotNull(applicationAttempt.getAMRMToken()); // Check events verify(masterService). registerAppAttempt(applicationAttempt.getAppAttemptId()); @@ -288,6 +298,7 @@ public class TestRMAppAttemptTransitions { // this works for unmanaged and managed AM's because this is actually doing // verify(application).handle(anyObject()); verify(application).handle(any(RMAppRejectedEvent.class)); + verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); } /** @@ -303,6 +314,7 @@ public class TestRMAppAttemptTransitions { assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); assertEquals(0, applicationAttempt.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); + verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); } /** @@ -377,6 +389,8 @@ public class TestRMAppAttemptTransitions { // Check events verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class)); + + verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); } /** @@ -422,6 +436,7 @@ public class TestRMAppAttemptTransitions { applicationAttempt.getTrackingUrl()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); + verifyTokenCount(applicationAttempt.getAppAttemptId(), 0); } /** @@ -442,6 +457,7 @@ public class TestRMAppAttemptTransitions { .getJustFinishedContainers().size()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); + verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); } @@ -592,6 +608,7 @@ public class TestRMAppAttemptTransitions { applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.KILL)); testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS); + verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); } @Test @@ -666,6 +683,7 @@ public class TestRMAppAttemptTransitions { applicationAttempt.getAppAttemptId(), cs)); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); + verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); } @Test @@ -709,6 +727,7 @@ public class TestRMAppAttemptTransitions { applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); + verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); } @Test(timeout=10000) @@ -725,6 +744,7 @@ public class TestRMAppAttemptTransitions { applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); + verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); } @Test(timeout=20000) @@ -742,6 +762,7 @@ public class TestRMAppAttemptTransitions { applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); + verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); } @Test @@ -848,4 +869,10 @@ public class TestRMAppAttemptTransitions { diagnostics, 0); } + private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) { + verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId); + if (UserGroupInformation.isSecurityEnabled()) { + verify(clientToAMTokenManager, times(count)).unRegisterApplication(appAttemptId); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index b0c0488694..aa894c5f6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -46,6 +48,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -80,6 +84,7 @@ public class TestAMRMTokens { * * @throws Exception */ + @SuppressWarnings("unchecked") @Test public void testTokenExpiry() throws Exception { @@ -134,6 +139,20 @@ public class TestAMRMTokens { finishAMRequest.setTrackingUrl("url"); rmClient.finishApplicationMaster(finishAMRequest); + // Send RMAppAttemptEventType.CONTAINER_FINISHED to transit RMAppAttempt + // from Finishing state to Finished State. Both AMRMToken and + // ClientToAMToken will be removed. + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus(attempt.getMasterContainer().getId(), + ContainerState.COMPLETE, + "AM Container Finished", 0); + rm.getRMContext() + .getDispatcher() + .getEventHandler() + .handle( + new RMAppAttemptContainerFinishedEvent(applicationAttemptId, + containerStatus)); + // Now simulate trying to allocate. RPC call itself should throw auth // exception. rpc.stopProxy(rmClient, conf); // To avoid using cached client