HADOOP-13263. Reload cached groups in background after expiry. (Contributed by Stephen O'Donnell)

This commit is contained in:
Arpit Agarwal 2016-06-27 09:36:05 -07:00
parent 73615a789d
commit 9683eab0e1
5 changed files with 423 additions and 4 deletions

View File

@ -296,6 +296,19 @@ public class CommonConfigurationKeysPublic {
public static final long HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT = public static final long HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT =
5000; 5000;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */ /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
// Key that switches on reloading of expired user->group cache entries by a
// background thread pool.
public static final String HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD =
"hadoop.security.groups.cache.background.reload";
/**
 * Default for the background reload switch: disabled.
 * See <a href="{@docRoot}/../core-default.html">core-default.xml</a>.
 */
public static final boolean
HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_DEFAULT = false;
/**
 * Key for the number of threads in the background reload pool.
 * See <a href="{@docRoot}/../core-default.html">core-default.xml</a>.
 */
public static final String
HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_THREADS =
"hadoop.security.groups.cache.background.reload.threads";
/**
 * Default number of background reload threads.
 * See <a href="{@docRoot}/../core-default.html">core-default.xml</a>.
 */
public static final int
HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_THREADS_DEFAULT = 3;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a>.*/
public static final String HADOOP_SECURITY_AUTHENTICATION = public static final String HADOOP_SECURITY_AUTHENTICATION =
"hadoop.security.authentication"; "hadoop.security.authentication";
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */ /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */

View File

@ -24,8 +24,13 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer; import org.apache.htrace.core.Tracer;
@ -35,6 +40,11 @@
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache; import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -71,6 +81,17 @@ public class Groups {
private final long warningDeltaMs; private final long warningDeltaMs;
private final Timer timer; private final Timer timer;
private Set<String> negativeCache; private Set<String> negativeCache;
// Whether cache entries that need refreshing are reloaded on a background
// thread pool (hadoop.security.groups.cache.background.reload).
private final boolean reloadGroupsInBackground;
// Number of threads in the background reload pool
// (hadoop.security.groups.cache.background.reload.threads).
private final int reloadGroupsThreadCount;
// Background refreshes whose load completed without an exception.
private final AtomicLong backgroundRefreshSuccess =
new AtomicLong(0);
// Background refreshes that ended without a successfully loaded result.
private final AtomicLong backgroundRefreshException =
new AtomicLong(0);
// Background refreshes submitted to the pool but not yet started.
private final AtomicLong backgroundRefreshQueued =
new AtomicLong(0);
// Background refreshes currently executing on a pool thread.
private final AtomicLong backgroundRefreshRunning =
new AtomicLong(0);
public Groups(Configuration conf) { public Groups(Configuration conf) {
this(conf, new Timer()); this(conf, new Timer());
@ -93,6 +114,18 @@ public Groups(Configuration conf, final Timer timer) {
warningDeltaMs = warningDeltaMs =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS, conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT); CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT);
reloadGroupsInBackground =
conf.getBoolean(
CommonConfigurationKeys.
HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
CommonConfigurationKeys.
HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_DEFAULT);
reloadGroupsThreadCount =
conf.getInt(
CommonConfigurationKeys.
HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_THREADS,
CommonConfigurationKeys.
HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_THREADS_DEFAULT);
parseStaticMapping(conf); parseStaticMapping(conf);
this.timer = timer; this.timer = timer;
@ -187,6 +220,22 @@ public List<String> getGroups(final String user) throws IOException {
} }
} }
/**
 * Returns how many background refreshes have completed with a
 * successfully loaded result.
 *
 * @return current value of the success counter
 */
public long getBackgroundRefreshSuccess() {
return backgroundRefreshSuccess.get();
}
/**
 * Returns how many background refreshes have ended without a
 * successfully loaded result, i.e. the load threw.
 *
 * @return current value of the exception counter
 */
public long getBackgroundRefreshException() {
return backgroundRefreshException.get();
}
/**
 * Returns how many background refreshes have been submitted to the
 * reload pool but have not started running yet.
 *
 * @return current value of the queued counter
 */
public long getBackgroundRefreshQueued() {
return backgroundRefreshQueued.get();
}
/**
 * Returns how many background refreshes are executing right now.
 *
 * @return current value of the running counter
 */
public long getBackgroundRefreshRunning() {
return backgroundRefreshRunning.get();
}
/** /**
* Convert millisecond times from hadoop's timer to guava's nanosecond ticker. * Convert millisecond times from hadoop's timer to guava's nanosecond ticker.
*/ */
@ -208,11 +257,41 @@ public long read() {
* Deals with loading data into the cache. * Deals with loading data into the cache.
*/ */
private class GroupCacheLoader extends CacheLoader<String, List<String>> { private class GroupCacheLoader extends CacheLoader<String, List<String>> {
private ListeningExecutorService executorService;
/**
 * Creates the loader and, when reloadGroupsInBackground is true, the
 * thread pool used to refresh cache entries asynchronously. When
 * background reloading is disabled no executor is created and
 * executorService is left unset.
 */
GroupCacheLoader() {
  if (reloadGroupsInBackground) {
    // The %d placeholder gives every pool thread a distinct name so the
    // individual workers can be told apart in thread dumps and logs.
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("Group-Cache-Reload-%d")
        .setDaemon(true)
        .build();
    // With coreThreadCount == maxThreadCount we effectively
    // create a fixed size thread pool. Because allowCoreThreadTimeOut
    // is enabled below, all threads will die after 60 seconds of non use
    ThreadPoolExecutor parentExecutor = new ThreadPoolExecutor(
        reloadGroupsThreadCount,
        reloadGroupsThreadCount,
        60,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(),
        threadFactory);
    parentExecutor.allowCoreThreadTimeOut(true);
    executorService = MoreExecutors.listeningDecorator(parentExecutor);
  }
}
/** /**
* This method will block if a cache entry doesn't exist, and * This method will block if a cache entry doesn't exist, and
* any subsequent requests for the same user will wait on this * any subsequent requests for the same user will wait on this
* request to return. If a user already exists in the cache, * request to return. If a user already exists in the cache,
* this will be run in the background. * and when the key expires, the first call to reload the key
* will block, but subsequent requests will return the old
* value until the blocking thread returns.
* If reloadGroupsInBackground is true, then the thread that
* needs to refresh an expired key will not block either. Instead
* it will return the old cache value and schedule a background
* refresh
* @param user key of cache * @param user key of cache
* @return List of groups belonging to user * @return List of groups belonging to user
* @throws IOException to prevent caching negative entries * @throws IOException to prevent caching negative entries
@ -246,6 +325,44 @@ public List<String> load(String user) throws Exception {
return groups; return groups;
} }
/**
 * Asynchronous override of the cache's reload hook. When
 * reloadGroupsInBackground is false this defers to the super
 * implementation. Otherwise it submits the lookup to the reload pool and
 * returns the pending future, keeping the queued, running, success and
 * exception counters up to date as the task progresses.
 *
 * @param key user whose groups should be refreshed
 * @param oldValue groups currently cached for the user
 * @return future that yields the reloaded groups
 * @throws Exception as declared by the overridden method
 */
@Override
public ListenableFuture<List<String>> reload(final String key,
    List<String> oldValue)
    throws Exception {
  if (reloadGroupsInBackground) {
    // Counted as queued from submission until a pool thread starts it.
    backgroundRefreshQueued.incrementAndGet();
    return executorService.submit(new Callable<List<String>>() {
      @Override
      public List<String> call() throws Exception {
        boolean refreshed = false;
        try {
          backgroundRefreshQueued.decrementAndGet();
          backgroundRefreshRunning.incrementAndGet();
          final List<String> freshGroups = load(key);
          refreshed = true;
          return freshGroups;
        } finally {
          backgroundRefreshRunning.decrementAndGet();
          // Any exit without a loaded result counts as an exception.
          AtomicLong outcome = refreshed
              ? backgroundRefreshSuccess : backgroundRefreshException;
          outcome.incrementAndGet();
        }
      }
    });
  }
  return super.reload(key, oldValue);
}
/** /**
* Queries impl for groups belonging to the user. This could involve I/O and take awhile. * Queries impl for groups belonging to the user. This could involve I/O and take awhile.
*/ */

View File

@ -165,6 +165,28 @@
</description> </description>
</property> </property>
<property>
<name>hadoop.security.groups.cache.background.reload</name>
<value>false</value>
<description>
Whether to reload expired user->group mappings using a background thread
pool. If set to true, a pool of
hadoop.security.groups.cache.background.reload.threads threads is created
to update the cache in the background.
</description>
</property>
<property>
<name>hadoop.security.groups.cache.background.reload.threads</name>
<value>3</value>
<description>
Only relevant if hadoop.security.groups.cache.background.reload is true.
Controls the number of concurrent background user->group cache entry
refreshes. Pending refresh requests beyond this value are queued and
processed when a thread is free.
</description>
</property>
<property> <property>
<name>hadoop.security.group.mapping.ldap.connection.timeout.ms</name> <name>hadoop.security.group.mapping.ldap.connection.timeout.ms</name>
<value>60000</value> <value>60000</value>

View File

@ -63,6 +63,10 @@ Caching/Negative caching
-------- --------
Since the group mapping resolution relies on external mechanisms, the NameNode performance may be impacted. To reduce the impact due to repeated lookups, Hadoop caches the groups returned by the service provider. The cache invalidate is configurable via `hadoop.security.groups.cache.secs`, and the default is 300 seconds. Since the group mapping resolution relies on external mechanisms, the NameNode performance may be impacted. To reduce the impact due to repeated lookups, Hadoop caches the groups returned by the service provider. The cache invalidate is configurable via `hadoop.security.groups.cache.secs`, and the default is 300 seconds.
With the default caching implementation, once a cache entry expires (after `hadoop.security.groups.cache.secs` seconds), the next thread to request group membership will query the group mapping service provider to look up the current groups for the user. While this lookup is running, the thread that initiated it will block, while any other threads requesting groups for the same user will retrieve the previously cached values. If the refresh fails, the thread performing the refresh will throw an exception and the process will repeat for the next thread that requests a lookup for that value. If the lookup repeatedly fails, and the cache is not updated, after `hadoop.security.groups.cache.secs * 10` seconds the cached entry will be evicted and all threads will block until a successful reload is performed.
To avoid any threads blocking when the cached entry expires, set `hadoop.security.groups.cache.background.reload` to true. This enables a small thread pool whose size is set by `hadoop.security.groups.cache.background.reload.threads` (3 threads by default). With this setting, when the cache is queried for an expired entry, the expired result is returned immediately and a task is queued to refresh the cache in the background. If the background refresh fails, a new refresh operation will be queued by the next request to the cache, until `hadoop.security.groups.cache.secs * 10` seconds have elapsed, at which point the cached entry will be evicted and all threads will block for that user until a successful reload occurs.
To avoid spamming NameNode with unknown users, Hadoop employs negative caching so that if the result of the lookup is empty, return an empty group directly instead of performing more group mapping queries, To avoid spamming NameNode with unknown users, Hadoop employs negative caching so that if the result of the lookup is empty, return an empty group directly instead of performing more group mapping queries,
The cache invalidation is configurable via `hadoop.security.groups.negative-cache.secs`. The default is 30 seconds, so if group mapping service providers returns no group for a user, no lookup will be performed for the same user within 30 seconds. The cache invalidation is configurable via `hadoop.security.groups.negative-cache.secs`. The default is 30 seconds, so if group mapping service providers returns no group for a user, no lookup will be performed for the same user within 30 seconds.

View File

@ -50,8 +50,8 @@ public class TestGroupsCaching {
private Configuration conf; private Configuration conf;
@Before @Before
public void setup() { public void setup() throws IOException {
FakeGroupMapping.resetRequestCount(); FakeGroupMapping.clearAll();
ExceptionalGroupMapping.resetRequestCount(); ExceptionalGroupMapping.resetRequestCount();
conf = new Configuration(); conf = new Configuration();
@ -66,13 +66,18 @@ public static class FakeGroupMapping extends ShellBasedUnixGroupsMapping {
private static Set<String> blackList = new HashSet<String>(); private static Set<String> blackList = new HashSet<String>();
private static int requestCount = 0; private static int requestCount = 0;
private static long getGroupsDelayMs = 0; private static long getGroupsDelayMs = 0;
private static boolean throwException;
@Override @Override
public List<String> getGroups(String user) throws IOException { public List<String> getGroups(String user) throws IOException {
LOG.info("Getting groups for " + user); LOG.info("Getting groups for " + user);
delayIfNecessary();
requestCount++; requestCount++;
delayIfNecessary(); if (throwException) {
throw new IOException("For test");
}
if (blackList.contains(user)) { if (blackList.contains(user)) {
return new LinkedList<String>(); return new LinkedList<String>();
@ -102,6 +107,15 @@ public static void clearBlackList() throws IOException {
blackList.clear(); blackList.clear();
} }
/**
 * Resets the fake mapping between tests: stops forcing exceptions,
 * removes any artificial lookup delay, zeroes the request counter and
 * empties both the known groups and the black list.
 *
 * @throws IOException declared for callers of this helper
 */
public static void clearAll() throws IOException {
  LOG.info("Resetting FakeGroupMapping");
  // Behaviour switches first, then the recorded state.
  throwException = false;
  getGroupsDelayMs = 0;
  requestCount = 0;
  allGroups.clear();
  blackList.clear();
}
@Override @Override
public void cacheGroupsAdd(List<String> groups) throws IOException { public void cacheGroupsAdd(List<String> groups) throws IOException {
LOG.info("Adding " + groups + " to groups."); LOG.info("Adding " + groups + " to groups.");
@ -124,6 +138,10 @@ public static void resetRequestCount() {
public static void setGetGroupsDelayMs(long delayMs) { public static void setGetGroupsDelayMs(long delayMs) {
getGroupsDelayMs = delayMs; getGroupsDelayMs = delayMs;
} }
/**
 * Controls whether subsequent getGroups calls on the fake mapping fail.
 *
 * @param throwIfTrue true to make getGroups throw an IOException,
 *                    false to let lookups proceed normally
 */
public static void setThrowException(boolean throwIfTrue) {
throwException = throwIfTrue;
}
} }
public static class ExceptionalGroupMapping extends ShellBasedUnixGroupsMapping { public static class ExceptionalGroupMapping extends ShellBasedUnixGroupsMapping {
@ -403,6 +421,251 @@ public void run() {
assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount()); assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
} }
/**
 * With background reload enabled, looking up an expired entry must return
 * the old value immediately while the refresh runs on the reload pool; a
 * later lookup must then see the refreshed groups.
 */
@Test
public void testThreadNotBlockedWhenExpiredEntryExistsWithBackgroundRefresh()
    throws Exception {
  conf.setLong(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
  conf.setBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
      true);
  FakeTimer timer = new FakeTimer();
  final Groups groups = new Groups(conf, timer);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();
  FakeGroupMapping.clearBlackList();

  // We make an initial request to populate the cache
  groups.getGroups("me");
  // Further lookups will have a delay
  FakeGroupMapping.setGetGroupsDelayMs(100);
  // add another group
  groups.cacheGroupsAdd(Arrays.asList("grp3"));
  int startingRequestCount = FakeGroupMapping.getRequestCount();

  // Then expire that entry
  timer.advance(4 * 1000);

  // Now get the cache entry - it should return immediately
  // with the old value and the cache will not have completed
  // a request to getGroups yet.
  assertEquals(2, groups.getGroups("me").size());
  assertEquals(startingRequestCount, FakeGroupMapping.getRequestCount());

  // Now sleep for over the delay time and the request should
  // have completed
  Thread.sleep(110);
  assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
  // Another call to get groups should give 3 groups instead of 2
  assertEquals(3, groups.getGroups("me").size());
}
/**
 * With background reload disabled, looking up an expired entry must block
 * on the reload and return the refreshed groups.
 */
@Test
public void testThreadBlockedWhenExpiredEntryExistsWithoutBackgroundRefresh()
    throws Exception {
  conf.setLong(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
  conf.setBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
      false);
  FakeTimer timer = new FakeTimer();
  final Groups groups = new Groups(conf, timer);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();
  FakeGroupMapping.clearBlackList();

  // We make an initial request to populate the cache
  groups.getGroups("me");
  // Further lookups will have a delay
  FakeGroupMapping.setGetGroupsDelayMs(100);
  // add another group
  groups.cacheGroupsAdd(Arrays.asList("grp3"));
  int startingRequestCount = FakeGroupMapping.getRequestCount();

  // Then expire that entry
  timer.advance(4 * 1000);

  // Now get the cache entry - it should block and return the new
  // 3 group value
  assertEquals(3, groups.getGroups("me").size());
  assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
}
/**
 * A background refresh that throws must leave the old value in the cache,
 * and a later refresh that succeeds must update it.
 */
@Test
public void testExceptionOnBackgroundRefreshHandled() throws Exception {
  conf.setLong(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
  conf.setBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
      true);
  FakeTimer timer = new FakeTimer();
  final Groups groups = new Groups(conf, timer);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();
  FakeGroupMapping.clearBlackList();

  // We make an initial request to populate the cache
  groups.getGroups("me");

  // add another group
  groups.cacheGroupsAdd(Arrays.asList("grp3"));
  int startingRequestCount = FakeGroupMapping.getRequestCount();
  // Arrange for an exception to occur only on the
  // second call
  FakeGroupMapping.setThrowException(true);

  // Then expire that entry
  timer.advance(4 * 1000);

  // Now get the cache entry - it should return immediately
  // with the old value and the cache will not have completed
  // a request to getGroups yet.
  assertEquals(2, groups.getGroups("me").size());
  assertEquals(startingRequestCount, FakeGroupMapping.getRequestCount());

  // Now sleep for a short time and re-check the request count. It should have
  // increased, but the exception means the cache will not have updated
  Thread.sleep(50);
  FakeGroupMapping.setThrowException(false);
  assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
  assertEquals(2, groups.getGroups("me").size());

  // Now sleep another short time - the 3rd call to getGroups above
  // will have kicked off another refresh that updates the cache
  Thread.sleep(50);
  assertEquals(startingRequestCount + 2, FakeGroupMapping.getRequestCount());
  assertEquals(3, groups.getGroups("me").size());
}
/**
 * While every background refresh fails, the old value must stay
 * retrievable until the entry is evicted; after eviction the lookup must
 * fail until the mapping stops throwing.
 */
@Test
public void testEntriesExpireIfBackgroundRefreshFails() throws Exception {
  conf.setLong(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
  conf.setBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
      true);
  FakeTimer timer = new FakeTimer();
  final Groups groups = new Groups(conf, timer);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();
  FakeGroupMapping.clearBlackList();

  // We make an initial request to populate the cache
  groups.getGroups("me");

  // Now make all calls to the FakeGroupMapper throw exceptions
  FakeGroupMapping.setThrowException(true);

  // The cache entry expires for refresh after 1 second
  // It expires for eviction after 1 * 10 seconds after it was last written
  // So if we call getGroups repeatedly over 9 seconds, 9 refreshes should
  // be triggered which will fail to update the key, but the key's old value
  // will be retrievable until it is evicted after about 10 seconds.
  for (int i = 0; i < 9; i++) {
    assertEquals(2, groups.getGroups("me").size());
    timer.advance(1 * 1000);
  }
  // Wait until the 11th second. The call to getGroups should throw
  // an exception as the key will have been evicted and FakeGroupMapping
  // will throw IO Exception when it is asked for new groups. In this case
  // load must be called synchronously as there is no key present
  timer.advance(2 * 1000);
  try {
    groups.getGroups("me");
    fail("Should have thrown an exception here");
  } catch (Exception e) {
    // pass
  }

  // Finally check groups are retrieved again after FakeGroupMapping
  // stops throwing exceptions
  FakeGroupMapping.setThrowException(false);
  assertEquals(2, groups.getGroups("me").size());
}
/**
 * Checks the queued, running, success and exception counters exposed by
 * Groups while background refreshes are pending, after they succeed and
 * after they fail.
 */
@Test
public void testBackgroundRefreshCounters()
    throws IOException, InterruptedException {
  conf.setLong(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
  conf.setBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
      true);
  conf.setInt(
      CommonConfigurationKeys.
          HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_THREADS,
      2);
  FakeTimer timer = new FakeTimer();
  final Groups groups = new Groups(conf, timer);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();
  FakeGroupMapping.clearBlackList();

  // populate the cache
  String[] grps = {"one", "two", "three", "four", "five"};
  for (String g: grps) {
    groups.getGroups(g);
  }

  // expire the cache
  timer.advance(2*1000);
  FakeGroupMapping.setGetGroupsDelayMs(40);

  // Request all groups again, as there are 2 threads to process them
  // 3 should get queued and 2 should be running
  for (String g: grps) {
    groups.getGroups(g);
  }
  Thread.sleep(20);
  assertEquals(3, groups.getBackgroundRefreshQueued());
  assertEquals(2, groups.getBackgroundRefreshRunning());

  // After 120ms all should have completed running
  Thread.sleep(120);
  assertEquals(0, groups.getBackgroundRefreshQueued());
  assertEquals(0, groups.getBackgroundRefreshRunning());
  assertEquals(5, groups.getBackgroundRefreshSuccess());

  // Now run again, this time throwing exceptions but no delay
  timer.advance(2*1000);
  FakeGroupMapping.setGetGroupsDelayMs(0);
  FakeGroupMapping.setThrowException(true);
  for (String g: grps) {
    groups.getGroups(g);
  }
  Thread.sleep(20);
  assertEquals(0, groups.getBackgroundRefreshQueued());
  assertEquals(0, groups.getBackgroundRefreshRunning());
  assertEquals(5, groups.getBackgroundRefreshSuccess());
  assertEquals(5, groups.getBackgroundRefreshException());
}
/**
 * With background reload disabled, a refresh whose lookup throws must
 * still hand the previously cached groups back to the caller.
 */
@Test
public void testExceptionCallingLoadWithoutBackgroundRefreshReturnsOldValue()
    throws Exception {
  conf.setLong(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
  conf.setBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
      false);
  FakeTimer timer = new FakeTimer();
  final Groups groups = new Groups(conf, timer);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();
  FakeGroupMapping.clearBlackList();

  // First populate the cache
  assertEquals(2, groups.getGroups("me").size());

  // Advance the timer so a refresh is required
  timer.advance(2 * 1000);

  // The mapping now throws on lookup, but the call should still
  // return the previously cached value
  FakeGroupMapping.setThrowException(true);
  assertEquals(2, groups.getGroups("me").size());
}
@Test @Test
public void testCacheEntriesExpire() throws Exception { public void testCacheEntriesExpire() throws Exception {
conf.setLong( conf.setLong(