YARN-10500. TestDelegationTokenRenewer fails intermittently. (#2619)
This commit is contained in:
parent
78905d7e3f
commit
f9a073c6c1
@ -26,7 +26,6 @@ import java.util.Arrays;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -123,7 +122,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||||||
private long tokenRenewerThreadRetryInterval;
|
private long tokenRenewerThreadRetryInterval;
|
||||||
private int tokenRenewerThreadRetryMaxAttempts;
|
private int tokenRenewerThreadRetryMaxAttempts;
|
||||||
private final Map<DelegationTokenRenewerEvent, Future<?>> futures =
|
private final Map<DelegationTokenRenewerEvent, Future<?>> futures =
|
||||||
new HashMap<>();
|
new ConcurrentHashMap<>();
|
||||||
private boolean delegationTokenRenewerPoolTrackerFlag = true;
|
private boolean delegationTokenRenewerPoolTrackerFlag = true;
|
||||||
|
|
||||||
// this config is supposedly not used by end-users.
|
// this config is supposedly not used by end-users.
|
||||||
|
@ -196,6 +196,10 @@ public class TestDelegationTokenRenewer {
|
|||||||
|
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
DelegationTokenRenewer delegationTokenRenewer;
|
DelegationTokenRenewer delegationTokenRenewer;
|
||||||
|
private MockRM rm;
|
||||||
|
private MockRM rm1;
|
||||||
|
private MockRM rm2;
|
||||||
|
private DelegationTokenRenewer localDtr;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpClass() throws Exception {
|
public static void setUpClass() throws Exception {
|
||||||
@ -243,13 +247,30 @@ public class TestDelegationTokenRenewer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() {
|
public void tearDown() throws Exception {
|
||||||
try {
|
try {
|
||||||
dispatcher.close();
|
dispatcher.close();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.debug("Unable to close the dispatcher. " + e);
|
LOG.debug("Unable to close the dispatcher. " + e);
|
||||||
}
|
}
|
||||||
delegationTokenRenewer.stop();
|
delegationTokenRenewer.stop();
|
||||||
|
|
||||||
|
if (rm != null) {
|
||||||
|
rm.close();
|
||||||
|
rm = null;
|
||||||
|
}
|
||||||
|
if (rm1 != null) {
|
||||||
|
rm1.close();
|
||||||
|
rm1 = null;
|
||||||
|
}
|
||||||
|
if (rm2 != null) {
|
||||||
|
rm2.close();
|
||||||
|
rm2 = null;
|
||||||
|
}
|
||||||
|
if (localDtr != null) {
|
||||||
|
localDtr.close();
|
||||||
|
localDtr = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class MyDelegationTokenSecretManager extends DelegationTokenSecretManager {
|
private static class MyDelegationTokenSecretManager extends DelegationTokenSecretManager {
|
||||||
@ -371,9 +392,9 @@ public class TestDelegationTokenRenewer {
|
|||||||
return token1;
|
return token1;
|
||||||
}
|
}
|
||||||
|
|
||||||
private RMApp submitApp(MockRM rm, Credentials cred, ByteBuffer tokensConf)
|
private RMApp submitApp(MockRM mockrm,
|
||||||
throws Exception {
|
Credentials cred, ByteBuffer tokensConf) throws Exception {
|
||||||
int maxAttempts = rm.getConfig().getInt(
|
int maxAttempts = mockrm.getConfig().getInt(
|
||||||
YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||||
MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder.create()
|
MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder.create()
|
||||||
@ -397,7 +418,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
.withApplicationTimeouts(null)
|
.withApplicationTimeouts(null)
|
||||||
.withTokensConf(tokensConf)
|
.withTokensConf(tokensConf)
|
||||||
.build();
|
.build();
|
||||||
return MockRMAppSubmitter.submit(rm, data);
|
return MockRMAppSubmitter.submit(mockrm, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -626,8 +647,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
lconf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
|
lconf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
|
||||||
true);
|
true);
|
||||||
|
|
||||||
DelegationTokenRenewer localDtr =
|
localDtr = createNewDelegationTokenRenewer(lconf, counter);
|
||||||
createNewDelegationTokenRenewer(lconf, counter);
|
|
||||||
RMContext mockContext = mock(RMContext.class);
|
RMContext mockContext = mock(RMContext.class);
|
||||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||||
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
||||||
@ -702,8 +722,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
lconf.setLong(
|
lconf.setLong(
|
||||||
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
||||||
1000l);
|
1000l);
|
||||||
DelegationTokenRenewer localDtr =
|
localDtr = createNewDelegationTokenRenewer(lconf, counter);
|
||||||
createNewDelegationTokenRenewer(lconf, counter);
|
|
||||||
RMContext mockContext = mock(RMContext.class);
|
RMContext mockContext = mock(RMContext.class);
|
||||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||||
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
||||||
@ -783,8 +802,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
lconf.setLong(
|
lconf.setLong(
|
||||||
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
||||||
1000l);
|
1000l);
|
||||||
DelegationTokenRenewer localDtr =
|
localDtr = createNewDelegationTokenRenewer(conf, counter);
|
||||||
createNewDelegationTokenRenewer(conf, counter);
|
|
||||||
RMContext mockContext = mock(RMContext.class);
|
RMContext mockContext = mock(RMContext.class);
|
||||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||||
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
||||||
@ -889,8 +907,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
doThrow(new IOException("boom"))
|
doThrow(new IOException("boom"))
|
||||||
.when(tokenx).renew(any(Configuration.class));
|
.when(tokenx).renew(any(Configuration.class));
|
||||||
// fire up the renewer
|
// fire up the renewer
|
||||||
final DelegationTokenRenewer dtr =
|
localDtr = createNewDelegationTokenRenewer(conf, counter);
|
||||||
createNewDelegationTokenRenewer(conf, counter);
|
|
||||||
RMContext mockContext = mock(RMContext.class);
|
RMContext mockContext = mock(RMContext.class);
|
||||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||||
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
||||||
@ -900,13 +917,14 @@ public class TestDelegationTokenRenewer {
|
|||||||
InetSocketAddress sockAddr =
|
InetSocketAddress sockAddr =
|
||||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
dtr.setRMContext(mockContext);
|
localDtr.setRMContext(mockContext);
|
||||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
|
when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr);
|
||||||
dtr.init(conf);
|
localDtr.init(conf);
|
||||||
dtr.start();
|
localDtr.start();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
dtr.addApplicationSync(mock(ApplicationId.class), credsx, false, "user");
|
localDtr.addApplicationSync(mock(ApplicationId.class),
|
||||||
|
credsx, false, "user");
|
||||||
fail("Catch IOException on app submission");
|
fail("Catch IOException on app submission");
|
||||||
} catch (IOException e){
|
} catch (IOException e){
|
||||||
Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
|
Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
|
||||||
@ -949,8 +967,8 @@ public class TestDelegationTokenRenewer {
|
|||||||
doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
|
doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
|
||||||
|
|
||||||
// fire up the renewer
|
// fire up the renewer
|
||||||
final DelegationTokenRenewer dtr =
|
localDtr = createNewDelegationTokenRenewer(conf, counter);
|
||||||
createNewDelegationTokenRenewer(conf, counter);
|
|
||||||
RMContext mockContext = mock(RMContext.class);
|
RMContext mockContext = mock(RMContext.class);
|
||||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||||
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
||||||
@ -960,24 +978,24 @@ public class TestDelegationTokenRenewer {
|
|||||||
InetSocketAddress sockAddr =
|
InetSocketAddress sockAddr =
|
||||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
dtr.setRMContext(mockContext);
|
localDtr.setRMContext(mockContext);
|
||||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
|
when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr);
|
||||||
dtr.init(conf);
|
localDtr.init(conf);
|
||||||
dtr.start();
|
localDtr.start();
|
||||||
// submit a job that blocks during renewal
|
// submit a job that blocks during renewal
|
||||||
Thread submitThread = new Thread() {
|
Thread submitThread = new Thread() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user",
|
localDtr.addApplicationAsync(mock(ApplicationId.class),
|
||||||
new Configuration());
|
creds1, false, "user", new Configuration());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
submitThread.start();
|
submitThread.start();
|
||||||
|
|
||||||
// wait till 1st submit blocks, then submit another
|
// wait till 1st submit blocks, then submit another
|
||||||
startBarrier.await();
|
startBarrier.await();
|
||||||
dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user",
|
localDtr.addApplicationAsync(mock(ApplicationId.class),
|
||||||
new Configuration());
|
creds2, false, "user", new Configuration());
|
||||||
// signal 1st to complete
|
// signal 1st to complete
|
||||||
endBarrier.await();
|
endBarrier.await();
|
||||||
submitThread.join();
|
submitThread.join();
|
||||||
@ -990,7 +1008,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
"kerberos");
|
"kerberos");
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
MockRM rm = new MockRM(conf) {
|
rm = new MockRM(conf) {
|
||||||
@Override
|
@Override
|
||||||
protected void doSecureLogin() throws IOException {
|
protected void doSecureLogin() throws IOException {
|
||||||
// Skip the login.
|
// Skip the login.
|
||||||
@ -1046,7 +1064,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
|
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
|
||||||
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
||||||
|
|
||||||
final MockRM rm = new TestSecurityMockRM(conf, null) {
|
rm = new TestSecurityMockRM(conf, null) {
|
||||||
@Override
|
@Override
|
||||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||||
return new DelegationTokenRenewer() {
|
return new DelegationTokenRenewer() {
|
||||||
@ -1149,7 +1167,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
Credentials credentials = new Credentials();
|
Credentials credentials = new Credentials();
|
||||||
credentials.addToken(userText1, originalToken);
|
credentials.addToken(userText1, originalToken);
|
||||||
|
|
||||||
MockRM rm1 = new TestSecurityMockRM(yarnConf);
|
rm1 = new TestSecurityMockRM(yarnConf);
|
||||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||||
rm1.start();
|
rm1.start();
|
||||||
RMApp app = MockRMAppSubmitter.submit(rm1,
|
RMApp app = MockRMAppSubmitter.submit(rm1,
|
||||||
@ -1173,7 +1191,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
||||||
AtomicBoolean firstRenewInvoked = new AtomicBoolean(false);
|
AtomicBoolean firstRenewInvoked = new AtomicBoolean(false);
|
||||||
AtomicBoolean secondRenewInvoked = new AtomicBoolean(false);
|
AtomicBoolean secondRenewInvoked = new AtomicBoolean(false);
|
||||||
MockRM rm2 = new TestSecurityMockRM(yarnConf, memStore) {
|
rm2 = new TestSecurityMockRM(yarnConf, memStore) {
|
||||||
@Override
|
@Override
|
||||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||||
return new DelegationTokenRenewer() {
|
return new DelegationTokenRenewer() {
|
||||||
@ -1183,8 +1201,8 @@ public class TestDelegationTokenRenewer {
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
if (dttr.token.equals(updatedToken)) {
|
if (dttr.token.equals(updatedToken)) {
|
||||||
secondRenewInvoked.set(true);
|
|
||||||
super.renewToken(dttr);
|
super.renewToken(dttr);
|
||||||
|
secondRenewInvoked.set(true);
|
||||||
} else if (dttr.token.equals(originalToken)){
|
} else if (dttr.token.equals(originalToken)){
|
||||||
firstRenewInvoked.set(true);
|
firstRenewInvoked.set(true);
|
||||||
throw new InvalidToken("Failed to renew");
|
throw new InvalidToken("Failed to renew");
|
||||||
@ -1210,6 +1228,9 @@ public class TestDelegationTokenRenewer {
|
|||||||
final MockNM nm1 =
|
final MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(() -> secondRenewInvoked.get(), 100, 10000);
|
||||||
|
|
||||||
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
|
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
|
||||||
|
|
||||||
NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl(
|
NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl(
|
||||||
@ -1241,7 +1262,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
final Token<DelegationTokenIdentifier> token2 =
|
final Token<DelegationTokenIdentifier> token2 =
|
||||||
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
|
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
|
||||||
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
||||||
final MockRM rm = new TestSecurityMockRM(conf, null) {
|
rm = new TestSecurityMockRM(conf, null) {
|
||||||
@Override
|
@Override
|
||||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||||
return new DelegationTokenRenewer() {
|
return new DelegationTokenRenewer() {
|
||||||
@ -1293,7 +1314,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
// submitted application.
|
// submitted application.
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
public void testAppSubmissionWithPreviousToken() throws Exception{
|
public void testAppSubmissionWithPreviousToken() throws Exception{
|
||||||
MockRM rm = new TestSecurityMockRM(conf, null);
|
rm = new TestSecurityMockRM(conf, null);
|
||||||
rm.start();
|
rm.start();
|
||||||
final MockNM nm1 =
|
final MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||||
@ -1369,7 +1390,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
// complete
|
// complete
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
public void testCancelWithMultipleAppSubmissions() throws Exception{
|
public void testCancelWithMultipleAppSubmissions() throws Exception{
|
||||||
MockRM rm = new TestSecurityMockRM(conf, null);
|
rm = new TestSecurityMockRM(conf, null);
|
||||||
rm.start();
|
rm.start();
|
||||||
final MockNM nm1 =
|
final MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||||
@ -1484,10 +1505,10 @@ public class TestDelegationTokenRenewer {
|
|||||||
Assert.assertFalse(renewer.getDelegationTokens().contains(token1));
|
Assert.assertFalse(renewer.getDelegationTokens().contains(token1));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void finishAMAndWaitForComplete(final RMApp app, MockRM rm,
|
private void finishAMAndWaitForComplete(final RMApp app, MockRM mockrm,
|
||||||
MockNM nm, MockAM am, final DelegationTokenToRenew dttr)
|
MockNM mocknm, MockAM mockam, final DelegationTokenToRenew dttr)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
MockRM.finishAMAndVerifyAppState(app, rm, nm, am);
|
MockRM.finishAMAndVerifyAppState(app, mockrm, mocknm, mockam);
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
public Boolean get() {
|
public Boolean get() {
|
||||||
return !dttr.referringAppIds.contains(app.getApplicationId());
|
return !dttr.referringAppIds.contains(app.getApplicationId());
|
||||||
@ -1503,7 +1524,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
"kerberos");
|
"kerberos");
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
|
||||||
MockRM rm = new TestSecurityMockRM(conf, null);
|
rm = new TestSecurityMockRM(conf, null);
|
||||||
rm.start();
|
rm.start();
|
||||||
final MockNM nm1 =
|
final MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||||
@ -1558,7 +1579,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
// limit 100 bytes
|
// limit 100 bytes
|
||||||
conf.setInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE, 100);
|
conf.setInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE, 100);
|
||||||
MockRM rm = new TestSecurityMockRM(conf, null);
|
rm = new TestSecurityMockRM(conf, null);
|
||||||
rm.start();
|
rm.start();
|
||||||
final MockNM nm1 =
|
final MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||||
@ -1621,7 +1642,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testShutDown() {
|
public void testShutDown() {
|
||||||
DelegationTokenRenewer dtr = createNewDelegationTokenRenewer(conf, counter);
|
localDtr = createNewDelegationTokenRenewer(conf, counter);
|
||||||
RMContext mockContext = mock(RMContext.class);
|
RMContext mockContext = mock(RMContext.class);
|
||||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||||
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
|
||||||
@ -1631,10 +1652,10 @@ public class TestDelegationTokenRenewer {
|
|||||||
InetSocketAddress sockAddr =
|
InetSocketAddress sockAddr =
|
||||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
dtr.setRMContext(mockContext);
|
localDtr.setRMContext(mockContext);
|
||||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
|
when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr);
|
||||||
dtr.init(conf);
|
localDtr.init(conf);
|
||||||
dtr.start();
|
localDtr.start();
|
||||||
delegationTokenRenewer.stop();
|
delegationTokenRenewer.stop();
|
||||||
delegationTokenRenewer.applicationFinished(
|
delegationTokenRenewer.applicationFinished(
|
||||||
BuilderUtils.newApplicationId(0, 1));
|
BuilderUtils.newApplicationId(0, 1));
|
||||||
@ -1656,7 +1677,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
"password2".getBytes(), dtId1.getKind(), new Text("service2"));
|
"password2".getBytes(), dtId1.getKind(), new Text("service2"));
|
||||||
|
|
||||||
// fire up the renewer
|
// fire up the renewer
|
||||||
final DelegationTokenRenewer dtr = new DelegationTokenRenewer() {
|
localDtr = new DelegationTokenRenewer() {
|
||||||
@Override
|
@Override
|
||||||
protected Token<?>[] obtainSystemTokensForUser(String user,
|
protected Token<?>[] obtainSystemTokensForUser(String user,
|
||||||
final Credentials credentials) throws IOException {
|
final Credentials credentials) throws IOException {
|
||||||
@ -1674,25 +1695,25 @@ public class TestDelegationTokenRenewer {
|
|||||||
InetSocketAddress sockAddr =
|
InetSocketAddress sockAddr =
|
||||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
dtr.setRMContext(mockContext);
|
localDtr.setRMContext(mockContext);
|
||||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
|
when(mockContext.getDelegationTokenRenewer()).thenReturn(localDtr);
|
||||||
dtr.init(conf);
|
localDtr.init(conf);
|
||||||
dtr.start();
|
localDtr.start();
|
||||||
|
|
||||||
final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
|
final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
|
||||||
|
|
||||||
Collection<ApplicationId> appIds = new ArrayList<ApplicationId>(1);
|
Collection<ApplicationId> appIds = new ArrayList<ApplicationId>(1);
|
||||||
appIds.add(appId1);
|
appIds.add(appId1);
|
||||||
|
|
||||||
dtr.addApplicationSync(appId1, credsx, false, "user1");
|
localDtr.addApplicationSync(appId1, credsx, false, "user1");
|
||||||
|
|
||||||
// Ensure incrTokenSequenceNo has been called for new token request
|
// Ensure incrTokenSequenceNo has been called for new token request
|
||||||
Mockito.verify(mockContext, Mockito.times(1)).incrTokenSequenceNo();
|
Mockito.verify(mockContext, Mockito.times(1)).incrTokenSequenceNo();
|
||||||
|
|
||||||
DelegationTokenToRenew dttr = dtr.new DelegationTokenToRenew(appIds,
|
DelegationTokenToRenew dttr = localDtr.new DelegationTokenToRenew(appIds,
|
||||||
expectedToken, conf, 1000, false, "user1");
|
expectedToken, conf, 1000, false, "user1");
|
||||||
|
|
||||||
dtr.requestNewHdfsDelegationTokenIfNeeded(dttr);
|
localDtr.requestNewHdfsDelegationTokenIfNeeded(dttr);
|
||||||
|
|
||||||
// Ensure incrTokenSequenceNo has been called for token renewal as well.
|
// Ensure incrTokenSequenceNo has been called for token renewal as well.
|
||||||
Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo();
|
Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo();
|
||||||
@ -1710,16 +1731,17 @@ public class TestDelegationTokenRenewer {
|
|||||||
@Test(timeout = 30000)
|
@Test(timeout = 30000)
|
||||||
public void testTokenThreadTimeout() throws Exception {
|
public void testTokenThreadTimeout() throws Exception {
|
||||||
Configuration yarnConf = new YarnConfiguration();
|
Configuration yarnConf = new YarnConfiguration();
|
||||||
|
yarnConf.set("override_token_expire_time", "30000");
|
||||||
yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
|
yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
|
||||||
true);
|
true);
|
||||||
yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
"kerberos");
|
"kerberos");
|
||||||
yarnConf.setClass(YarnConfiguration.RM_STORE, MemoryRMStateStore.class,
|
yarnConf.setClass(YarnConfiguration.RM_STORE, MemoryRMStateStore.class,
|
||||||
RMStateStore.class);
|
RMStateStore.class);
|
||||||
yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5,
|
yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 2,
|
||||||
TimeUnit.SECONDS);
|
TimeUnit.SECONDS);
|
||||||
yarnConf.setTimeDuration(
|
yarnConf.setTimeDuration(
|
||||||
YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5,
|
YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 0,
|
||||||
TimeUnit.SECONDS);
|
TimeUnit.SECONDS);
|
||||||
yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
|
yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
|
||||||
3);
|
3);
|
||||||
@ -1743,7 +1765,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout(
|
DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout(
|
||||||
yarnConf, threadCounter, renewDelay);
|
yarnConf, threadCounter, renewDelay);
|
||||||
|
|
||||||
MockRM rm = new TestSecurityMockRM(yarnConf) {
|
rm = new TestSecurityMockRM(yarnConf) {
|
||||||
@Override
|
@Override
|
||||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||||
return renewer;
|
return renewer;
|
||||||
@ -1766,8 +1788,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
|
YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
|
||||||
YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
|
YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
|
||||||
|
|
||||||
GenericTestUtils.waitFor(() -> threadCounter.get() >= attempts, 2000,
|
GenericTestUtils.waitFor(() -> threadCounter.get() >= attempts, 100, 20000);
|
||||||
30000);
|
|
||||||
|
|
||||||
// Ensure no. of threads has been used in renewer service thread pool is
|
// Ensure no. of threads has been used in renewer service thread pool is
|
||||||
// higher than the configured max retry attempts
|
// higher than the configured max retry attempts
|
||||||
@ -1816,7 +1837,7 @@ public class TestDelegationTokenRenewer {
|
|||||||
DelegationTokenRenewer renwer = createNewDelegationTokenRenewerForTimeout(
|
DelegationTokenRenewer renwer = createNewDelegationTokenRenewerForTimeout(
|
||||||
yarnConf, threadCounter, renewDelay);
|
yarnConf, threadCounter, renewDelay);
|
||||||
|
|
||||||
MockRM rm = new TestSecurityMockRM(yarnConf) {
|
rm = new TestSecurityMockRM(yarnConf) {
|
||||||
@Override
|
@Override
|
||||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||||
return renwer;
|
return renwer;
|
||||||
@ -1880,4 +1901,4 @@ public class TestDelegationTokenRenewer {
|
|||||||
renew.setDelegationTokenRenewerPoolTracker(true);
|
renew.setDelegationTokenRenewerPoolTracker(true);
|
||||||
return renew;
|
return renew;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user