HADOOP-9549. WebHdfsFileSystem hangs on close(). Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1481075 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2013-05-10 16:22:54 +00:00
parent 146ccd7b06
commit a96780013e
3 changed files with 245 additions and 162 deletions

View File

@ -702,6 +702,8 @@ Release 2.0.5-beta - UNRELEASED
HADOOP-9550. Remove aspectj dependency. (kkambatl via tucu)
HADOOP-9549. WebHdfsFileSystem hangs on close(). (daryn via kihwal)
Release 2.0.4-alpha - 2013-04-25
INCOMPATIBLE CHANGES

View File

@ -61,10 +61,12 @@ private static class RenewAction<T extends FileSystem & Renewable>
private long renewalTime;
/** a weak reference to the file system so that it can be garbage collected */
private final WeakReference<T> weakFs;
private Token<?> token;
private RenewAction(final T fs) {
this.weakFs = new WeakReference<T>(fs);
updateRenewalTime();
this.token = fs.getRenewToken();
updateRenewalTime(renewCycle);
}
/** Get the delay until this event should happen. */
@ -83,28 +85,32 @@ public int compareTo(final Delayed delayed) {
@Override
public int hashCode() {
return (int)renewalTime ^ (int)(renewalTime >>> 32);
return token.hashCode();
}
@Override
public boolean equals(final Object that) {
if (that == null || !(that instanceof RenewAction)) {
if (this == that) {
return true;
} else if (that == null || !(that instanceof RenewAction)) {
return false;
}
return compareTo((Delayed)that) == 0;
return token.equals(((RenewAction<?>)that).token);
}
/**
* Set a new time for the renewal.
* It can only be called when the action is not in the queue.
* It can only be called when the action is not in the queue or any
* collection because the hashCode may change
* @param newTime the new time
*/
private void updateRenewalTime() {
renewalTime = renewCycle + Time.now();
private void updateRenewalTime(long delay) {
renewalTime = Time.now() + delay - delay/10;
}
/**
* Renew or replace the delegation token for this file system.
* It can only be called when the action is not in the queue.
* @return
* @throws IOException
*/
@ -114,14 +120,17 @@ private boolean renew() throws IOException, InterruptedException {
if (b) {
synchronized(fs) {
try {
fs.getRenewToken().renew(fs.getConf());
long expires = token.renew(fs.getConf());
updateRenewalTime(expires - Time.now());
} catch (IOException ie) {
try {
Token<?>[] tokens = fs.addDelegationTokens(null, null);
if (tokens.length == 0) {
throw new IOException("addDelegationTokens returned no tokens");
}
fs.setDelegationToken(tokens[0]);
token = tokens[0];
updateRenewalTime(renewCycle);
fs.setDelegationToken(token);
} catch (IOException ie2) {
throw new IOException("Can't renew or get new delegation token ", ie);
}
@ -131,20 +140,27 @@ private boolean renew() throws IOException, InterruptedException {
return b;
}
private void cancel() throws IOException, InterruptedException {
final T fs = weakFs.get();
if (fs != null) {
token.cancel(fs.getConf());
}
}
@Override
public String toString() {
Renewable fs = weakFs.get();
return fs == null? "evaporated token renew"
: "The token will be renewed in " + getDelay(TimeUnit.SECONDS)
+ " secs, renewToken=" + fs.getRenewToken();
+ " secs, renewToken=" + token;
}
}
/** Wait for 95% of a day between renewals */
private static final int RENEW_CYCLE = 24 * 60 * 60 * 950;
/** assumes renew cycle for a token is 24 hours... */
private static final long RENEW_CYCLE = 24 * 60 * 60 * 1000;
@InterfaceAudience.Private
protected static int renewCycle = RENEW_CYCLE;
protected static long renewCycle = RENEW_CYCLE;
/** Queue to maintain the RenewActions to be processed by the {@link #run()} */
private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>();
@ -173,11 +189,34 @@ public static synchronized DelegationTokenRenewer getInstance() {
return INSTANCE;
}
@VisibleForTesting
static synchronized void reset() {
if (INSTANCE != null) {
INSTANCE.queue.clear();
INSTANCE.interrupt();
try {
INSTANCE.join();
} catch (InterruptedException e) {
LOG.warn("Failed to reset renewer");
} finally {
INSTANCE = null;
}
}
}
/** Add a renew action to the queue. */
public synchronized <T extends FileSystem & Renewable> void addRenewAction(final T fs) {
queue.add(new RenewAction<T>(fs));
if (!isAlive()) {
start();
@SuppressWarnings("static-access")
public <T extends FileSystem & Renewable> void addRenewAction(final T fs) {
synchronized (this) {
if (!isAlive()) {
start();
}
}
RenewAction<T> action = new RenewAction<T>(fs);
if (action.token != null) {
queue.add(action);
} else {
fs.LOG.error("does not have a token for renewal");
}
}
@ -186,21 +225,18 @@ public synchronized <T extends FileSystem & Renewable> void addRenewAction(final
*
* @throws IOException
*/
public synchronized <T extends FileSystem & Renewable> void removeRenewAction(
public <T extends FileSystem & Renewable> void removeRenewAction(
final T fs) throws IOException {
for (RenewAction<?> action : queue) {
if (action.weakFs.get() == fs) {
try {
fs.getRenewToken().cancel(fs.getConf());
} catch (InterruptedException ie) {
LOG.error("Interrupted while canceling token for " + fs.getUri()
+ "filesystem");
if (LOG.isDebugEnabled()) {
LOG.debug(ie.getStackTrace());
}
RenewAction<T> action = new RenewAction<T>(fs);
if (queue.remove(action)) {
try {
action.cancel();
} catch (InterruptedException ie) {
LOG.error("Interrupted while canceling token for " + fs.getUri()
+ "filesystem");
if (LOG.isDebugEnabled()) {
LOG.debug(ie.getStackTrace());
}
queue.remove(action);
return;
}
}
}
@ -211,12 +247,9 @@ public void run() {
for(;;) {
RenewAction<?> action = null;
try {
synchronized (this) {
action = queue.take();
if (action.renew()) {
action.updateRenewalTime();
queue.add(action);
}
action = queue.take();
if (action.renew()) {
queue.add(action);
}
} catch (InterruptedException ie) {
return;

View File

@ -17,155 +17,203 @@
*/
package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestDelegationTokenRenewer {
private static final int RENEW_CYCLE = 1000;
private static final int MAX_RENEWALS = 100;
@SuppressWarnings("rawtypes")
static class TestToken extends Token {
public volatile int renewCount = 0;
public volatile boolean cancelled = false;
@Override
public long renew(Configuration conf) {
if (renewCount == MAX_RENEWALS) {
Thread.currentThread().interrupt();
} else {
renewCount++;
}
return renewCount;
}
@Override
public void cancel(Configuration conf) {
cancelled = true;
}
}
public abstract class RenewableFileSystem extends FileSystem
implements Renewable { }
private static final long RENEW_CYCLE = 1000;
static class TestFileSystem extends FileSystem implements
DelegationTokenRenewer.Renewable {
private Configuration mockConf = mock(Configuration.class);;
private TestToken testToken = new TestToken();
@Override
public Configuration getConf() {
return mockConf;
}
@Override
public Token<?> getRenewToken() {
return testToken;
}
@Override
public URI getUri() {
return null;
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return null;
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return null;
}
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
return null;
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
return false;
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
return false;
}
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
return null;
}
@Override
public void setWorkingDirectory(Path new_dir) {
}
@Override
public Path getWorkingDirectory() {
return null;
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
return false;
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
return null;
}
@Override
public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
return;
}
}
private DelegationTokenRenewer renewer;
Configuration conf;
FileSystem fs;
@Before
public void setup() {
DelegationTokenRenewer.renewCycle = RENEW_CYCLE;
DelegationTokenRenewer.reset();
renewer = DelegationTokenRenewer.getInstance();
}
@SuppressWarnings("unchecked")
@Test
public void testAddRemoveRenewAction() throws IOException,
InterruptedException {
TestFileSystem tfs = new TestFileSystem();
renewer.addRenewAction(tfs);
Text service = new Text("myservice");
Configuration conf = mock(Configuration.class);
Token<?> token = mock(Token.class);
doReturn(service).when(token).getService();
doAnswer(new Answer<Long>() {
public Long answer(InvocationOnMock invocation) {
return Time.now() + RENEW_CYCLE;
}
}).when(token).renew(any(Configuration.class));
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(token).when(fs).getRenewToken();
renewer.addRenewAction(fs);
assertEquals("FileSystem not added to DelegationTokenRenewer", 1,
renewer.getRenewQueueLength());
Thread.sleep(RENEW_CYCLE*2);
verify(token, atLeast(2)).renew(eq(conf));
verify(token, atMost(3)).renew(eq(conf));
verify(token, never()).cancel(any(Configuration.class));
renewer.removeRenewAction(fs);
verify(token).cancel(eq(conf));
for (int i = 0; i < 60; i++) {
Thread.sleep(RENEW_CYCLE);
if (tfs.testToken.renewCount > 0) {
renewer.removeRenewAction(tfs);
break;
}
}
assertTrue("Token not renewed even after 1 minute",
(tfs.testToken.renewCount > 0));
verify(fs, never()).getDelegationToken(null);
verify(fs, never()).setDelegationToken(any(Token.class));
assertEquals("FileSystem not removed from DelegationTokenRenewer", 0,
renewer.getRenewQueueLength());
assertTrue("Token not cancelled", tfs.testToken.cancelled);
}
@Test
public void testAddRenewActionWithNoToken() throws IOException,
InterruptedException {
Configuration conf = mock(Configuration.class);
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(null).when(fs).getRenewToken();
renewer.addRenewAction(fs);
verify(fs).getRenewToken();
assertEquals(0, renewer.getRenewQueueLength());
}
@Test
public void testGetNewTokenOnRenewFailure() throws IOException,
InterruptedException {
Text service = new Text("myservice");
Configuration conf = mock(Configuration.class);
final Token<?> token1 = mock(Token.class);
doReturn(service).when(token1).getService();
doThrow(new IOException("boom")).when(token1).renew(eq(conf));
final Token<?> token2 = mock(Token.class);
doReturn(service).when(token2).getService();
doAnswer(new Answer<Long>() {
public Long answer(InvocationOnMock invocation) {
return Time.now() + RENEW_CYCLE;
}
}).when(token2).renew(eq(conf));
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(token1).doReturn(token2).when(fs).getRenewToken();
doReturn(token2).when(fs).getDelegationToken(null);
doAnswer(new Answer<Token<?>[]>() {
public Token<?>[] answer(InvocationOnMock invocation) {
return new Token<?>[]{token2};
}
}).when(fs).addDelegationTokens(null, null);
renewer.addRenewAction(fs);
assertEquals(1, renewer.getRenewQueueLength());
Thread.sleep(RENEW_CYCLE);
verify(fs).getRenewToken();
verify(token1, atLeast(1)).renew(eq(conf));
verify(token1, atMost(2)).renew(eq(conf));
verify(fs).addDelegationTokens(null, null);
verify(fs).setDelegationToken(eq(token2));
assertEquals(1, renewer.getRenewQueueLength());
renewer.removeRenewAction(fs);
verify(token2).cancel(eq(conf));
assertEquals(0, renewer.getRenewQueueLength());
}
@Test
public void testStopRenewalWhenFsGone() throws IOException,
InterruptedException {
Configuration conf = mock(Configuration.class);
Token<?> token = mock(Token.class);
doReturn(new Text("myservice")).when(token).getService();
doAnswer(new Answer<Long>() {
public Long answer(InvocationOnMock invocation) {
return Time.now() + RENEW_CYCLE;
}
}).when(token).renew(any(Configuration.class));
RenewableFileSystem fs = mock(RenewableFileSystem.class);
doReturn(conf).when(fs).getConf();
doReturn(token).when(fs).getRenewToken();
renewer.addRenewAction(fs);
assertEquals(1, renewer.getRenewQueueLength());
Thread.sleep(RENEW_CYCLE);
verify(token, atLeast(1)).renew(eq(conf));
verify(token, atMost(2)).renew(eq(conf));
// drop weak ref
fs = null;
System.gc(); System.gc(); System.gc();
// next renew should detect the fs as gone
Thread.sleep(RENEW_CYCLE);
verify(token, atLeast(1)).renew(eq(conf));
verify(token, atMost(2)).renew(eq(conf));
assertEquals(0, renewer.getRenewQueueLength());
}
@Test(timeout=4000)
public void testMultipleTokensDoNotDeadlock() throws IOException,
InterruptedException {
Configuration conf = mock(Configuration.class);
FileSystem fs = mock(FileSystem.class);
doReturn(conf).when(fs).getConf();
long distantFuture = Time.now() + 3600 * 1000; // 1h
Token<?> token1 = mock(Token.class);
doReturn(new Text("myservice1")).when(token1).getService();
doReturn(distantFuture).when(token1).renew(eq(conf));
Token<?> token2 = mock(Token.class);
doReturn(new Text("myservice2")).when(token2).getService();
doReturn(distantFuture).when(token2).renew(eq(conf));
RenewableFileSystem fs1 = mock(RenewableFileSystem.class);
doReturn(conf).when(fs1).getConf();
doReturn(token1).when(fs1).getRenewToken();
RenewableFileSystem fs2 = mock(RenewableFileSystem.class);
doReturn(conf).when(fs2).getConf();
doReturn(token2).when(fs2).getRenewToken();
renewer.addRenewAction(fs1);
renewer.addRenewAction(fs2);
assertEquals(2, renewer.getRenewQueueLength());
renewer.removeRenewAction(fs1);
assertEquals(1, renewer.getRenewQueueLength());
renewer.removeRenewAction(fs2);
assertEquals(0, renewer.getRenewQueueLength());
verify(token1).cancel(eq(conf));
verify(token2).cancel(eq(conf));
}
}