HDFS-14575. LeaseRenewer#daemon threads leak in DFSClient. Contributed by Renukaprasad C.
Co-authored-by: Tao Yang <taoyang1@apache.org>
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 10b79a26fe
)
This commit is contained in:
parent
e33509b094
commit
06954af6f0
@ -509,7 +509,15 @@ private void beginFileLease(final long inodeId, final DFSOutputStream out)
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
synchronized (filesBeingWritten) {
|
synchronized (filesBeingWritten) {
|
||||||
putFileBeingWritten(inodeId, out);
|
putFileBeingWritten(inodeId, out);
|
||||||
getLeaseRenewer().put(this);
|
LeaseRenewer renewer = getLeaseRenewer();
|
||||||
|
boolean result = renewer.put(this);
|
||||||
|
if (!result) {
|
||||||
|
// Existing LeaseRenewer cannot add another Daemon, so remove existing
|
||||||
|
// and add new one.
|
||||||
|
LeaseRenewer.remove(renewer);
|
||||||
|
renewer = getLeaseRenewer();
|
||||||
|
renewer.put(this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
@ -79,6 +80,8 @@ public class LeaseRenewer {
|
|||||||
private static long leaseRenewerGraceDefault = 60*1000L;
|
private static long leaseRenewerGraceDefault = 60*1000L;
|
||||||
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
|
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
|
||||||
|
|
||||||
|
private AtomicBoolean isLSRunning = new AtomicBoolean(false);
|
||||||
|
|
||||||
/** Get a {@link LeaseRenewer} instance */
|
/** Get a {@link LeaseRenewer} instance */
|
||||||
public static LeaseRenewer getInstance(final String authority,
|
public static LeaseRenewer getInstance(final String authority,
|
||||||
final UserGroupInformation ugi, final DFSClient dfsc) {
|
final UserGroupInformation ugi, final DFSClient dfsc) {
|
||||||
@ -87,6 +90,15 @@ public static LeaseRenewer getInstance(final String authority,
|
|||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the given renewer from the Factory.
|
||||||
|
* Subsequent call will receive new {@link LeaseRenewer} instance.
|
||||||
|
* @param renewer Instance to be cleared from Factory
|
||||||
|
*/
|
||||||
|
public static void remove(LeaseRenewer renewer) {
|
||||||
|
Factory.INSTANCE.remove(renewer);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A factory for sharing {@link LeaseRenewer} objects
|
* A factory for sharing {@link LeaseRenewer} objects
|
||||||
* among {@link DFSClient} instances
|
* among {@link DFSClient} instances
|
||||||
@ -156,6 +168,9 @@ private synchronized void remove(final LeaseRenewer r) {
|
|||||||
final LeaseRenewer stored = renewers.get(r.factorykey);
|
final LeaseRenewer stored = renewers.get(r.factorykey);
|
||||||
//Since a renewer may expire, the stored renewer can be different.
|
//Since a renewer may expire, the stored renewer can be different.
|
||||||
if (r == stored) {
|
if (r == stored) {
|
||||||
|
// Expire LeaseRenewer daemon thread as soon as possible.
|
||||||
|
r.clearClients();
|
||||||
|
r.setEmptyTime(0);
|
||||||
renewers.remove(r.factorykey);
|
renewers.remove(r.factorykey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -241,6 +256,10 @@ private synchronized void addClient(final DFSClient dfsc) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized void clearClients() {
|
||||||
|
dfsclients.clear();
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized boolean clientsRunning() {
|
private synchronized boolean clientsRunning() {
|
||||||
for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
|
for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
|
||||||
if (!i.next().isClientRunning()) {
|
if (!i.next().isClientRunning()) {
|
||||||
@ -292,11 +311,18 @@ private synchronized boolean isRenewerExpired() {
|
|||||||
&& Time.monotonicNow() - emptyTime > gracePeriod;
|
&& Time.monotonicNow() - emptyTime > gracePeriod;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void put(final DFSClient dfsc) {
|
public synchronized boolean put(final DFSClient dfsc) {
|
||||||
if (dfsc.isClientRunning()) {
|
if (dfsc.isClientRunning()) {
|
||||||
if (!isRunning() || isRenewerExpired()) {
|
if (!isRunning() || isRenewerExpired()) {
|
||||||
//start a new deamon with a new id.
|
// Start a new daemon with a new id.
|
||||||
final int id = ++currentId;
|
final int id = ++currentId;
|
||||||
|
if (isLSRunning.get()) {
|
||||||
|
// Not allowed to add multiple daemons into LeaseRenewer, let client
|
||||||
|
// create new LR and continue to acquire lease.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
isLSRunning.getAndSet(true);
|
||||||
|
|
||||||
daemon = new Daemon(new Runnable() {
|
daemon = new Daemon(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
@ -328,6 +354,7 @@ public String toString() {
|
|||||||
}
|
}
|
||||||
emptyTime = Long.MAX_VALUE;
|
emptyTime = Long.MAX_VALUE;
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@ -426,9 +453,6 @@ private void run(final int id) throws InterruptedException {
|
|||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
|
DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
|
||||||
dfsclientsCopy = new ArrayList<>(dfsclients);
|
dfsclientsCopy = new ArrayList<>(dfsclients);
|
||||||
dfsclients.clear();
|
|
||||||
//Expire the current LeaseRenewer thread.
|
|
||||||
emptyTime = 0;
|
|
||||||
Factory.INSTANCE.remove(LeaseRenewer.this);
|
Factory.INSTANCE.remove(LeaseRenewer.this);
|
||||||
}
|
}
|
||||||
for (DFSClient dfsClient : dfsclientsCopy) {
|
for (DFSClient dfsClient : dfsclientsCopy) {
|
||||||
|
@ -31,7 +31,11 @@
|
|||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.lang.management.ThreadInfo;
|
||||||
|
import java.lang.management.ThreadMXBean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import static org.junit.Assert.assertSame;
|
import static org.junit.Assert.assertSame;
|
||||||
|
|
||||||
@ -168,6 +172,11 @@ public Boolean get() {
|
|||||||
|
|
||||||
renewer.closeClient(mockClient1);
|
renewer.closeClient(mockClient1);
|
||||||
renewer.closeClient(mockClient2);
|
renewer.closeClient(mockClient2);
|
||||||
|
renewer.closeClient(MOCK_DFSCLIENT);
|
||||||
|
|
||||||
|
// Make sure renewer is not running due to expiration.
|
||||||
|
Thread.sleep(FAST_GRACE_PERIOD * 2);
|
||||||
|
Assert.assertTrue(!renewer.isRunning());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -197,4 +206,82 @@ public void testThreadName() throws Exception {
|
|||||||
Assert.assertFalse(renewer.isRunning());
|
Assert.assertFalse(renewer.isRunning());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for HDFS-14575. In this fix, the LeaseRenewer clears all clients
|
||||||
|
* and expires immediately via setting empty time to 0 before it's removed
|
||||||
|
* from factory. Previously, LeaseRenewer#daemon thread might leak.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDaemonThreadLeak() throws Exception {
|
||||||
|
Assert.assertFalse("Renewer not initially running", renewer.isRunning());
|
||||||
|
|
||||||
|
// Pretend to create a file#1, daemon#1 starts
|
||||||
|
renewer.put(MOCK_DFSCLIENT);
|
||||||
|
Assert.assertTrue("Renewer should have started running",
|
||||||
|
renewer.isRunning());
|
||||||
|
Pattern daemonThreadNamePattern = Pattern.compile("LeaseRenewer:\\S+");
|
||||||
|
Assert.assertEquals(1, countThreadMatching(daemonThreadNamePattern));
|
||||||
|
|
||||||
|
// Pretend to create file#2, daemon#2 starts due to expiration
|
||||||
|
LeaseRenewer lastRenewer = renewer;
|
||||||
|
renewer =
|
||||||
|
LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
|
||||||
|
Assert.assertEquals(lastRenewer, renewer);
|
||||||
|
|
||||||
|
// Pretend to close file#1
|
||||||
|
renewer.closeClient(MOCK_DFSCLIENT);
|
||||||
|
Assert.assertEquals(1, countThreadMatching(daemonThreadNamePattern));
|
||||||
|
|
||||||
|
// Pretend to be expired
|
||||||
|
renewer.setEmptyTime(0);
|
||||||
|
|
||||||
|
renewer =
|
||||||
|
LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
|
||||||
|
renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
|
||||||
|
boolean success = renewer.put(MOCK_DFSCLIENT);
|
||||||
|
if (!success) {
|
||||||
|
LeaseRenewer.remove(renewer);
|
||||||
|
renewer =
|
||||||
|
LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
|
||||||
|
renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
|
||||||
|
renewer.put(MOCK_DFSCLIENT);
|
||||||
|
}
|
||||||
|
|
||||||
|
int threadCount = countThreadMatching(daemonThreadNamePattern);
|
||||||
|
//Sometimes old LR#Daemon gets closed and lead to count 1 (rare scenario)
|
||||||
|
Assert.assertTrue(1 == threadCount || 2 == threadCount);
|
||||||
|
|
||||||
|
// After grace period, both daemon#1 and renewer#1 will be removed due to
|
||||||
|
// expiration, then daemon#2 will leak before HDFS-14575.
|
||||||
|
Thread.sleep(FAST_GRACE_PERIOD * 2);
|
||||||
|
|
||||||
|
// Pretend to close file#2, renewer#2 will be created
|
||||||
|
lastRenewer = renewer;
|
||||||
|
renewer =
|
||||||
|
LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
|
||||||
|
Assert.assertEquals(lastRenewer, renewer);
|
||||||
|
renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
|
||||||
|
renewer.closeClient(MOCK_DFSCLIENT);
|
||||||
|
renewer.setEmptyTime(0);
|
||||||
|
// Make sure LeaseRenewer#daemon threads will terminate after grace period
|
||||||
|
Thread.sleep(FAST_GRACE_PERIOD * 2);
|
||||||
|
Assert.assertEquals("LeaseRenewer#daemon thread leaks", 0,
|
||||||
|
countThreadMatching(daemonThreadNamePattern));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int countThreadMatching(Pattern pattern) {
|
||||||
|
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
|
||||||
|
ThreadInfo[] infos =
|
||||||
|
threadBean.getThreadInfo(threadBean.getAllThreadIds(), 1);
|
||||||
|
int count = 0;
|
||||||
|
for (ThreadInfo info : infos) {
|
||||||
|
if (info == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (pattern.matcher(info.getThreadName()).matches()) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user