diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index b34420da5c..47e985b68e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -40,10 +40,10 @@ import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.ScriptBasedMapping; -import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,8 +119,6 @@ public class ClientContext { private NodeBase clientNode; private boolean topologyResolutionEnabled; - private Daemon deadNodeDetectorThr = null; - /** * The switch to DeadNodeDetector. */ @@ -130,12 +128,18 @@ public class ClientContext { * Detect the dead datanodes in advance, and share this information among all * the DFSInputStreams in the same client. */ - private DeadNodeDetector deadNodeDetector = null; + private volatile DeadNodeDetector deadNodeDetector = null; + + /** + * Count the reference of ClientContext. + */ + private int counter = 0; /** * ShortCircuitCache array size. */ private final int clientShortCircuitNum; + private Configuration configuration; private ClientContext(String name, DfsClientConf conf, Configuration config) { @@ -149,6 +153,7 @@ private ClientContext(String name, DfsClientConf conf, this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf); } + this.configuration = config; this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), scConf.getSocketCacheExpiry()); this.keyProviderCache = new KeyProviderCache( @@ -159,11 +164,6 @@ private ClientContext(String name, DfsClientConf conf, this.byteArrayManager = ByteArrayManager.newInstance( conf.getWriteByteArrayManagerConf()); this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled(); - if (deadNodeDetectionEnabled && deadNodeDetector == null) { - deadNodeDetector = new DeadNodeDetector(name, config); - deadNodeDetectorThr = new Daemon(deadNodeDetector); - deadNodeDetectorThr.start(); - } initTopologyResolution(config); } @@ -201,6 +201,7 @@ public static ClientContext get(String name, DfsClientConf conf, context.printConfWarningIfNeeded(conf); } } + context.reference(); return context; } @@ -301,17 +302,33 @@ public DeadNodeDetector getDeadNodeDetector() { } /** - * Close dead node detector thread. + * Increment the counter. Start the dead node detector thread if there is no + * reference. */ - public void stopDeadNodeDetectorThread() { - if (deadNodeDetectorThr != null) { - deadNodeDetectorThr.interrupt(); + synchronized void reference() { + counter++; + if (deadNodeDetectionEnabled && deadNodeDetector == null) { + deadNodeDetector = new DeadNodeDetector(name, configuration); + deadNodeDetector.start(); + } + } + + /** + * Decrement the counter. Close the dead node detector thread if there is no + * reference. + */ + synchronized void unreference() { + Preconditions.checkState(counter > 0); + counter--; + if (counter == 0 && deadNodeDetectionEnabled && deadNodeDetector != null) { + deadNodeDetector.interrupt(); try { - deadNodeDetectorThr.join(); + deadNodeDetector.join(); } catch (InterruptedException e) { LOG.warn("Encountered exception while waiting to join on dead " + "node detector thread.", e); } + deadNodeDetector = null; } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index e9ca594d9d..b210f91041 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -652,7 +652,7 @@ public synchronized void close() throws IOException { clientRunning = false; // close dead node detector thread if (!disabledStopDeadNodeDetectorThreadForTest) { - clientContext.stopDeadNodeDetectorThread(); + clientContext.unreference(); } // close connections to the namenode @@ -3387,4 +3387,11 @@ public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream, private boolean isDeadNodeDetectionEnabled() { return clientContext.isDeadNodeDetectionEnabled(); } + + /** + * Obtain DeadNodeDetector of the current client. + */ + public DeadNodeDetector getDeadNodeDetector() { + return clientContext.getDeadNodeDetector(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java index fd5e9c31c5..8066b8f5c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java @@ -62,7 +62,7 @@ * Detect the dead nodes in advance, and share this information among all the * DFSInputStreams in the same client. */ -public class DeadNodeDetector implements Runnable { +public class DeadNodeDetector extends Daemon { public static final Logger LOG = LoggerFactory.getLogger(DeadNodeDetector.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java index 09e670211a..9c52fcd8d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs; +import java.net.URI; import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -43,6 +44,11 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * Tests for dead node detection in DFSClient. @@ -320,6 +326,37 @@ public void testDeadNodeDetectionSuspectNode() throws Exception { } } + @Test + public void testCloseDeadNodeDetector() throws Exception { + DistributedFileSystem dfs0 = (DistributedFileSystem) FileSystem + .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf); + DistributedFileSystem dfs1 = (DistributedFileSystem) FileSystem + .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf); + // The DeadNodeDetector is shared by different DFSClients. + DeadNodeDetector detector = dfs0.getClient().getDeadNodeDetector(); + assertNotNull(detector); + assertSame(detector, dfs1.getClient().getDeadNodeDetector()); + // Close one client. The dead node detector should be alive. + dfs0.close(); + detector = dfs0.getClient().getDeadNodeDetector(); + assertNotNull(detector); + assertSame(detector, dfs1.getClient().getDeadNodeDetector()); + assertTrue(detector.isAlive()); + // Close all clients. The dead node detector should be closed. + dfs1.close(); + detector = dfs0.getClient().getDeadNodeDetector(); + assertNull(detector); + assertSame(detector, dfs1.getClient().getDeadNodeDetector()); + // Create a new client. The dead node detector should be alive. + dfs1 = (DistributedFileSystem) FileSystem + .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf); + DeadNodeDetector newDetector = dfs0.getClient().getDeadNodeDetector(); + assertNotNull(newDetector); + assertTrue(newDetector.isAlive()); + assertNotSame(detector, newDetector); + dfs1.close(); + } + private void createFile(FileSystem fs, Path filePath) throws IOException { FSDataOutputStream out = null; try {