From 4468378e4bf8c8035da3ebaed2c6359dc096fe53 Mon Sep 17 00:00:00 2001 From: Masatake Iwasaki Date: Fri, 12 Feb 2021 04:55:04 +0900 Subject: [PATCH] YARN-10500. TestDelegationTokenRenewer fails intermittently. (#2619) (cherry picked from commit f9a073c6c186848e09e2ee04118fd996ea8ace59) --- .../security/DelegationTokenRenewer.java | 3 +- .../security/TestDelegationTokenRenewer.java | 143 ++++++++++-------- 2 files changed, 83 insertions(+), 63 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index bc7073a11f..b4da08f52f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -26,7 +26,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -123,7 +122,7 @@ public class DelegationTokenRenewer extends AbstractService { private long tokenRenewerThreadRetryInterval; private int tokenRenewerThreadRetryMaxAttempts; private final Map> futures = - new HashMap<>(); + new ConcurrentHashMap<>(); private boolean delegationTokenRenewerPoolTrackerFlag = true; // this config is supposedly not used by end-users. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 09d9f39b5d..2856c271f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -196,6 +196,10 @@ public class TestDelegationTokenRenewer { private static Configuration conf; DelegationTokenRenewer delegationTokenRenewer; + private MockRM rm; + private MockRM rm1; + private MockRM rm2; + private DelegationTokenRenewer localDtr; @BeforeClass public static void setUpClass() throws Exception { @@ -243,13 +247,30 @@ public class TestDelegationTokenRenewer { } @After - public void tearDown() { + public void tearDown() throws Exception { try { dispatcher.close(); } catch (IOException e) { LOG.debug("Unable to close the dispatcher. " + e); } delegationTokenRenewer.stop(); + + if (rm != null) { + rm.close(); + rm = null; + } + if (rm1 != null) { + rm1.close(); + rm1 = null; + } + if (rm2 != null) { + rm2.close(); + rm2 = null; + } + if (localDtr != null) { + localDtr.close(); + localDtr = null; + } } private static class MyDelegationTokenSecretManager extends DelegationTokenSecretManager { @@ -371,9 +392,9 @@ public class TestDelegationTokenRenewer { return token1; } - private RMApp submitApp(MockRM rm, Credentials cred, ByteBuffer tokensConf) - throws Exception { - int maxAttempts = rm.getConfig().getInt( + private RMApp submitApp(MockRM mockrm, + Credentials cred, ByteBuffer tokensConf) throws Exception { + int maxAttempts = mockrm.getConfig().getInt( YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder.create() @@ -397,7 +418,7 @@ public class TestDelegationTokenRenewer { .withApplicationTimeouts(null) .withTokensConf(tokensConf) .build(); - return MockRMAppSubmitter.submit(rm, data); + return MockRMAppSubmitter.submit(mockrm, data); } @@ -626,8 +647,7 @@ public class TestDelegationTokenRenewer { lconf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL, true); - DelegationTokenRenewer localDtr = - createNewDelegationTokenRenewer(lconf, counter); + localDtr = createNewDelegationTokenRenewer(lconf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( new ConcurrentHashMap()); @@ -702,8 +722,7 @@ public class TestDelegationTokenRenewer { lconf.setLong( YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS, 1000l); - DelegationTokenRenewer localDtr = - createNewDelegationTokenRenewer(lconf, counter); + localDtr = createNewDelegationTokenRenewer(lconf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( new ConcurrentHashMap()); @@ -783,8 +802,7 @@ public class TestDelegationTokenRenewer { lconf.setLong( YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS, 1000l); - DelegationTokenRenewer localDtr = - createNewDelegationTokenRenewer(conf, counter); + localDtr = createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( new ConcurrentHashMap()); @@ -889,8 +907,7 @@ public class TestDelegationTokenRenewer { doThrow(new IOException("boom")) .when(tokenx).renew(any(Configuration.class)); // fire up the renewer - final DelegationTokenRenewer dtr = - createNewDelegationTokenRenewer(conf, counter); + localDtr = createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( new ConcurrentHashMap()); @@ -900,13 +917,14 @@ public class TestDelegationTokenRenewer { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); - dtr.setRMContext(mockContext); - when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr); - dtr.init(conf); - dtr.start(); + localDtr.setRMContext(mockContext); + when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr); + localDtr.init(conf); + localDtr.start(); try { - dtr.addApplicationSync(mock(ApplicationId.class), credsx, false, "user"); + localDtr.addApplicationSync(mock(ApplicationId.class), + credsx, false, "user"); fail("Catch IOException on app submission"); } catch (IOException e){ Assert.assertTrue(e.getMessage().contains(tokenx.toString())); @@ -949,8 +967,8 @@ public class TestDelegationTokenRenewer { doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class)); // fire up the renewer - final DelegationTokenRenewer dtr = - createNewDelegationTokenRenewer(conf, counter); + localDtr = createNewDelegationTokenRenewer(conf, counter); + RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( new ConcurrentHashMap()); @@ -960,24 +978,24 @@ public class TestDelegationTokenRenewer { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); - dtr.setRMContext(mockContext); - when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr); - dtr.init(conf); - dtr.start(); + localDtr.setRMContext(mockContext); + when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr); + localDtr.init(conf); + localDtr.start(); // submit a job that blocks during renewal Thread submitThread = new Thread() { @Override public void run() { - dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user", - new Configuration()); + localDtr.addApplicationAsync(mock(ApplicationId.class), + creds1, false, "user", new Configuration()); } }; submitThread.start(); // wait till 1st submit blocks, then submit another startBarrier.await(); - dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user", - new Configuration()); + localDtr.addApplicationAsync(mock(ApplicationId.class), + creds2, false, "user", new Configuration()); // signal 1st to complete endBarrier.await(); submitThread.join(); @@ -990,7 +1008,7 @@ public class TestDelegationTokenRenewer { CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); - MockRM rm = new MockRM(conf) { + rm = new MockRM(conf) { @Override protected void doSecureLogin() throws IOException { // Skip the login. @@ -1046,7 +1064,7 @@ public class TestDelegationTokenRenewer { new Token(dtId2.getBytes(), "password2".getBytes(), dtId2.getKind(), new Text("service2")); - final MockRM rm = new TestSecurityMockRM(conf, null) { + rm = new TestSecurityMockRM(conf, null) { @Override protected DelegationTokenRenewer createDelegationTokenRenewer() { return new DelegationTokenRenewer() { @@ -1149,7 +1167,7 @@ public class TestDelegationTokenRenewer { Credentials credentials = new Credentials(); credentials.addToken(userText1, originalToken); - MockRM rm1 = new TestSecurityMockRM(yarnConf); + rm1 = new TestSecurityMockRM(yarnConf); MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); RMApp app = MockRMAppSubmitter.submit(rm1, @@ -1173,7 +1191,7 @@ public class TestDelegationTokenRenewer { "password2".getBytes(), dtId2.getKind(), new Text("service2")); AtomicBoolean firstRenewInvoked = new AtomicBoolean(false); AtomicBoolean secondRenewInvoked = new AtomicBoolean(false); - MockRM rm2 = new TestSecurityMockRM(yarnConf, memStore) { + rm2 = new TestSecurityMockRM(yarnConf, memStore) { @Override protected DelegationTokenRenewer createDelegationTokenRenewer() { return new DelegationTokenRenewer() { @@ -1183,8 +1201,8 @@ public class TestDelegationTokenRenewer { throws IOException { if (dttr.token.equals(updatedToken)) { - secondRenewInvoked.set(true); super.renewToken(dttr); + secondRenewInvoked.set(true); } else if (dttr.token.equals(originalToken)){ firstRenewInvoked.set(true); throw new InvalidToken("Failed to renew"); @@ -1210,6 +1228,9 @@ public class TestDelegationTokenRenewer { final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); nm1.registerNode(); + + GenericTestUtils.waitFor(() -> secondRenewInvoked.get(), 100, 10000); + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( @@ -1241,7 +1262,7 @@ public class TestDelegationTokenRenewer { final Token token2 = new Token(dtId2.getBytes(), "password2".getBytes(), dtId2.getKind(), new Text("service2")); - final MockRM rm = new TestSecurityMockRM(conf, null) { + rm = new TestSecurityMockRM(conf, null) { @Override protected DelegationTokenRenewer createDelegationTokenRenewer() { return new DelegationTokenRenewer() { @@ -1293,7 +1314,7 @@ public class TestDelegationTokenRenewer { // submitted application. @Test (timeout = 30000) public void testAppSubmissionWithPreviousToken() throws Exception{ - MockRM rm = new TestSecurityMockRM(conf, null); + rm = new TestSecurityMockRM(conf, null); rm.start(); final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); @@ -1369,7 +1390,7 @@ public class TestDelegationTokenRenewer { // complete @Test (timeout = 30000) public void testCancelWithMultipleAppSubmissions() throws Exception{ - MockRM rm = new TestSecurityMockRM(conf, null); + rm = new TestSecurityMockRM(conf, null); rm.start(); final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); @@ -1484,10 +1505,10 @@ public class TestDelegationTokenRenewer { Assert.assertFalse(renewer.getDelegationTokens().contains(token1)); } - private void finishAMAndWaitForComplete(final RMApp app, MockRM rm, - MockNM nm, MockAM am, final DelegationTokenToRenew dttr) + private void finishAMAndWaitForComplete(final RMApp app, MockRM mockrm, + MockNM mocknm, MockAM mockam, final DelegationTokenToRenew dttr) throws Exception { - MockRM.finishAMAndVerifyAppState(app, rm, nm, am); + MockRM.finishAMAndVerifyAppState(app, mockrm, mocknm, mockam); GenericTestUtils.waitFor(new Supplier() { public Boolean get() { return !dttr.referringAppIds.contains(app.getApplicationId()); @@ -1503,7 +1524,7 @@ public class TestDelegationTokenRenewer { "kerberos"); UserGroupInformation.setConfiguration(conf); - MockRM rm = new TestSecurityMockRM(conf, null); + rm = new TestSecurityMockRM(conf, null); rm.start(); final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); @@ -1558,7 +1579,7 @@ public class TestDelegationTokenRenewer { UserGroupInformation.setConfiguration(conf); // limit 100 bytes conf.setInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE, 100); - MockRM rm = new TestSecurityMockRM(conf, null); + rm = new TestSecurityMockRM(conf, null); rm.start(); final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); @@ -1621,7 +1642,7 @@ public class TestDelegationTokenRenewer { */ @Test public void testShutDown() { - DelegationTokenRenewer dtr = createNewDelegationTokenRenewer(conf, counter); + localDtr = createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); when(mockContext.getSystemCredentialsForApps()).thenReturn( new ConcurrentHashMap()); @@ -1631,10 +1652,10 @@ public class TestDelegationTokenRenewer { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); - dtr.setRMContext(mockContext); - when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr); - dtr.init(conf); - dtr.start(); + localDtr.setRMContext(mockContext); + when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr); + localDtr.init(conf); + localDtr.start(); delegationTokenRenewer.stop(); delegationTokenRenewer.applicationFinished( BuilderUtils.newApplicationId(0, 1)); @@ -1656,7 +1677,7 @@ public class TestDelegationTokenRenewer { "password2".getBytes(), dtId1.getKind(), new Text("service2")); // fire up the renewer - final DelegationTokenRenewer dtr = new DelegationTokenRenewer() { + localDtr = new DelegationTokenRenewer() { @Override protected Token[] obtainSystemTokensForUser(String user, final Credentials credentials) throws IOException { @@ -1674,25 +1695,25 @@ public class TestDelegationTokenRenewer { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); - dtr.setRMContext(mockContext); - when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr); - dtr.init(conf); - dtr.start(); + localDtr.setRMContext(mockContext); + when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr); + localDtr.init(conf); + localDtr.start(); final ApplicationId appId1 = ApplicationId.newInstance(1234, 1); Collection appIds = new ArrayList(1); appIds.add(appId1); - dtr.addApplicationSync(appId1, credsx, false, "user1"); + localDtr.addApplicationSync(appId1, credsx, false, "user1"); // Ensure incrTokenSequenceNo has been called for new token request Mockito.verify(mockContext, Mockito.times(1)).incrTokenSequenceNo(); - DelegationTokenToRenew dttr = dtr.new DelegationTokenToRenew(appIds, + DelegationTokenToRenew dttr = localDtr.new DelegationTokenToRenew(appIds, expectedToken, conf, 1000, false, "user1"); - dtr.requestNewHdfsDelegationTokenIfNeeded(dttr); + localDtr.requestNewHdfsDelegationTokenIfNeeded(dttr); // Ensure incrTokenSequenceNo has been called for token renewal as well. Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo(); @@ -1710,16 +1731,17 @@ public class TestDelegationTokenRenewer { @Test(timeout = 30000) public void testTokenThreadTimeout() throws Exception { Configuration yarnConf = new YarnConfiguration(); + yarnConf.set("override_token_expire_time", "30000"); yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); yarnConf.setClass(YarnConfiguration.RM_STORE, MemoryRMStateStore.class, RMStateStore.class); - yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5, + yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 2, TimeUnit.SECONDS); yarnConf.setTimeDuration( - YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5, + YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 0, TimeUnit.SECONDS); yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, 3); @@ -1743,7 +1765,7 @@ public class TestDelegationTokenRenewer { DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout( yarnConf, threadCounter, renewDelay); - MockRM rm = new TestSecurityMockRM(yarnConf) { + rm = new TestSecurityMockRM(yarnConf) { @Override protected DelegationTokenRenewer createDelegationTokenRenewer() { return renewer; @@ -1766,8 +1788,7 @@ public class TestDelegationTokenRenewer { YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS); - GenericTestUtils.waitFor(() -> threadCounter.get() >= attempts, 2000, - 30000); + GenericTestUtils.waitFor(() -> threadCounter.get() >= attempts, 100, 20000); // Ensure no. of threads has been used in renewer service thread pool is // higher than the configured max retry attempts @@ -1816,7 +1837,7 @@ public class TestDelegationTokenRenewer { DelegationTokenRenewer renwer = createNewDelegationTokenRenewerForTimeout( yarnConf, threadCounter, renewDelay); - MockRM rm = new TestSecurityMockRM(yarnConf) { + rm = new TestSecurityMockRM(yarnConf) { @Override protected DelegationTokenRenewer createDelegationTokenRenewer() { return renwer; @@ -1880,4 +1901,4 @@ public class TestDelegationTokenRenewer { renew.setDelegationTokenRenewerPoolTracker(true); return renew; } -} \ No newline at end of file +}