HDFS-14575. LeaseRenewer#daemon threads leak in DFSClient. Contributed by Renukaprasad C.

Co-authored-by: Tao Yang <taoyang1@apache.org>
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
He Xiaoqiao 2021-06-22 00:32:55 +08:00
parent 6e11461eaa
commit 10b79a26fe
No known key found for this signature in database
GPG Key ID: A80CC124E9A0FA63
3 changed files with 125 additions and 6 deletions

View File

@ -505,7 +505,15 @@ private void beginFileLease(final long inodeId, final DFSOutputStream out)
throws IOException { throws IOException {
synchronized (filesBeingWritten) { synchronized (filesBeingWritten) {
putFileBeingWritten(inodeId, out); putFileBeingWritten(inodeId, out);
getLeaseRenewer().put(this); LeaseRenewer renewer = getLeaseRenewer();
boolean result = renewer.put(this);
if (!result) {
// Existing LeaseRenewer cannot add another Daemon, so remove existing
// and add new one.
LeaseRenewer.remove(renewer);
renewer = getLeaseRenewer();
renewer.put(this);
}
} }
} }

View File

@ -26,6 +26,7 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -79,6 +80,8 @@ public class LeaseRenewer {
private static long leaseRenewerGraceDefault = 60*1000L; private static long leaseRenewerGraceDefault = 60*1000L;
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L; static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
private AtomicBoolean isLSRunning = new AtomicBoolean(false);
/** Get a {@link LeaseRenewer} instance */ /** Get a {@link LeaseRenewer} instance */
public static LeaseRenewer getInstance(final String authority, public static LeaseRenewer getInstance(final String authority,
final UserGroupInformation ugi, final DFSClient dfsc) { final UserGroupInformation ugi, final DFSClient dfsc) {
@ -87,6 +90,15 @@ public static LeaseRenewer getInstance(final String authority,
return r; return r;
} }
/**
* Remove the given renewer from the Factory.
* Subsequent call will receive new {@link LeaseRenewer} instance.
* @param renewer Instance to be cleared from Factory
*/
public static void remove(LeaseRenewer renewer) {
Factory.INSTANCE.remove(renewer);
}
/** /**
* A factory for sharing {@link LeaseRenewer} objects * A factory for sharing {@link LeaseRenewer} objects
* among {@link DFSClient} instances * among {@link DFSClient} instances
@ -156,6 +168,9 @@ private synchronized void remove(final LeaseRenewer r) {
final LeaseRenewer stored = renewers.get(r.factorykey); final LeaseRenewer stored = renewers.get(r.factorykey);
//Since a renewer may expire, the stored renewer can be different. //Since a renewer may expire, the stored renewer can be different.
if (r == stored) { if (r == stored) {
// Expire LeaseRenewer daemon thread as soon as possible.
r.clearClients();
r.setEmptyTime(0);
renewers.remove(r.factorykey); renewers.remove(r.factorykey);
} }
} }
@ -241,6 +256,10 @@ private synchronized void addClient(final DFSClient dfsc) {
} }
} }
private synchronized void clearClients() {
dfsclients.clear();
}
private synchronized boolean clientsRunning() { private synchronized boolean clientsRunning() {
for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) { for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
if (!i.next().isClientRunning()) { if (!i.next().isClientRunning()) {
@ -292,11 +311,18 @@ private synchronized boolean isRenewerExpired() {
&& Time.monotonicNow() - emptyTime > gracePeriod; && Time.monotonicNow() - emptyTime > gracePeriod;
} }
public synchronized void put(final DFSClient dfsc) { public synchronized boolean put(final DFSClient dfsc) {
if (dfsc.isClientRunning()) { if (dfsc.isClientRunning()) {
if (!isRunning() || isRenewerExpired()) { if (!isRunning() || isRenewerExpired()) {
//start a new deamon with a new id. // Start a new daemon with a new id.
final int id = ++currentId; final int id = ++currentId;
if (isLSRunning.get()) {
// Not allowed to add multiple daemons into LeaseRenewer, let client
// create new LR and continue to acquire lease.
return false;
}
isLSRunning.getAndSet(true);
daemon = new Daemon(new Runnable() { daemon = new Daemon(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -328,6 +354,7 @@ public String toString() {
} }
emptyTime = Long.MAX_VALUE; emptyTime = Long.MAX_VALUE;
} }
return true;
} }
@VisibleForTesting @VisibleForTesting
@ -426,9 +453,6 @@ private void run(final int id) throws InterruptedException {
synchronized (this) { synchronized (this) {
DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout(); DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
dfsclientsCopy = new ArrayList<>(dfsclients); dfsclientsCopy = new ArrayList<>(dfsclients);
dfsclients.clear();
//Expire the current LeaseRenewer thread.
emptyTime = 0;
Factory.INSTANCE.remove(LeaseRenewer.this); Factory.INSTANCE.remove(LeaseRenewer.this);
} }
for (DFSClient dfsClient : dfsclientsCopy) { for (DFSClient dfsClient : dfsclientsCopy) {

View File

@ -31,7 +31,11 @@
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.io.IOException; import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
@ -168,6 +172,11 @@ public Boolean get() {
renewer.closeClient(mockClient1); renewer.closeClient(mockClient1);
renewer.closeClient(mockClient2); renewer.closeClient(mockClient2);
renewer.closeClient(MOCK_DFSCLIENT);
// Make sure renewer is not running due to expiration.
Thread.sleep(FAST_GRACE_PERIOD * 2);
Assert.assertTrue(!renewer.isRunning());
} }
@Test @Test
@ -197,4 +206,82 @@ public void testThreadName() throws Exception {
Assert.assertFalse(renewer.isRunning()); Assert.assertFalse(renewer.isRunning());
} }
/**
* Test for HDFS-14575. In this fix, the LeaseRenewer clears all clients
* and expires immediately via setting empty time to 0 before it's removed
* from factory. Previously, LeaseRenewer#daemon thread might leak.
*/
@Test
public void testDaemonThreadLeak() throws Exception {
Assert.assertFalse("Renewer not initially running", renewer.isRunning());
// Pretend to create a file#1, daemon#1 starts
renewer.put(MOCK_DFSCLIENT);
Assert.assertTrue("Renewer should have started running",
renewer.isRunning());
Pattern daemonThreadNamePattern = Pattern.compile("LeaseRenewer:\\S+");
Assert.assertEquals(1, countThreadMatching(daemonThreadNamePattern));
// Pretend to create file#2, daemon#2 starts due to expiration
LeaseRenewer lastRenewer = renewer;
renewer =
LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
Assert.assertEquals(lastRenewer, renewer);
// Pretend to close file#1
renewer.closeClient(MOCK_DFSCLIENT);
Assert.assertEquals(1, countThreadMatching(daemonThreadNamePattern));
// Pretend to be expired
renewer.setEmptyTime(0);
renewer =
LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
boolean success = renewer.put(MOCK_DFSCLIENT);
if (!success) {
LeaseRenewer.remove(renewer);
renewer =
LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
renewer.put(MOCK_DFSCLIENT);
}
int threadCount = countThreadMatching(daemonThreadNamePattern);
//Sometimes old LR#Daemon gets closed and lead to count 1 (rare scenario)
Assert.assertTrue(1 == threadCount || 2 == threadCount);
// After grace period, both daemon#1 and renewer#1 will be removed due to
// expiration, then daemon#2 will leak before HDFS-14575.
Thread.sleep(FAST_GRACE_PERIOD * 2);
// Pretend to close file#2, renewer#2 will be created
lastRenewer = renewer;
renewer =
LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
Assert.assertEquals(lastRenewer, renewer);
renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
renewer.closeClient(MOCK_DFSCLIENT);
renewer.setEmptyTime(0);
// Make sure LeaseRenewer#daemon threads will terminate after grace period
Thread.sleep(FAST_GRACE_PERIOD * 2);
Assert.assertEquals("LeaseRenewer#daemon thread leaks", 0,
countThreadMatching(daemonThreadNamePattern));
}
private static int countThreadMatching(Pattern pattern) {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] infos =
threadBean.getThreadInfo(threadBean.getAllThreadIds(), 1);
int count = 0;
for (ThreadInfo info : infos) {
if (info == null) {
continue;
}
if (pattern.matcher(info.getThreadName()).matches()) {
count++;
}
}
return count;
}
} }