From d6af50719961be7052c9f363110ebad26e5937f9 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 21 Nov 2012 12:29:37 +0000 Subject: [PATCH] HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems should register/deregister to/from. Contributed by Karthik Kambatla. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1412077 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../hadoop/fs/DelegationTokenRenewer.java | 63 +++++-- .../hadoop/fs/TestDelegationTokenRenewer.java | 159 ++++++++++++++++++ .../apache/hadoop/hdfs/HftpFileSystem.java | 24 ++- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 19 ++- 5 files changed, 243 insertions(+), 25 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 023d87d6bd..2a6db063bd 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -444,6 +444,9 @@ Release 2.0.3-alpha - Unreleased HADOOP-6607. Add different variants of non caching HTTP headers. (tucu) + HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems + should register/deregister to/from. (Karthik Kambatla via tomwhite) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java index fd2d07d1a6..1224600c9b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java @@ -33,7 +33,7 @@ * A daemon thread that waits for the next file system to renew. */ @InterfaceAudience.Private -public class DelegationTokenRenewer +public class DelegationTokenRenewer extends Thread { /** The renewable interface used by the renewer. */ public interface Renewable { @@ -93,7 +93,7 @@ public boolean equals(final Object that) { * @param newTime the new time */ private void updateRenewalTime() { - renewalTime = RENEW_CYCLE + Time.now(); + renewalTime = renewCycle + Time.now(); } /** @@ -134,34 +134,69 @@ public String toString() { } /** Wait for 95% of a day between renewals */ - private static final int RENEW_CYCLE = 24 * 60 * 60 * 950; + private static final int RENEW_CYCLE = 24 * 60 * 60 * 950; - private DelayQueue> queue = new DelayQueue>(); + @InterfaceAudience.Private + protected static int renewCycle = RENEW_CYCLE; - public DelegationTokenRenewer(final Class clazz) { + /** Queue to maintain the RenewActions to be processed by the {@link #run()} */ + private volatile DelayQueue> queue = new DelayQueue>(); + + /** + * Create the singleton instance. However, the thread can be started lazily in + * {@link #addRenewAction(FileSystem)} + */ + private static DelegationTokenRenewer INSTANCE = null; + + private DelegationTokenRenewer(final Class clazz) { super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName()); setDaemon(true); } - /** Add a renew action to the queue. */ - public void addRenewAction(final T fs) { - queue.add(new RenewAction(fs)); + public static synchronized DelegationTokenRenewer getInstance() { + if (INSTANCE == null) { + INSTANCE = new DelegationTokenRenewer(FileSystem.class); + } + return INSTANCE; } + /** Add a renew action to the queue. */ + public synchronized void addRenewAction(final T fs) { + queue.add(new RenewAction(fs)); + if (!isAlive()) { + start(); + } + } + + /** Remove the associated renew action from the queue */ + public synchronized void removeRenewAction( + final T fs) { + for (RenewAction action : queue) { + if (action.weakFs.get() == fs) { + queue.remove(action); + return; + } + } + } + + @SuppressWarnings("static-access") @Override public void run() { for(;;) { - RenewAction action = null; + RenewAction action = null; try { - action = queue.take(); - if (action.renew()) { - action.updateRenewalTime(); - queue.add(action); + synchronized (this) { + action = queue.take(); + if (action.renew()) { + action.updateRenewalTime(); + queue.add(action); + } } } catch (InterruptedException ie) { return; } catch (Exception ie) { - T.LOG.warn("Failed to renew token, action=" + action, ie); + action.weakFs.get().LOG.warn("Failed to renew token, action=" + action, + ie); } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java new file mode 100644 index 0000000000..789641dc32 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java @@ -0,0 +1,159 @@ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.Progressable; + +import org.junit.Before; +import org.junit.Test; + +public class TestDelegationTokenRenewer { + private static final int RENEW_CYCLE = 1000; + private static final int MAX_RENEWALS = 100; + + @SuppressWarnings("rawtypes") + static class TestToken extends Token { + public volatile int renewCount = 0; + + @Override + public long renew(Configuration conf) { + if (renewCount == MAX_RENEWALS) { + Thread.currentThread().interrupt(); + } else { + renewCount++; + } + return renewCount; + } + } + + static class TestFileSystem extends FileSystem implements + DelegationTokenRenewer.Renewable { + private Configuration mockConf = mock(Configuration.class);; + private TestToken testToken = new TestToken(); + + @Override + public Configuration getConf() { + return mockConf; + } + + @Override + public Token getRenewToken() { + return testToken; + } + + @Override + public URI getUri() { + return null; + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return null; + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + return null; + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress) throws IOException { + return null; + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + return false; + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + return false; + } + + @Override + public FileStatus[] listStatus(Path f) throws FileNotFoundException, + IOException { + return null; + } + + @Override + public void setWorkingDirectory(Path new_dir) { + } + + @Override + public Path getWorkingDirectory() { + return null; + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + return false; + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return null; + } + + @Override + public void setDelegationToken(Token token) { + return; + } + } + + private DelegationTokenRenewer renewer; + + @Before + public void setup() { + DelegationTokenRenewer.renewCycle = RENEW_CYCLE; + renewer = DelegationTokenRenewer.getInstance(); + } + + @Test + public void testAddRenewAction() throws IOException, InterruptedException { + TestFileSystem tfs = new TestFileSystem(); + renewer.addRenewAction(tfs); + + for (int i = 0; i < 10; i++) { + Thread.sleep(RENEW_CYCLE); + if (tfs.testToken.renewCount > 0) { + return; + } + } + + assertTrue("Token not renewed even after 10 seconds", + (tfs.testToken.renewCount > 0)); + } + + @Test + public void testRemoveRenewAction() throws IOException, InterruptedException { + TestFileSystem tfs = new TestFileSystem(); + renewer.addRenewAction(tfs); + + for (int i = 0; i < 10; i++) { + Thread.sleep(RENEW_CYCLE); + if (tfs.testToken.renewCount > 0) { + renewer.removeRenewAction(tfs); + break; + } + } + + assertTrue("Token not renewed even once", + (tfs.testToken.renewCount > 0)); + assertTrue("Token not removed", + (tfs.testToken.renewCount < MAX_RENEWALS)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java index 48d0f5c6a6..d97c62012e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java @@ -82,12 +82,8 @@ @InterfaceStability.Evolving public class HftpFileSystem extends FileSystem implements DelegationTokenRenewer.Renewable { - private static final DelegationTokenRenewer dtRenewer - = new DelegationTokenRenewer(HftpFileSystem.class); - static { HttpURLConnection.setFollowRedirects(true); - dtRenewer.start(); } public static final Text TOKEN_KIND = new Text("HFTP delegation"); @@ -106,6 +102,16 @@ public class HftpFileSystem extends FileSystem private static final HftpDelegationTokenSelector hftpTokenSelector = new HftpDelegationTokenSelector(); + private DelegationTokenRenewer dtRenewer = null; + + private synchronized void addRenewAction(final HftpFileSystem hftpFs) { + if (dtRenewer == null) { + dtRenewer = DelegationTokenRenewer.getInstance(); + } + + dtRenewer.addRenewAction(hftpFs); + } + public static final SimpleDateFormat getDateFormat() { final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT); df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE)); @@ -202,7 +208,7 @@ protected void initDelegationToken() throws IOException { if (token != null) { setDelegationToken(token); if (createdToken) { - dtRenewer.addRenewAction(this); + addRenewAction(this); LOG.debug("Created new DT for " + token.getService()); } else { LOG.debug("Found existing DT for " + token.getService()); @@ -395,6 +401,14 @@ public FSDataInputStream open(Path f, int buffersize) throws IOException { return new FSDataInputStream(new RangeHeaderInputStream(u)); } + @Override + public void close() throws IOException { + super.close(); + if (dtRenewer != null) { + dtRenewer.removeRenewAction(this); // blocks + } + } + /** Class to parse and store a listing reply from the server. */ class LsParser extends DefaultHandler { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 1dc6af3733..7304b1df96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -124,15 +124,14 @@ public class WebHdfsFileSystem extends FileSystem public static final WebHdfsDelegationTokenSelector DT_SELECTOR = new WebHdfsDelegationTokenSelector(); - private static DelegationTokenRenewer DT_RENEWER = null; + private DelegationTokenRenewer dtRenewer = null; - private static synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) { - if (DT_RENEWER == null) { - DT_RENEWER = new DelegationTokenRenewer(WebHdfsFileSystem.class); - DT_RENEWER.start(); + private synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) { + if (dtRenewer == null) { + dtRenewer = DelegationTokenRenewer.getInstance(); } - DT_RENEWER.addRenewAction(webhdfs); + dtRenewer.addRenewAction(webhdfs); } /** Is WebHDFS enabled in conf? */ @@ -766,6 +765,14 @@ public FSDataInputStream open(final Path f, final int buffersize new OffsetUrlOpener(url), new OffsetUrlOpener(null))); } + @Override + public void close() throws IOException { + super.close(); + if (dtRenewer != null) { + dtRenewer.removeRenewAction(this); // blocks + } + } + class OffsetUrlOpener extends ByteRangeInputStream.URLOpener { OffsetUrlOpener(final URL url) { super(url);