HDFS-4805. Webhdfs client is fragile to token renewal errors. Contributed by Daryn Sharp.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1482121 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
09593530fb
commit
d027b947d6
@ -55,13 +55,14 @@ public interface Renewable {
|
|||||||
* An action that will renew and replace the file system's delegation
|
* An action that will renew and replace the file system's delegation
|
||||||
* tokens automatically.
|
* tokens automatically.
|
||||||
*/
|
*/
|
||||||
private static class RenewAction<T extends FileSystem & Renewable>
|
public static class RenewAction<T extends FileSystem & Renewable>
|
||||||
implements Delayed {
|
implements Delayed {
|
||||||
/** when should the renew happen */
|
/** when should the renew happen */
|
||||||
private long renewalTime;
|
private long renewalTime;
|
||||||
/** a weak reference to the file system so that it can be garbage collected */
|
/** a weak reference to the file system so that it can be garbage collected */
|
||||||
private final WeakReference<T> weakFs;
|
private final WeakReference<T> weakFs;
|
||||||
private Token<?> token;
|
private Token<?> token;
|
||||||
|
boolean isValid = true;
|
||||||
|
|
||||||
private RenewAction(final T fs) {
|
private RenewAction(final T fs) {
|
||||||
this.weakFs = new WeakReference<T>(fs);
|
this.weakFs = new WeakReference<T>(fs);
|
||||||
@ -69,6 +70,10 @@ private RenewAction(final T fs) {
|
|||||||
updateRenewalTime(renewCycle);
|
updateRenewalTime(renewCycle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isValid() {
|
||||||
|
return isValid;
|
||||||
|
}
|
||||||
|
|
||||||
/** Get the delay until this event should happen. */
|
/** Get the delay until this event should happen. */
|
||||||
@Override
|
@Override
|
||||||
public long getDelay(final TimeUnit unit) {
|
public long getDelay(final TimeUnit unit) {
|
||||||
@ -132,6 +137,7 @@ private boolean renew() throws IOException, InterruptedException {
|
|||||||
updateRenewalTime(renewCycle);
|
updateRenewalTime(renewCycle);
|
||||||
fs.setDelegationToken(token);
|
fs.setDelegationToken(token);
|
||||||
} catch (IOException ie2) {
|
} catch (IOException ie2) {
|
||||||
|
isValid = false;
|
||||||
throw new IOException("Can't renew or get new delegation token ", ie);
|
throw new IOException("Can't renew or get new delegation token ", ie);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -160,7 +166,8 @@ public String toString() {
|
|||||||
private static final long RENEW_CYCLE = 24 * 60 * 60 * 1000;
|
private static final long RENEW_CYCLE = 24 * 60 * 60 * 1000;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
protected static long renewCycle = RENEW_CYCLE;
|
@VisibleForTesting
|
||||||
|
public static long renewCycle = RENEW_CYCLE;
|
||||||
|
|
||||||
/** Queue to maintain the RenewActions to be processed by the {@link #run()} */
|
/** Queue to maintain the RenewActions to be processed by the {@link #run()} */
|
||||||
private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>();
|
private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>();
|
||||||
@ -206,7 +213,7 @@ static synchronized void reset() {
|
|||||||
|
|
||||||
/** Add a renew action to the queue. */
|
/** Add a renew action to the queue. */
|
||||||
@SuppressWarnings("static-access")
|
@SuppressWarnings("static-access")
|
||||||
public <T extends FileSystem & Renewable> void addRenewAction(final T fs) {
|
public <T extends FileSystem & Renewable> RenewAction<T> addRenewAction(final T fs) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (!isAlive()) {
|
if (!isAlive()) {
|
||||||
start();
|
start();
|
||||||
@ -218,6 +225,7 @@ public <T extends FileSystem & Renewable> void addRenewAction(final T fs) {
|
|||||||
} else {
|
} else {
|
||||||
fs.LOG.error("does not have a token for renewal");
|
fs.LOG.error("does not have a token for renewal");
|
||||||
}
|
}
|
||||||
|
return action;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3043,6 +3043,9 @@ Release 0.23.8 - UNRELEASED
|
|||||||
HDFS-4699. TestPipelinesFailover#testPipelineRecoveryStress fails
|
HDFS-4699. TestPipelinesFailover#testPipelineRecoveryStress fails
|
||||||
sporadically (Chris Nauroth via kihwal)
|
sporadically (Chris Nauroth via kihwal)
|
||||||
|
|
||||||
|
HDFS-4805. Webhdfs client is fragile to token renewal errors
|
||||||
|
(daryn via kihwal)
|
||||||
|
|
||||||
Release 0.23.7 - UNRELEASED
|
Release 0.23.7 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -125,6 +125,8 @@ public class WebHdfsFileSystem extends FileSystem
|
|||||||
= new WebHdfsDelegationTokenSelector();
|
= new WebHdfsDelegationTokenSelector();
|
||||||
|
|
||||||
private DelegationTokenRenewer dtRenewer = null;
|
private DelegationTokenRenewer dtRenewer = null;
|
||||||
|
@VisibleForTesting
|
||||||
|
DelegationTokenRenewer.RenewAction<?> action;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
|
protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
|
||||||
@ -132,7 +134,7 @@ protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
|
|||||||
dtRenewer = DelegationTokenRenewer.getInstance();
|
dtRenewer = DelegationTokenRenewer.getInstance();
|
||||||
}
|
}
|
||||||
|
|
||||||
dtRenewer.addRenewAction(webhdfs);
|
action = dtRenewer.addRenewAction(webhdfs);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Is WebHDFS enabled in conf? */
|
/** Is WebHDFS enabled in conf? */
|
||||||
@ -200,7 +202,8 @@ protected void initDelegationToken() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized Token<?> getDelegationToken() throws IOException {
|
protected synchronized Token<?> getDelegationToken() throws IOException {
|
||||||
if (!hasInitedToken) {
|
// we haven't inited yet, or we used to have a token but it expired
|
||||||
|
if (!hasInitedToken || (action != null && !action.isValid())) {
|
||||||
//since we don't already have a token, go get one
|
//since we don't already have a token, go get one
|
||||||
Token<?> token = getDelegationToken(null);
|
Token<?> token = getDelegationToken(null);
|
||||||
// security might be disabled
|
// security might be disabled
|
||||||
|
@ -27,6 +27,8 @@
|
|||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.DelegationTokenRenewer;
|
||||||
|
import org.apache.hadoop.fs.DelegationTokenRenewer.RenewAction;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||||
@ -199,4 +201,50 @@ public void testDeleteOpRequireAuth() {
|
|||||||
assertFalse(op.getRequireAuth());
|
assertFalse(op.getRequireAuth());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetTokenAfterFailure() throws Exception {
|
||||||
|
Configuration conf = mock(Configuration.class);
|
||||||
|
Token<?> token1 = mock(Token.class);
|
||||||
|
Token<?> token2 = mock(Token.class);
|
||||||
|
long renewCycle = 1000;
|
||||||
|
|
||||||
|
DelegationTokenRenewer.renewCycle = renewCycle;
|
||||||
|
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
|
||||||
|
doReturn(conf).when(fs).getConf();
|
||||||
|
doReturn(token1).doReturn(token2).when(fs).getDelegationToken(null);
|
||||||
|
// cause token renewer to abandon the token
|
||||||
|
doThrow(new IOException("renew failed")).when(token1).renew(conf);
|
||||||
|
doThrow(new IOException("get failed")).when(fs).addDelegationTokens(null, null);
|
||||||
|
|
||||||
|
// trigger token acquisition
|
||||||
|
Token<?> token = fs.getDelegationToken();
|
||||||
|
RenewAction<?> action = fs.action;
|
||||||
|
assertSame(token1, token);
|
||||||
|
assertTrue(action.isValid());
|
||||||
|
|
||||||
|
// fetch again and make sure it's the same as before
|
||||||
|
token = fs.getDelegationToken();
|
||||||
|
assertSame(token1, token);
|
||||||
|
assertSame(action, fs.action);
|
||||||
|
assertTrue(fs.action.isValid());
|
||||||
|
|
||||||
|
// upon renewal, token will go bad based on above stubbing
|
||||||
|
Thread.sleep(renewCycle);
|
||||||
|
assertSame(action, fs.action);
|
||||||
|
assertFalse(fs.action.isValid());
|
||||||
|
|
||||||
|
// now that token is invalid, should get a new one
|
||||||
|
token = fs.getDelegationToken();
|
||||||
|
assertSame(token2, token);
|
||||||
|
assertNotSame(action, fs.action);
|
||||||
|
assertTrue(fs.action.isValid());
|
||||||
|
action = fs.action;
|
||||||
|
|
||||||
|
// should get same one again
|
||||||
|
token = fs.getDelegationToken();
|
||||||
|
assertSame(token2, token);
|
||||||
|
assertSame(action, fs.action);
|
||||||
|
assertTrue(fs.action.isValid());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user