HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher that stops the thread. (zhouyingchao via cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2015-05-28 11:52:28 -07:00
parent d2d95bfe88
commit 246cefa089
4 changed files with 99 additions and 2 deletions

View File

@ -782,6 +782,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12035. shellcheck plugin displays a wrong version potentially HADOOP-12035. shellcheck plugin displays a wrong version potentially
(Kengo Seki via aw) (Kengo Seki via aw)
HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher
that stops the thread. (zhouyingchao via cmccabe)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -470,6 +470,7 @@ public void run() {
// Handle pending additions (before pending removes). // Handle pending additions (before pending removes).
for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) { for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
Entry entry = iter.next(); Entry entry = iter.next();
iter.remove();
DomainSocket sock = entry.getDomainSocket(); DomainSocket sock = entry.getDomainSocket();
Entry prevEntry = entries.put(sock.fd, entry); Entry prevEntry = entries.put(sock.fd, entry);
Preconditions.checkState(prevEntry == null, Preconditions.checkState(prevEntry == null,
@ -479,7 +480,6 @@ public void run() {
LOG.trace(this + ": adding fd " + sock.fd); LOG.trace(this + ": adding fd " + sock.fd);
} }
fdSet.add(sock.fd); fdSet.add(sock.fd);
iter.remove();
} }
// Handle pending removals // Handle pending removals
while (true) { while (true) {
@ -525,6 +525,25 @@ public void run() {
} }
entries.clear(); entries.clear();
fdSet.close(); fdSet.close();
closed = true;
if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
// Items in toAdd might not be added to entries, handle it here
for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext();) {
Entry entry = iter.next();
entry.getDomainSocket().refCount.unreference();
entry.getHandler().handle(entry.getDomainSocket());
IOUtils.cleanup(LOG, entry.getDomainSocket());
iter.remove();
}
// Items in toRemove might not be really removed, handle it here
while (true) {
Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
if (entry == null)
break;
sendCallback("close", entries, fdSet, entry.getValue().fd);
}
}
processedCond.signalAll();
} finally { } finally {
lock.unlock(); lock.unlock();
} }

View File

@ -212,7 +212,7 @@ done:
free(carr); free(carr);
if (jthr) { if (jthr) {
(*env)->DeleteLocalRef(env, jarr); (*env)->DeleteLocalRef(env, jarr);
jarr = NULL; (*env)->Throw(env, jthr);
} }
return jarr; return jarr;
} }

View File

@ -22,6 +22,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -181,6 +182,80 @@ public void run() {
watcher.close(); watcher.close();
} }
@Test(timeout = 300000)
public void testStressInterruption() throws Exception {
final int SOCKET_NUM = 250;
final ReentrantLock lock = new ReentrantLock();
final DomainSocketWatcher watcher = newDomainSocketWatcher(10);
final ArrayList<DomainSocket[]> pairs = new ArrayList<DomainSocket[]>();
final AtomicInteger handled = new AtomicInteger(0);
final Thread adderThread = new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < SOCKET_NUM; i++) {
DomainSocket pair[] = DomainSocket.socketpair();
watcher.add(pair[1], new DomainSocketWatcher.Handler() {
@Override
public boolean handle(DomainSocket sock) {
handled.incrementAndGet();
return true;
}
});
lock.lock();
try {
pairs.add(pair);
} finally {
lock.unlock();
}
TimeUnit.MILLISECONDS.sleep(1);
}
} catch (Throwable e) {
LOG.error(e);
throw new RuntimeException(e);
}
}
});
final Thread removerThread = new Thread(new Runnable() {
@Override
public void run() {
final Random random = new Random();
try {
while (handled.get() != SOCKET_NUM) {
lock.lock();
try {
if (!pairs.isEmpty()) {
int idx = random.nextInt(pairs.size());
DomainSocket pair[] = pairs.remove(idx);
if (random.nextBoolean()) {
pair[0].close();
} else {
watcher.remove(pair[1]);
}
TimeUnit.MILLISECONDS.sleep(1);
}
} finally {
lock.unlock();
}
}
} catch (Throwable e) {
LOG.error(e);
throw new RuntimeException(e);
}
}
});
adderThread.start();
removerThread.start();
TimeUnit.MILLISECONDS.sleep(100);
watcher.watcherThread.interrupt();
Uninterruptibles.joinUninterruptibly(adderThread);
Uninterruptibles.joinUninterruptibly(removerThread);
Uninterruptibles.joinUninterruptibly(watcher.watcherThread);
}
/** /**
* Creates a new DomainSocketWatcher and tracks its thread for termination due * Creates a new DomainSocketWatcher and tracks its thread for termination due
* to an unexpected exception. At the end of each test, if there was an * to an unexpected exception. At the end of each test, if there was an