From d14eff7d3896b75d4da10fcfff15c42fcca48f7c Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 25 Mar 2014 18:21:55 +0000 Subject: [PATCH] YARN-1866. Fixed an issue with renewal of RM-delegation tokens on restart or fail-over. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1581448 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../security/DelegationTokenRenewer.java | 15 +++++-- .../server/resourcemanager/TestRMRestart.java | 41 +++++++++++-------- .../security/TestDelegationTokenRenewer.java | 11 +++-- .../TestDelegationTokenRenewerLifecycle.java | 13 +++++- 5 files changed, 54 insertions(+), 29 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0ab36911f9..30135e381f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -571,6 +571,9 @@ Release 2.4.0 - UNRELEASED YARN-1852. Fixed RMAppAttempt to not resend AttemptFailed/AttemptKilled events to already recovered Failed/Killed RMApps. (Rohith via jianhe) + YARN-1866. Fixed an issue with renewal of RM-delegation tokens on restart or + fail-over. (Jian He via vinodkv) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index 82464cfeaf..38e908926d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -112,6 +112,8 @@ protected synchronized void serviceInit(Configuration conf) throws Exception { this.tokenRemovalDelayMs = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + + setLocalSecretManagerAndServiceAddr(); renewerService = createNewThreadPoolService(conf); pendingEventQueue = new LinkedBlockingQueue(); renewalTimer = new Timer(true); @@ -134,6 +136,13 @@ protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) { return pool; } + // enable RM to short-circuit token operations directly to itself + private void setLocalSecretManagerAndServiceAddr() { + RMDelegationTokenIdentifier.Renewer.setSecretManager(rmContext + .getRMDelegationTokenSecretManager(), rmContext.getClientRMService() + .getBindAddress()); + } + @Override protected void serviceStart() throws Exception { dtCancelThread.start(); @@ -143,10 +152,8 @@ protected void serviceStart() throws Exception { "DelayedTokenCanceller"); delayedRemovalThread.start(); } - // enable RM to short-circuit token operations directly to itself - RMDelegationTokenIdentifier.Renewer.setSecretManager( - rmContext.getRMDelegationTokenSecretManager(), - rmContext.getClientRMService().getBindAddress()); + + setLocalSecretManagerAndServiceAddr(); serviceStateLock.writeLock().lock(); isServiceStarted = true; serviceStateLock.writeLock().unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index ffb6fd9586..49eff8b014 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -394,7 +394,7 @@ public void testRMRestart() throws Exception { Assert.assertEquals(4, rmAppState.size()); } - @Test + @Test (timeout = 60000) public void testRMRestartAppRunningAMFailed() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -440,7 +440,7 @@ public void testRMRestartAppRunningAMFailed() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 60000) public void testRMRestartWaitForPreviousAMToFinish() throws Exception { // testing 3 cases // After RM restarts @@ -607,7 +607,7 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception { // store but before the RMAppAttempt notifies RMApp that it has succeeded. On // recovery, RMAppAttempt should send the AttemptFinished event to RMApp so // that RMApp can recover its state. - @Test + @Test (timeout = 60000) public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); MemoryRMStateStore memStore = new MemoryRMStateStore() { @@ -660,7 +660,7 @@ public void updateApplicationStateInternal(ApplicationId appId, rmAppState.get(app0.getApplicationId()).getState()); } - @Test + @Test (timeout = 60000) public void testRMRestartFailedApp() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); MemoryRMStateStore memStore = new MemoryRMStateStore(); @@ -709,7 +709,7 @@ public void testRMRestartFailedApp() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 60000) public void testRMRestartKilledApp() throws Exception{ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -757,7 +757,7 @@ public void testRMRestartKilledApp() throws Exception{ rm2.stop(); } - @Test + @Test (timeout = 60000) public void testRMRestartKilledAppWithNoAttempts() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore() { @Override @@ -797,7 +797,7 @@ public synchronized void updateApplicationAttemptStateInternal( Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0); } - @Test + @Test (timeout = 60000) public void testRMRestartSucceededApp() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -849,7 +849,7 @@ public void testRMRestartSucceededApp() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 60000) public void testRMRestartGetApplicationList() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); MemoryRMStateStore memStore = new MemoryRMStateStore(); @@ -997,7 +997,7 @@ private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, appState.getAttempt(am.getApplicationAttemptId()).getState()); } - @Test + @Test (timeout = 60000) public void testRMRestartOnMaxAppAttempts() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -1071,7 +1071,7 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 60000) public void testDelegationTokenRestoredInDelegationTokenRenewer() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); @@ -1171,7 +1171,7 @@ private void waitForTokensToBeRenewed(MockRM rm2) throws Exception { } } - @Test + @Test (timeout = 60000) public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, @@ -1261,7 +1261,7 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { rm2.stop(); } - @Test + @Test (timeout = 60000) public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set( @@ -1414,7 +1414,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // This is to test submit an application to the new RM with the old delegation // token got from previous RM. - @Test + @Test (timeout = 60000) public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); @@ -1449,7 +1449,7 @@ public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); } - @Test + @Test (timeout = 60000) public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore() { volatile boolean wait = true; @@ -1508,7 +1508,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) { Assert.assertTrue(rmAppState.size() == NUM_APPS); } - @Test + @Test (timeout = 60000) public void testFinishedAppRemovalAfterRMRestart() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore(); conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1); @@ -1580,7 +1580,7 @@ public synchronized void checkVersion() // This is to test Killing application should be able to wait until app // reaches killed state and also check that attempt state is saved before app // state is saved. - @Test + @Test (timeout = 60000) public void testClientRetryOnKillingApplication() throws Exception { MemoryRMStateStore memStore = new TestMemoryRMStateStore(); memStore.init(conf); @@ -1738,7 +1738,7 @@ private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted, appsCompleted + appsCompletedCarryOn); } - @Test + @Test (timeout = 60000) public void testDecomissionedNMsMetricsOnRMRestart() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, @@ -1875,6 +1875,13 @@ public TestSecurityMockRM(Configuration conf, RMStateStore store) { super(conf, store); } + @Override + public void init(Configuration conf) { + // reset localServiceAddress. + RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null); + super.init(conf); + } + @Override protected ClientRMService createClientRMService() { return new ClientRMService(getRMContext(), getResourceScheduler(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 203e71673f..31eaa71d7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -43,8 +43,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -82,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -179,7 +178,6 @@ public void setUp() throws Exception { dispatcher = new AsyncDispatcher(eventQueue); Renewer.reset(); delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter); - delegationTokenRenewer.init(conf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getDelegationTokenRenewer()).thenReturn( @@ -190,6 +188,7 @@ public void setUp() throws Exception { InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); delegationTokenRenewer.setRMContext(mockContext); + delegationTokenRenewer.init(conf); delegationTokenRenewer.start(); } @@ -515,7 +514,6 @@ public void testDTKeepAlive1 () throws Exception { 1000l); DelegationTokenRenewer localDtr = createNewDelegationTokenRenewer(lconf, counter); - localDtr.init(lconf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); @@ -526,6 +524,7 @@ public void testDTKeepAlive1 () throws Exception { InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); localDtr.setRMContext(mockContext); + localDtr.init(lconf); localDtr.start(); MyFS dfs = (MyFS)FileSystem.get(lconf); @@ -592,7 +591,6 @@ public void testDTKeepAlive2() throws Exception { 1000l); DelegationTokenRenewer localDtr = createNewDelegationTokenRenewer(conf, counter); - localDtr.init(lconf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); @@ -603,6 +601,7 @@ public void testDTKeepAlive2() throws Exception { InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); localDtr.setRMContext(mockContext); + localDtr.init(lconf); localDtr.start(); MyFS dfs = (MyFS)FileSystem.get(lconf); @@ -704,7 +703,6 @@ public Long answer(InvocationOnMock invocation) // fire up the renewer final DelegationTokenRenewer dtr = createNewDelegationTokenRenewer(conf, counter); - dtr.init(conf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); @@ -713,6 +711,7 @@ public Long answer(InvocationOnMock invocation) when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); dtr.setRMContext(mockContext); when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr); + dtr.init(conf); dtr.start(); // submit a job that blocks during renewal Thread submitThread = new Thread() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewerLifecycle.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewerLifecycle.java index d061d79818..637bf37b20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewerLifecycle.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewerLifecycle.java @@ -19,7 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.junit.Test; /** @@ -32,9 +37,13 @@ public class TestDelegationTokenRenewerLifecycle { @Test public void testStartupFailure() throws Exception { Configuration conf = new Configuration(); - DelegationTokenRenewer delegationTokenRenewer = new DelegationTokenRenewer(); + DelegationTokenRenewer delegationTokenRenewer = + new DelegationTokenRenewer(); + RMContext mockContext = mock(RMContext.class); + ClientRMService mockClientRMService = mock(ClientRMService.class); + when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + delegationTokenRenewer.setRMContext(mockContext); delegationTokenRenewer.init(conf); delegationTokenRenewer.stop(); } - }