HDFS-14649. Add suspect probe for DeadNodeDetector. Contributed by Lisheng Sun.

This commit is contained in:
Yiqun Lin 2019-11-27 10:57:20 +08:00
parent ef950b0863
commit c8bef4d6a6
4 changed files with 264 additions and 46 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -48,8 +49,14 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
/**
@ -83,13 +90,13 @@ public class DeadNodeDetector implements Runnable {
private final Map<String, DatanodeInfo> deadNodes;
/**
* Record dead nodes by one DFSInputStream. When dead node is not used by one
* DFSInputStream, remove it from dfsInputStreamNodes#DFSInputStream. If
* DFSInputStream does not include any dead node, remove DFSInputStream from
* dfsInputStreamNodes.
* Record suspect and dead nodes by one DFSInputStream. When node is not used
* by one DFSInputStream, remove it from suspectAndDeadNodes#DFSInputStream.
* If DFSInputStream does not include any node, remove DFSInputStream from
* suspectAndDeadNodes.
*/
private final Map<DFSInputStream, HashSet<DatanodeInfo>>
dfsInputStreamNodes;
suspectAndDeadNodes;
/**
* Datanodes that are being probed.
@ -107,11 +114,21 @@ public class DeadNodeDetector implements Runnable {
*/
private long deadNodeDetectInterval = 0;
/**
* Interval time in milliseconds for probing suspect node behavior.
*/
private long suspectNodeDetectInterval = 0;
/**
* The max queue size of probing dead node.
*/
private int maxDeadNodesProbeQueueLen = 0;
/**
* The max queue size of probing suspect node.
*/
private int maxSuspectNodesProbeQueueLen;
/**
* Connection timeout for probing dead node in milliseconds.
*/
@ -122,16 +139,31 @@ public class DeadNodeDetector implements Runnable {
*/
private Queue<DatanodeInfo> deadNodesProbeQueue;
/**
* The suspect node probe queue.
*/
private Queue<DatanodeInfo> suspectNodesProbeQueue;
/**
* The thread pool of probing dead node.
*/
private ExecutorService probeDeadNodesThreadPool;
/**
* The thread pool of probing suspect node.
*/
private ExecutorService probeSuspectNodesThreadPool;
/**
* The scheduler thread of probing dead node.
*/
private Thread probeDeadNodesSchedulerThr;
/**
* The scheduler thread of probing suspect node.
*/
private Thread probeSuspectNodesSchedulerThr;
/**
* The thread pool of probing datanodes' rpc request. Sometimes the data node
* can hang and not respond to the client in a short time. And these nodes will
@ -145,7 +177,7 @@ public class DeadNodeDetector implements Runnable {
* The type of probe.
*/
private enum ProbeType {
CHECK_DEAD
CHECK_DEAD, CHECK_SUSPECT
}
/**
@ -155,41 +187,61 @@ private enum State {
INIT, CHECK_DEAD, IDLE, ERROR
}
/**
* Whether starting the probe suspect/dead threads is disabled, for testing.
*/
private static volatile boolean disabledProbeThreadForTest = false;
private State state;
public DeadNodeDetector(String name, Configuration conf) {
this.conf = new Configuration(conf);
this.deadNodes = new ConcurrentHashMap<String, DatanodeInfo>();
this.dfsInputStreamNodes =
this.suspectAndDeadNodes =
new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>();
this.name = name;
deadNodeDetectInterval = conf.getLong(
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT);
suspectNodeDetectInterval = conf.getLong(
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT);
socketTimeout =
conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
maxDeadNodesProbeQueueLen =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT);
maxSuspectNodesProbeQueueLen =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT);
probeConnectionTimeoutMs = conf.getLong(
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);
this.deadNodesProbeQueue =
new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen);
this.suspectNodesProbeQueue =
new ArrayBlockingQueue<DatanodeInfo>(maxSuspectNodesProbeQueueLen);
int deadNodeDetectDeadThreads =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT);
int suspectNodeDetectDeadThreads = conf.getInt(
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT);
int rpcThreads = conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT);
probeDeadNodesThreadPool = Executors.newFixedThreadPool(
deadNodeDetectDeadThreads, new Daemon.DaemonFactory());
probeSuspectNodesThreadPool = Executors.newFixedThreadPool(
suspectNodeDetectDeadThreads, new Daemon.DaemonFactory());
rpcThreadPool =
Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory());
startProbeScheduler();
if (!disabledProbeThreadForTest) {
startProbeScheduler();
}
LOG.info("Start dead node detector for DFSClient {}.", this.name);
state = State.INIT;
@ -223,14 +275,25 @@ public void run() {
}
}
@VisibleForTesting
static void disabledProbeThreadForTest() {
disabledProbeThreadForTest = true;
}
/**
* Start probe dead node thread.
* Start probe dead node and suspect node thread.
*/
private void startProbeScheduler() {
@VisibleForTesting
void startProbeScheduler() {
probeDeadNodesSchedulerThr =
new Thread(new ProbeScheduler(this, ProbeType.CHECK_DEAD));
probeDeadNodesSchedulerThr.setDaemon(true);
probeDeadNodesSchedulerThr.start();
probeSuspectNodesSchedulerThr =
new Thread(new ProbeScheduler(this, ProbeType.CHECK_SUSPECT));
probeSuspectNodesSchedulerThr.setDaemon(true);
probeSuspectNodesSchedulerThr.start();
}
/**
@ -250,6 +313,15 @@ private void scheduleProbe(ProbeType type) {
Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_DEAD);
probeDeadNodesThreadPool.execute(probe);
}
} else if (type == ProbeType.CHECK_SUSPECT) {
while ((datanodeInfo = suspectNodesProbeQueue.poll()) != null) {
if (probeInProg.containsKey(datanodeInfo.getDatanodeUuid())) {
continue;
}
probeInProg.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_SUSPECT);
probeSuspectNodesThreadPool.execute(probe);
}
}
}
@ -263,7 +335,7 @@ class Probe implements Runnable {
private ProbeType type;
Probe(DeadNodeDetector deadNodeDetector, DatanodeInfo datanodeInfo,
ProbeType type) {
ProbeType type) {
this.deadNodeDetector = deadNodeDetector;
this.datanodeInfo = datanodeInfo;
this.type = type;
@ -323,9 +395,19 @@ private void probeCallBack(Probe probe, boolean success) {
probeInProg.remove(probe.getDatanodeInfo().getDatanodeUuid());
if (success) {
if (probe.getType() == ProbeType.CHECK_DEAD) {
LOG.info("Remove the node out from dead node list: {}. ",
LOG.info("Remove the node out from dead node list: {}.",
probe.getDatanodeInfo());
removeNodeFromDeadNode(probe.getDatanodeInfo());
removeDeadNode(probe.getDatanodeInfo());
} else if (probe.getType() == ProbeType.CHECK_SUSPECT) {
LOG.debug("Remove the node out from suspect node list: {}.",
probe.getDatanodeInfo());
removeNodeFromDeadNodeDetector(probe.getDatanodeInfo());
}
} else {
if (probe.getType() == ProbeType.CHECK_SUSPECT) {
LOG.info("Add the node to dead node list: {}.",
probe.getDatanodeInfo());
addToDead(probe.getDatanodeInfo());
}
}
}
@ -381,34 +463,43 @@ public Queue<DatanodeInfo> getDeadNodesProbeQueue() {
return deadNodesProbeQueue;
}
public Queue<DatanodeInfo> getSuspectNodesProbeQueue() {
return suspectNodesProbeQueue;
}
/**
* Add datanode in deadNodes and dfsInputStreamNodes. The node is considered
* to dead node. The dead node is shared by all the DFSInputStreams in the
* same client.
* Add datanode to suspectNodes and suspectAndDeadNodes.
*/
public synchronized void addNodeToDetect(DFSInputStream dfsInputStream,
DatanodeInfo datanodeInfo) {
HashSet<DatanodeInfo> datanodeInfos =
dfsInputStreamNodes.get(dfsInputStream);
suspectAndDeadNodes.get(dfsInputStream);
if (datanodeInfos == null) {
datanodeInfos = new HashSet<DatanodeInfo>();
datanodeInfos.add(datanodeInfo);
dfsInputStreamNodes.putIfAbsent(dfsInputStream, datanodeInfos);
suspectAndDeadNodes.putIfAbsent(dfsInputStream, datanodeInfos);
} else {
datanodeInfos.add(datanodeInfo);
}
addToDead(datanodeInfo);
addSuspectNodeToDetect(datanodeInfo);
}
/**
* Remove dead node which is not used by any DFSInputStream from deadNodes.
* @return new dead node shared by all DFSInputStreams.
* Add datanode to suspectNodes.
*/
private boolean addSuspectNodeToDetect(DatanodeInfo datanodeInfo) {
return suspectNodesProbeQueue.offer(datanodeInfo);
}
/**
* Remove dead node which is not used by any DFSInputStream from deadNodes.
* @return new dead node shared by all DFSInputStreams.
*/
public synchronized Set<DatanodeInfo> clearAndGetDetectedDeadNodes() {
// remove the dead nodes who doesn't have any inputstream first
Set<DatanodeInfo> newDeadNodes = new HashSet<DatanodeInfo>();
for (HashSet<DatanodeInfo> datanodeInfos : dfsInputStreamNodes.values()) {
for (HashSet<DatanodeInfo> datanodeInfos : suspectAndDeadNodes.values()) {
newDeadNodes.addAll(datanodeInfos);
}
@ -421,34 +512,46 @@ public synchronized Set<DatanodeInfo> clearAndGetDetectedDeadNodes() {
}
/**
* Remove dead node from dfsInputStreamNodes#dfsInputStream. If
* dfsInputStreamNodes#dfsInputStream does not contain any dead node, remove
* it from dfsInputStreamNodes.
* Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
* local deadNodes.
*/
public synchronized void removeNodeFromDeadNodeDetector(
DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) {
Set<DatanodeInfo> datanodeInfos = dfsInputStreamNodes.get(dfsInputStream);
Set<DatanodeInfo> datanodeInfos = suspectAndDeadNodes.get(dfsInputStream);
if (datanodeInfos != null) {
datanodeInfos.remove(datanodeInfo);
dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
if (datanodeInfos.isEmpty()) {
dfsInputStreamNodes.remove(dfsInputStream);
suspectAndDeadNodes.remove(dfsInputStream);
}
}
}
/**
* Remove dead node from dfsInputStreamNodes#dfsInputStream and deadNodes.
* Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
* local deadNodes.
*/
public synchronized void removeNodeFromDeadNode(DatanodeInfo datanodeInfo) {
private synchronized void removeNodeFromDeadNodeDetector(
DatanodeInfo datanodeInfo) {
for (Map.Entry<DFSInputStream, HashSet<DatanodeInfo>> entry :
dfsInputStreamNodes.entrySet()) {
suspectAndDeadNodes.entrySet()) {
Set<DatanodeInfo> datanodeInfos = entry.getValue();
if (datanodeInfos.remove(datanodeInfo)) {
DFSInputStream dfsInputStream = entry.getKey();
dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
if (datanodeInfos.isEmpty()) {
suspectAndDeadNodes.remove(dfsInputStream);
}
}
}
}
/**
* Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
* deadNodes.
*/
private void removeDeadNode(DatanodeInfo datanodeInfo) {
removeNodeFromDeadNodeDetector(datanodeInfo);
removeFromDead(datanodeInfo);
}
@ -476,7 +579,11 @@ static class ProbeScheduler implements Runnable {
public void run() {
while (true) {
deadNodeDetector.scheduleProbe(type);
probeSleep(deadNodeDetector.deadNodeDetectInterval);
if (type == ProbeType.CHECK_SUSPECT) {
probeSleep(deadNodeDetector.suspectNodeDetectInterval);
} else {
probeSleep(deadNodeDetector.deadNodeDetectInterval);
}
}
}
}

View File

@ -160,6 +160,10 @@ public interface HdfsClientConfigKeys {
"dfs.client.deadnode.detection.deadnode.queue.max";
int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100;
String DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY =
"dfs.client.deadnode.detection.suspectnode.queue.max";
int DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT = 1000;
String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY =
"dfs.client.deadnode.detection.probe.connection.timeout.ms";
long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT =
@ -169,6 +173,10 @@ public interface HdfsClientConfigKeys {
"dfs.client.deadnode.detection.probe.deadnode.threads";
int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT = 10;
String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY =
"dfs.client.deadnode.detection.probe.suspectnode.threads";
int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT = 10;
String DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY =
"dfs.client.deadnode.detection.rpc.threads";
int DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT = 20;
@ -178,6 +186,11 @@ public interface HdfsClientConfigKeys {
long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT =
60 * 1000; // 60s
String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY =
"dfs.client.deadnode.detection.probe.suspectnode.interval.ms";
long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT =
300; // 300ms
String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
"dfs.datanode.kerberos.principal";
String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";

View File

@ -3004,6 +3004,14 @@
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.suspectnode.queue.max</name>
<value>1000</value>
<description>
The max queue size of probing suspect node.
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
<value>10</value>
@ -3012,6 +3020,14 @@
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.probe.suspectnode.threads</name>
<value>10</value>
<description>
The maximum number of threads to use for probing suspect node.
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.rpc.threads</name>
<value>20</value>
@ -3028,6 +3044,14 @@
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.probe.suspectnode.interval.ms</name>
<value>300</value>
<description>
Interval time in milliseconds for probing suspect node behavior.
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.probe.connection.timeout.ms</name>
<value>20000</value>

View File

@ -35,6 +35,8 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.junit.Assert.assertEquals;
/**
@ -59,10 +61,15 @@ public void tearDown() {
}
@Test
public void testDeadNodeDetectionInBackground() throws IOException {
public void testDeadNodeDetectionInBackground() throws Exception {
conf = new HdfsConfiguration();
conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
1000);
conf.setLong(
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY, 100);
// We'll be using a 512 bytes block size just for tests
// so making sure the checksum bytes too match it.
// so making sure the checksum bytes match it too.
conf.setInt("io.bytes.per.checksum", 512);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
@ -91,22 +98,21 @@ public void testDeadNodeDetectionInBackground() throws IOException {
cluster.stopDataNode(0);
FSDataInputStream in = fs.open(filePath);
DFSInputStream din = null;
DFSClient dfsClient = null;
DFSInputStream din = (DFSInputStream) in.getWrappedStream();
DFSClient dfsClient = din.getDFSClient();
try {
try {
in.read();
} catch (BlockMissingException e) {
}
din = (DFSInputStream) in.getWrappedStream();
dfsClient = din.getDFSClient();
waitForDeadNode(dfsClient, 3);
assertEquals(3, dfsClient.getDeadNodes(din).size());
assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
} finally {
in.close();
fs.delete(new Path("/testDetectDeadNodeInBackground"), true);
fs.delete(filePath, true);
// check the dead node again here, the dead node is expected be removed
assertEquals(0, dfsClient.getDeadNodes(din).size());
assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
@ -119,7 +125,7 @@ public void testDeadNodeDetectionInMultipleDFSInputStream()
throws IOException {
conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
// We'll be using a 512 bytes block size just for tests
// so making sure the checksum bytes too match it.
// so making sure the checksum bytes match it too.
conf.setInt("io.bytes.per.checksum", 512);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
@ -178,7 +184,7 @@ public void testDeadNodeDetectionDeadNodeRecovery() throws Exception {
conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
1000);
// We'll be using a 512 bytes block size just for tests
// so making sure the checksum bytes too match it.
// so making sure the checksum bytes match it too.
conf.setInt("io.bytes.per.checksum", 512);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
@ -201,12 +207,13 @@ public void testDeadNodeDetectionDeadNodeRecovery() throws Exception {
} catch (BlockMissingException e) {
}
waitForDeadNode(dfsClient, 3);
assertEquals(3, dfsClient.getDeadNodes(din).size());
assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
cluster.restartDataNode(one, true);
waitForDeadNodeRecovery(din);
waitForDeadNode(dfsClient, 2);
assertEquals(2, dfsClient.getDeadNodes(din).size());
assertEquals(2, dfsClient.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
@ -225,15 +232,14 @@ public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception {
conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
1000);
conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1);
// We'll be using a 512 bytes block size just for tests
// so making sure the checksum bytes too match it.
// so making sure the checksum bytes match it too.
conf.setInt("io.bytes.per.checksum", 512);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("testDeadNodeDetectionMaxDeadNodesProbeQueue");
Path filePath = new Path("/testDeadNodeDetectionMaxDeadNodesProbeQueue");
createFile(fs, filePath);
// Remove three DNs,
@ -260,6 +266,55 @@ public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception {
}
}
@Test
public void testDeadNodeDetectionSuspectNode() throws Exception {
conf = new HdfsConfiguration();
conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1);
// We'll be using a 512 bytes block size just for tests
// so making sure the checksum bytes match it too.
conf.setInt("io.bytes.per.checksum", 512);
DeadNodeDetector.disabledProbeThreadForTest();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/testDeadNodeDetectionSuspectNode");
createFile(fs, filePath);
MiniDFSCluster.DataNodeProperties one = cluster.stopDataNode(0);
FSDataInputStream in = fs.open(filePath);
DFSInputStream din = (DFSInputStream) in.getWrappedStream();
DFSClient dfsClient = din.getDFSClient();
DeadNodeDetector deadNodeDetector =
dfsClient.getClientContext().getDeadNodeDetector();
try {
try {
in.read();
} catch (BlockMissingException e) {
}
waitForSuspectNode(din.getDFSClient());
cluster.restartDataNode(one, true);
Assert.assertEquals(1,
deadNodeDetector.getSuspectNodesProbeQueue().size());
Assert.assertEquals(0,
deadNodeDetector.clearAndGetDetectedDeadNodes().size());
deadNodeDetector.startProbeScheduler();
Thread.sleep(1000);
Assert.assertEquals(0,
deadNodeDetector.getSuspectNodesProbeQueue().size());
Assert.assertEquals(0,
deadNodeDetector.clearAndGetDetectedDeadNodes().size());
} finally {
in.close();
deleteFile(fs, filePath);
assertEquals(0, dfsClient.getDeadNodes(din).size());
assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
}
}
private void createFile(FileSystem fs, Path filePath) throws IOException {
FSDataOutputStream out = null;
try {
@ -286,12 +341,31 @@ private void deleteFile(FileSystem fs, Path filePath) throws IOException {
fs.delete(filePath, true);
}
private void waitForDeadNodeRecovery(DFSInputStream din) throws Exception {
private void waitForDeadNode(DFSClient dfsClient, int size) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
if (din.getDFSClient().getDeadNodes(din).size() == 2) {
if (dfsClient.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size() == size) {
return true;
}
} catch (Exception e) {
// Ignore the exception
}
return false;
}
}, 5000, 100000);
}
private void waitForSuspectNode(DFSClient dfsClient) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
if (dfsClient.getClientContext().getDeadNodeDetector()
.getSuspectNodesProbeQueue().size() > 0) {
return true;
}
} catch (Exception e) {