HDFS-15806. DeadNodeDetector should close all the threads when it is closed. Contributed by Jinglun.
(cherry picked from commit ff84a57483
)
This commit is contained in:
parent
4f79df4da1
commit
94766fdb13
@ -321,13 +321,7 @@ synchronized void unreference() {
|
|||||||
Preconditions.checkState(counter > 0);
|
Preconditions.checkState(counter > 0);
|
||||||
counter--;
|
counter--;
|
||||||
if (counter == 0 && deadNodeDetectionEnabled && deadNodeDetector != null) {
|
if (counter == 0 && deadNodeDetectionEnabled && deadNodeDetector != null) {
|
||||||
deadNodeDetector.interrupt();
|
deadNodeDetector.shutdown();
|
||||||
try {
|
|
||||||
deadNodeDetector.join();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.warn("Encountered exception while waiting to join on dead " +
|
|
||||||
"node detector thread.", e);
|
|
||||||
}
|
|
||||||
deadNodeDetector = null;
|
deadNodeDetector = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -271,6 +271,37 @@ public void run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown all the threads.
|
||||||
|
*/
|
||||||
|
public void shutdown() {
|
||||||
|
threadShutDown(this);
|
||||||
|
threadShutDown(probeDeadNodesSchedulerThr);
|
||||||
|
threadShutDown(probeSuspectNodesSchedulerThr);
|
||||||
|
probeDeadNodesThreadPool.shutdown();
|
||||||
|
probeSuspectNodesThreadPool.shutdown();
|
||||||
|
rpcThreadPool.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void threadShutDown(Thread thread) {
|
||||||
|
if (thread != null && thread.isAlive()) {
|
||||||
|
thread.interrupt();
|
||||||
|
try {
|
||||||
|
thread.join();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean isThreadsShutdown() {
|
||||||
|
return !this.isAlive() && !probeDeadNodesSchedulerThr.isAlive()
|
||||||
|
&& !probeSuspectNodesSchedulerThr.isAlive()
|
||||||
|
&& probeDeadNodesThreadPool.isShutdown()
|
||||||
|
&& probeSuspectNodesThreadPool.isShutdown()
|
||||||
|
&& rpcThreadPool.isShutdown();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static void setDisabledProbeThreadForTest(
|
static void setDisabledProbeThreadForTest(
|
||||||
boolean disabledProbeThreadForTest) {
|
boolean disabledProbeThreadForTest) {
|
||||||
|
@ -357,6 +357,18 @@ public void testCloseDeadNodeDetector() throws Exception {
|
|||||||
dfs1.close();
|
dfs1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeadNodeDetectorThreadsShutdown() throws Exception {
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
|
||||||
|
.newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
|
||||||
|
DeadNodeDetector detector = dfs.getClient().getDeadNodeDetector();
|
||||||
|
assertNotNull(detector);
|
||||||
|
dfs.close();
|
||||||
|
assertTrue(detector.isThreadsShutdown());
|
||||||
|
detector = dfs.getClient().getDeadNodeDetector();
|
||||||
|
assertNull(detector);
|
||||||
|
}
|
||||||
|
|
||||||
private void createFile(FileSystem fs, Path filePath) throws IOException {
|
private void createFile(FileSystem fs, Path filePath) throws IOException {
|
||||||
FSDataOutputStream out = null;
|
FSDataOutputStream out = null;
|
||||||
try {
|
try {
|
||||||
|
Loading…
Reference in New Issue
Block a user