HDFS-15661. The DeadNodeDetector should not be shared by different DFSClients. Contributed by Jinglun.

(cherry picked from commit f8769e0f4b)
This commit is contained in:
sunlisheng 2021-01-28 10:10:39 +08:00 committed by Wei-Chiu Chuang
parent f07bde90c9
commit 3ecd02ac78
4 changed files with 77 additions and 16 deletions

View File

@ -40,10 +40,10 @@
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping; import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -119,8 +119,6 @@ public class ClientContext {
private NodeBase clientNode; private NodeBase clientNode;
private boolean topologyResolutionEnabled; private boolean topologyResolutionEnabled;
private Daemon deadNodeDetectorThr = null;
/** /**
* The switch to DeadNodeDetector. * The switch to DeadNodeDetector.
*/ */
@ -130,12 +128,18 @@ public class ClientContext {
* Detect the dead datanodes in advance, and share this information among all * Detect the dead datanodes in advance, and share this information among all
* the DFSInputStreams in the same client. * the DFSInputStreams in the same client.
*/ */
private DeadNodeDetector deadNodeDetector = null; private volatile DeadNodeDetector deadNodeDetector = null;
/**
* Count the reference of ClientContext.
*/
private int counter = 0;
/** /**
* ShortCircuitCache array size. * ShortCircuitCache array size.
*/ */
private final int clientShortCircuitNum; private final int clientShortCircuitNum;
private Configuration configuration;
private ClientContext(String name, DfsClientConf conf, private ClientContext(String name, DfsClientConf conf,
Configuration config) { Configuration config) {
@ -149,6 +153,7 @@ private ClientContext(String name, DfsClientConf conf,
this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf); this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
} }
this.configuration = config;
this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
scConf.getSocketCacheExpiry()); scConf.getSocketCacheExpiry());
this.keyProviderCache = new KeyProviderCache( this.keyProviderCache = new KeyProviderCache(
@ -159,11 +164,6 @@ private ClientContext(String name, DfsClientConf conf,
this.byteArrayManager = ByteArrayManager.newInstance( this.byteArrayManager = ByteArrayManager.newInstance(
conf.getWriteByteArrayManagerConf()); conf.getWriteByteArrayManagerConf());
this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled(); this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
if (deadNodeDetectionEnabled && deadNodeDetector == null) {
deadNodeDetector = new DeadNodeDetector(name, config);
deadNodeDetectorThr = new Daemon(deadNodeDetector);
deadNodeDetectorThr.start();
}
initTopologyResolution(config); initTopologyResolution(config);
} }
@ -201,6 +201,7 @@ public static ClientContext get(String name, DfsClientConf conf,
context.printConfWarningIfNeeded(conf); context.printConfWarningIfNeeded(conf);
} }
} }
context.reference();
return context; return context;
} }
@ -301,17 +302,33 @@ public DeadNodeDetector getDeadNodeDetector() {
} }
/** /**
* Close dead node detector thread. * Increment the counter. Start the dead node detector thread if there is no
* reference.
*/ */
public void stopDeadNodeDetectorThread() { synchronized void reference() {
if (deadNodeDetectorThr != null) { counter++;
deadNodeDetectorThr.interrupt(); if (deadNodeDetectionEnabled && deadNodeDetector == null) {
deadNodeDetector = new DeadNodeDetector(name, configuration);
deadNodeDetector.start();
}
}
/**
* Decrement the counter. Close the dead node detector thread if there is no
* reference.
*/
synchronized void unreference() {
Preconditions.checkState(counter > 0);
counter--;
if (counter == 0 && deadNodeDetectionEnabled && deadNodeDetector != null) {
deadNodeDetector.interrupt();
try { try {
deadNodeDetectorThr.join(); deadNodeDetector.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Encountered exception while waiting to join on dead " + LOG.warn("Encountered exception while waiting to join on dead " +
"node detector thread.", e); "node detector thread.", e);
} }
deadNodeDetector = null;
} }
} }
} }

View File

@ -652,7 +652,7 @@ public synchronized void close() throws IOException {
clientRunning = false; clientRunning = false;
// close dead node detector thread // close dead node detector thread
if (!disabledStopDeadNodeDetectorThreadForTest) { if (!disabledStopDeadNodeDetectorThreadForTest) {
clientContext.stopDeadNodeDetectorThread(); clientContext.unreference();
} }
// close connections to the namenode // close connections to the namenode
@ -3387,4 +3387,11 @@ public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream,
private boolean isDeadNodeDetectionEnabled() { private boolean isDeadNodeDetectionEnabled() {
return clientContext.isDeadNodeDetectionEnabled(); return clientContext.isDeadNodeDetectionEnabled();
} }
/**
* Obtain DeadNodeDetector of the current client.
*/
public DeadNodeDetector getDeadNodeDetector() {
return clientContext.getDeadNodeDetector();
}
} }

View File

@ -62,7 +62,7 @@
* Detect the dead nodes in advance, and share this information among all the * Detect the dead nodes in advance, and share this information among all the
* DFSInputStreams in the same client. * DFSInputStreams in the same client.
*/ */
public class DeadNodeDetector implements Runnable { public class DeadNodeDetector extends Daemon {
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(DeadNodeDetector.class); LoggerFactory.getLogger(DeadNodeDetector.class);

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import java.net.URI;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
@ -43,6 +44,11 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/** /**
* Tests for dead node detection in DFSClient. * Tests for dead node detection in DFSClient.
@ -320,6 +326,37 @@ public void testDeadNodeDetectionSuspectNode() throws Exception {
} }
} }
@Test
public void testCloseDeadNodeDetector() throws Exception {
DistributedFileSystem dfs0 = (DistributedFileSystem) FileSystem
.newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
DistributedFileSystem dfs1 = (DistributedFileSystem) FileSystem
.newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
// The DeadNodeDetector is shared by different DFSClients.
DeadNodeDetector detector = dfs0.getClient().getDeadNodeDetector();
assertNotNull(detector);
assertSame(detector, dfs1.getClient().getDeadNodeDetector());
// Close one client. The dead node detector should be alive.
dfs0.close();
detector = dfs0.getClient().getDeadNodeDetector();
assertNotNull(detector);
assertSame(detector, dfs1.getClient().getDeadNodeDetector());
assertTrue(detector.isAlive());
// Close all clients. The dead node detector should be closed.
dfs1.close();
detector = dfs0.getClient().getDeadNodeDetector();
assertNull(detector);
assertSame(detector, dfs1.getClient().getDeadNodeDetector());
// Create a new client. The dead node detector should be alive.
dfs1 = (DistributedFileSystem) FileSystem
.newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
DeadNodeDetector newDetector = dfs0.getClient().getDeadNodeDetector();
assertNotNull(newDetector);
assertTrue(newDetector.isAlive());
assertNotSame(detector, newDetector);
dfs1.close();
}
private void createFile(FileSystem fs, Path filePath) throws IOException { private void createFile(FileSystem fs, Path filePath) throws IOException {
FSDataOutputStream out = null; FSDataOutputStream out = null;
try { try {