HADOOP-15593. Fixed NPE in UGI spawnAutoRenewalThreadForUserCreds.
Contributed by Gabor Bota
This commit is contained in:
parent
40fad32824
commit
77721f39e2
@ -40,6 +40,7 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.EnumMap;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
@ -851,83 +852,123 @@ void spawnAutoRenewalThreadForUserCreds(boolean force) {
|
||||
}
|
||||
|
||||
//spawn thread only if we have kerb credentials
|
||||
Thread t = new Thread(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
String cmd = conf.get("hadoop.kerberos.kinit.command", "kinit");
|
||||
KerberosTicket tgt = getTGT();
|
||||
if (tgt == null) {
|
||||
return;
|
||||
}
|
||||
long nextRefresh = getRefreshTime(tgt);
|
||||
RetryPolicy rp = null;
|
||||
while (true) {
|
||||
try {
|
||||
long now = Time.now();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Current time is " + now);
|
||||
LOG.debug("Next refresh is " + nextRefresh);
|
||||
}
|
||||
if (now < nextRefresh) {
|
||||
Thread.sleep(nextRefresh - now);
|
||||
}
|
||||
String output = Shell.execCommand(cmd, "-R");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Renewed ticket. kinit output: {}", output);
|
||||
}
|
||||
reloginFromTicketCache();
|
||||
tgt = getTGT();
|
||||
if (tgt == null) {
|
||||
LOG.warn("No TGT after renewal. Aborting renew thread for " +
|
||||
getUserName());
|
||||
return;
|
||||
}
|
||||
nextRefresh = Math.max(getRefreshTime(tgt),
|
||||
now + kerberosMinSecondsBeforeRelogin);
|
||||
metrics.renewalFailures.set(0);
|
||||
rp = null;
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Terminating renewal thread");
|
||||
return;
|
||||
} catch (IOException ie) {
|
||||
metrics.renewalFailuresTotal.incr();
|
||||
final long tgtEndTime = tgt.getEndTime().getTime();
|
||||
LOG.warn("Exception encountered while running the renewal "
|
||||
+ "command for {}. (TGT end time:{}, renewalFailures: {},"
|
||||
+ "renewalFailuresTotal: {})", getUserName(), tgtEndTime,
|
||||
metrics.renewalFailures, metrics.renewalFailuresTotal, ie);
|
||||
final long now = Time.now();
|
||||
if (rp == null) {
|
||||
// Use a dummy maxRetries to create the policy. The policy will
|
||||
// only be used to get next retry time with exponential back-off.
|
||||
// The final retry time will be later limited within the
|
||||
// tgt endTime in getNextTgtRenewalTime.
|
||||
rp = RetryPolicies.exponentialBackoffRetry(Long.SIZE - 2,
|
||||
kerberosMinSecondsBeforeRelogin, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
try {
|
||||
nextRefresh = getNextTgtRenewalTime(tgtEndTime, now, rp);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception when calculating next tgt renewal time", e);
|
||||
return;
|
||||
}
|
||||
metrics.renewalFailures.incr();
|
||||
// retry until close enough to tgt endTime.
|
||||
if (now > nextRefresh) {
|
||||
LOG.error("TGT is expired. Aborting renew thread for {}.",
|
||||
getUserName());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
KerberosTicket tgt = getTGT();
|
||||
if (tgt == null) {
|
||||
return;
|
||||
}
|
||||
String cmd = conf.get("hadoop.kerberos.kinit.command", "kinit");
|
||||
long nextRefresh = getRefreshTime(tgt);
|
||||
Thread t =
|
||||
new Thread(new AutoRenewalForUserCredsRunnable(tgt, cmd, nextRefresh));
|
||||
t.setDaemon(true);
|
||||
t.setName("TGT Renewer for " + getUserName());
|
||||
t.start();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
class AutoRenewalForUserCredsRunnable implements Runnable {
|
||||
private KerberosTicket tgt;
|
||||
private RetryPolicy rp;
|
||||
private String kinitCmd;
|
||||
private long nextRefresh;
|
||||
private boolean runRenewalLoop = true;
|
||||
|
||||
AutoRenewalForUserCredsRunnable(KerberosTicket tgt, String kinitCmd,
|
||||
long nextRefresh){
|
||||
this.tgt = tgt;
|
||||
this.kinitCmd = kinitCmd;
|
||||
this.nextRefresh = nextRefresh;
|
||||
this.rp = null;
|
||||
}
|
||||
|
||||
public void setRunRenewalLoop(boolean runRenewalLoop) {
|
||||
this.runRenewalLoop = runRenewalLoop;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
do {
|
||||
try {
|
||||
long now = Time.now();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Current time is " + now);
|
||||
LOG.debug("Next refresh is " + nextRefresh);
|
||||
}
|
||||
if (now < nextRefresh) {
|
||||
Thread.sleep(nextRefresh - now);
|
||||
}
|
||||
String output = Shell.execCommand(kinitCmd, "-R");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Renewed ticket. kinit output: {}", output);
|
||||
}
|
||||
reloginFromTicketCache();
|
||||
tgt = getTGT();
|
||||
if (tgt == null) {
|
||||
LOG.warn("No TGT after renewal. Aborting renew thread for " +
|
||||
getUserName());
|
||||
return;
|
||||
}
|
||||
nextRefresh = Math.max(getRefreshTime(tgt),
|
||||
now + kerberosMinSecondsBeforeRelogin);
|
||||
metrics.renewalFailures.set(0);
|
||||
rp = null;
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Terminating renewal thread");
|
||||
return;
|
||||
} catch (IOException ie) {
|
||||
metrics.renewalFailuresTotal.incr();
|
||||
final long now = Time.now();
|
||||
|
||||
if (tgt.isDestroyed()) {
|
||||
LOG.error("TGT is destroyed. Aborting renew thread for {}.",
|
||||
getUserName());
|
||||
return;
|
||||
}
|
||||
|
||||
long tgtEndTime;
|
||||
// As described in HADOOP-15593 we need to handle the case when
|
||||
// tgt.getEndTime() throws NPE because of JDK issue JDK-8147772
|
||||
// NPE is only possible if this issue is not fixed in the JDK
|
||||
// currently used
|
||||
try {
|
||||
tgtEndTime = tgt.getEndTime().getTime();
|
||||
} catch (NullPointerException npe) {
|
||||
LOG.error("NPE thrown while getting KerberosTicket endTime. "
|
||||
+ "Aborting renew thread for {}.", getUserName());
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.warn("Exception encountered while running the renewal "
|
||||
+ "command for {}. (TGT end time:{}, renewalFailures: {},"
|
||||
+ "renewalFailuresTotal: {})", getUserName(), tgtEndTime,
|
||||
metrics.renewalFailures.value(),
|
||||
metrics.renewalFailuresTotal.value(), ie);
|
||||
if (rp == null) {
|
||||
// Use a dummy maxRetries to create the policy. The policy will
|
||||
// only be used to get next retry time with exponential back-off.
|
||||
// The final retry time will be later limited within the
|
||||
// tgt endTime in getNextTgtRenewalTime.
|
||||
rp = RetryPolicies.exponentialBackoffRetry(Long.SIZE - 2,
|
||||
kerberosMinSecondsBeforeRelogin, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
try {
|
||||
nextRefresh = getNextTgtRenewalTime(tgtEndTime, now, rp);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception when calculating next tgt renewal time", e);
|
||||
return;
|
||||
}
|
||||
metrics.renewalFailures.incr();
|
||||
// retry until close enough to tgt endTime.
|
||||
if (now > nextRefresh) {
|
||||
LOG.error("TGT is expired. Aborting renew thread for {}.",
|
||||
getUserName());
|
||||
return;
|
||||
}
|
||||
}
|
||||
} while (runRenewalLoop);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get time for next login retry. This will allow the thread to retry with
|
||||
* exponential back-off, until tgt endtime.
|
||||
|
@ -47,6 +47,7 @@
|
||||
|
||||
import javax.security.auth.Subject;
|
||||
import javax.security.auth.kerberos.KerberosPrincipal;
|
||||
import javax.security.auth.kerberos.KerberosTicket;
|
||||
import javax.security.auth.kerberos.KeyTab;
|
||||
import javax.security.auth.login.AppConfigurationEntry;
|
||||
import javax.security.auth.login.LoginContext;
|
||||
@ -61,6 +62,7 @@
|
||||
import java.util.Collection;
|
||||
import java.util.ConcurrentModificationException;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
@ -88,7 +90,10 @@
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestUserGroupInformation {
|
||||
@ -1211,4 +1216,37 @@ public UserGroupInformation run() throws Exception {
|
||||
barrier.await();
|
||||
assertSame(testUgi1.getSubject(), blockingLookup.get().getSubject());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKerberosTicketIsDestroyedChecked() throws Exception {
|
||||
// Create UserGroupInformation
|
||||
GenericTestUtils.setLogLevel(UserGroupInformation.LOG, Level.DEBUG);
|
||||
Set<User> users = new HashSet<>();
|
||||
users.add(new User("Foo"));
|
||||
Subject subject =
|
||||
new Subject(true, users, new HashSet<>(), new HashSet<>());
|
||||
UserGroupInformation ugi = spy(new UserGroupInformation(subject));
|
||||
|
||||
// throw IOException in the middle of the autoRenewalForUserCreds
|
||||
doThrow(new IOException()).when(ugi).reloginFromTicketCache();
|
||||
|
||||
// Create and destroy the KerberosTicket, so endTime will be null
|
||||
Date d = new Date();
|
||||
KerberosPrincipal kp = new KerberosPrincipal("Foo");
|
||||
KerberosTicket tgt = spy(new KerberosTicket(new byte[]{}, kp, kp, new
|
||||
byte[]{}, 0, null, d, d, d, d, null));
|
||||
tgt.destroy();
|
||||
|
||||
// run AutoRenewalForUserCredsRunnable with this
|
||||
UserGroupInformation.AutoRenewalForUserCredsRunnable userCredsRunnable =
|
||||
ugi.new AutoRenewalForUserCredsRunnable(tgt,
|
||||
Boolean.toString(Boolean.TRUE), 100);
|
||||
|
||||
// Set the runnable to not to run in a loop
|
||||
userCredsRunnable.setRunRenewalLoop(false);
|
||||
// there should be no exception when calling this
|
||||
userCredsRunnable.run();
|
||||
// isDestroyed should be called at least once
|
||||
Mockito.verify(tgt, atLeastOnce()).isDestroyed();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user