HDFS-15661. The DeadNodeDetector should not be shared by different DFSClients. Contributed by Jinglun.
This commit is contained in:
parent
2a38ed0e0c
commit
f8769e0f4b
@ -40,10 +40,10 @@
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.net.NodeBase;
|
||||
import org.apache.hadoop.net.ScriptBasedMapping;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -119,8 +119,6 @@ public class ClientContext {
|
||||
private NodeBase clientNode;
|
||||
private boolean topologyResolutionEnabled;
|
||||
|
||||
private Daemon deadNodeDetectorThr = null;
|
||||
|
||||
/**
|
||||
* The switch to DeadNodeDetector.
|
||||
*/
|
||||
@ -130,12 +128,18 @@ public class ClientContext {
|
||||
* Detect the dead datanodes in advance, and share this information among all
|
||||
* the DFSInputStreams in the same client.
|
||||
*/
|
||||
private DeadNodeDetector deadNodeDetector = null;
|
||||
private volatile DeadNodeDetector deadNodeDetector = null;
|
||||
|
||||
/**
|
||||
* Count the reference of ClientContext.
|
||||
*/
|
||||
private int counter = 0;
|
||||
|
||||
/**
|
||||
* ShortCircuitCache array size.
|
||||
*/
|
||||
private final int clientShortCircuitNum;
|
||||
private Configuration configuration;
|
||||
|
||||
private ClientContext(String name, DfsClientConf conf,
|
||||
Configuration config) {
|
||||
@ -149,6 +153,7 @@ private ClientContext(String name, DfsClientConf conf,
|
||||
this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
|
||||
}
|
||||
|
||||
this.configuration = config;
|
||||
this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
|
||||
scConf.getSocketCacheExpiry());
|
||||
this.keyProviderCache = new KeyProviderCache(
|
||||
@ -159,11 +164,6 @@ private ClientContext(String name, DfsClientConf conf,
|
||||
this.byteArrayManager = ByteArrayManager.newInstance(
|
||||
conf.getWriteByteArrayManagerConf());
|
||||
this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
|
||||
if (deadNodeDetectionEnabled && deadNodeDetector == null) {
|
||||
deadNodeDetector = new DeadNodeDetector(name, config);
|
||||
deadNodeDetectorThr = new Daemon(deadNodeDetector);
|
||||
deadNodeDetectorThr.start();
|
||||
}
|
||||
initTopologyResolution(config);
|
||||
}
|
||||
|
||||
@ -201,6 +201,7 @@ public static ClientContext get(String name, DfsClientConf conf,
|
||||
context.printConfWarningIfNeeded(conf);
|
||||
}
|
||||
}
|
||||
context.reference();
|
||||
return context;
|
||||
}
|
||||
|
||||
@ -301,17 +302,33 @@ public DeadNodeDetector getDeadNodeDetector() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Close dead node detector thread.
|
||||
* Increment the counter. Start the dead node detector thread if there is no
|
||||
* reference.
|
||||
*/
|
||||
public void stopDeadNodeDetectorThread() {
|
||||
if (deadNodeDetectorThr != null) {
|
||||
deadNodeDetectorThr.interrupt();
|
||||
synchronized void reference() {
|
||||
counter++;
|
||||
if (deadNodeDetectionEnabled && deadNodeDetector == null) {
|
||||
deadNodeDetector = new DeadNodeDetector(name, configuration);
|
||||
deadNodeDetector.start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrement the counter. Close the dead node detector thread if there is no
|
||||
* reference.
|
||||
*/
|
||||
synchronized void unreference() {
|
||||
Preconditions.checkState(counter > 0);
|
||||
counter--;
|
||||
if (counter == 0 && deadNodeDetectionEnabled && deadNodeDetector != null) {
|
||||
deadNodeDetector.interrupt();
|
||||
try {
|
||||
deadNodeDetectorThr.join();
|
||||
deadNodeDetector.join();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Encountered exception while waiting to join on dead " +
|
||||
"node detector thread.", e);
|
||||
}
|
||||
deadNodeDetector = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -648,7 +648,7 @@ public synchronized void close() throws IOException {
|
||||
clientRunning = false;
|
||||
// close dead node detector thread
|
||||
if (!disabledStopDeadNodeDetectorThreadForTest) {
|
||||
clientContext.stopDeadNodeDetectorThread();
|
||||
clientContext.unreference();
|
||||
}
|
||||
|
||||
// close connections to the namenode
|
||||
@ -3441,4 +3441,11 @@ public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream,
|
||||
private boolean isDeadNodeDetectionEnabled() {
|
||||
return clientContext.isDeadNodeDetectionEnabled();
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain DeadNodeDetector of the current client.
|
||||
*/
|
||||
public DeadNodeDetector getDeadNodeDetector() {
|
||||
return clientContext.getDeadNodeDetector();
|
||||
}
|
||||
}
|
||||
|
@ -62,7 +62,7 @@
|
||||
* Detect the dead nodes in advance, and share this information among all the
|
||||
* DFSInputStreams in the same client.
|
||||
*/
|
||||
public class DeadNodeDetector implements Runnable {
|
||||
public class DeadNodeDetector extends Daemon {
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(DeadNodeDetector.class);
|
||||
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
@ -43,6 +44,11 @@
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Tests for dead node detection in DFSClient.
|
||||
@ -320,6 +326,37 @@ public void testDeadNodeDetectionSuspectNode() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseDeadNodeDetector() throws Exception {
|
||||
DistributedFileSystem dfs0 = (DistributedFileSystem) FileSystem
|
||||
.newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
|
||||
DistributedFileSystem dfs1 = (DistributedFileSystem) FileSystem
|
||||
.newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
|
||||
// The DeadNodeDetector is shared by different DFSClients.
|
||||
DeadNodeDetector detector = dfs0.getClient().getDeadNodeDetector();
|
||||
assertNotNull(detector);
|
||||
assertSame(detector, dfs1.getClient().getDeadNodeDetector());
|
||||
// Close one client. The dead node detector should be alive.
|
||||
dfs0.close();
|
||||
detector = dfs0.getClient().getDeadNodeDetector();
|
||||
assertNotNull(detector);
|
||||
assertSame(detector, dfs1.getClient().getDeadNodeDetector());
|
||||
assertTrue(detector.isAlive());
|
||||
// Close all clients. The dead node detector should be closed.
|
||||
dfs1.close();
|
||||
detector = dfs0.getClient().getDeadNodeDetector();
|
||||
assertNull(detector);
|
||||
assertSame(detector, dfs1.getClient().getDeadNodeDetector());
|
||||
// Create a new client. The dead node detector should be alive.
|
||||
dfs1 = (DistributedFileSystem) FileSystem
|
||||
.newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
|
||||
DeadNodeDetector newDetector = dfs0.getClient().getDeadNodeDetector();
|
||||
assertNotNull(newDetector);
|
||||
assertTrue(newDetector.isAlive());
|
||||
assertNotSame(detector, newDetector);
|
||||
dfs1.close();
|
||||
}
|
||||
|
||||
private void createFile(FileSystem fs, Path filePath) throws IOException {
|
||||
FSDataOutputStream out = null;
|
||||
try {
|
||||
|
Loading…
Reference in New Issue
Block a user